Commit 7ccf8657 authored by mjbonifa

fix: re-added metadata to phen output. This is now done after the translation...

fix: re-added metadata to phen output. This is now done after the translation, based on any column that is not defined as a code column in the file configuration. The previous implementation required the user to list which metadata columns to KEEP in the configuration file, which resulted in a long config. Of course we may want to exclude metadata columns we don't want in the final phenotype, but if we need that we'll have to implement exclusion. In addition, we now retain the source code before it is preprocessed, so that we know the starting point, which was previously lost. Closes 28
parent 5b3b1d29
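The metadata re-join described in the message works roughly as sketched below: for each source code type, the translated output is filtered on SOURCE_CONCEPT_TYPE, the other code columns are dropped from the source file so that only metadata remains, the code column is renamed to SOURCE_CONCEPT, and a left join carries the metadata across. This is a minimal, self-contained sketch; the code type name ("icd10"), the example codes and the "description" column are placeholders, not values from the repository.

import pandas as pd

# translated output, with the columns produced by the revised translate_codes
out = pd.DataFrame(
    {
        "CONCEPT_SET": ["example_set", "example_set"],
        "SOURCE_CONCEPT": ["A01", "B20"],
        "SOURCE_CONCEPT_TYPE": ["icd10", "icd10"],  # placeholder code type
        "CONCEPT": ["1001", "1002"],  # placeholder target codes
    }
)

# source concept file: one code column plus anything else, treated as metadata
df = pd.DataFrame(
    {
        "icd10": ["A01", "B20"],
        "description": ["Typhoid fever", "HIV disease"],  # metadata, kept automatically
    }
)

# filter the output to this code type, rename the code column to the join key,
# then left-join so every non-code column is carried into the phenotype output
out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == "icd10"]
metadata_df = df.rename(columns={"icd10": "SOURCE_CONCEPT"})
final_out = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
print(final_out)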
@@ -472,6 +472,7 @@ def process_actions(df, concept_set):
     return df


 # Perform QA Checks on columns individually and append to df
 def preprocess_source_concepts(df, concept_set, code_file_path):
     """Parses each column individually - Order and length will not be preserved!"""
@@ -479,8 +480,8 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
     code_errors = []  # list of errors from processing

     # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
     df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])

     # Preprocess codes
     code_types = parse.CodeTypeParser().code_types
     for code_type in concept_set["file"]["columns"]:
@@ -498,8 +499,8 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
         if len(errors) > 0:
             code_errors.extend(errors)
             logger.warning(f"Codes validation failed with {len(errors)} errors")

         # add processed codes to df
         new_col_name = f"{source_col_name}_SOURCE"
         df = df.rename(columns={source_col_name: new_col_name})
         process_codes = pd.DataFrame({code_type: codes}).join(df)
@@ -508,70 +509,77 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
             ignore_index=True,
         )

     logger.debug(out.head())

     return out, code_errors


 def get_code_type_from_col_name(col_name):
     return col_name.split("_")[0]


 # Translate Df with multiple codes into single code type Series
-def translate_codes(df, source_code_types, target_code_type, concept_name):
-    codes = pd.Series([], dtype=str)
+def translate_codes(source_df, target_code_type, concept_name):
+    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
+    # codes = pd.DataFrame([], dtype=str)
+    codes = pd.DataFrame(
+        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
+    )

     # Convert codes to target type
     logger.info(f"Converting to target code type {target_code_type}")
-    for source_code_type, source_code_column in source_code_types.items():
+    for source_code_type in source_df.columns:
         # if target code type is the same as thet source code type, no translation, just appending source as target
         if source_code_type == target_code_type:
-            codes = pd.concat([codes, df[source_code_type]])
+            copy_df = pd.DataFrame(
+                {
+                    "SOURCE_CONCEPT": source_df[source_code_type],
+                    "SOURCE_CONCEPT_TYPE": source_code_type,
+                    "CONCEPT": source_df[source_code_type],
+                }
+            )
+            codes = pd.concat([codes, copy_df])
             logger.debug(
-                f"Target code type {target_code_type} is the same as source code type {len(df)}, copying codes rather than translating"
+                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
             )
         else:
             # get the translation filename using source to target code types
             filename = f"{source_code_type}_to_{target_code_type}.parquet"
             map_path = trud.PROCESSED_PATH / filename

             # do the mapping if it exists
             if map_path.exists():
-                # get column from processed df and rename the series to what's needed for parquet
-                col = df[source_code_type]
-                df_map = pd.read_parquet(map_path)
-                # merge on corresponding codes and take target column
-                translated_df = pd.merge(col, df_map, how="left")[target_code_type]
-                logger.debug("TRANSLATE")
-                logger.debug(translated_df.head())
-                # TODO: BUG mask does not match column
-                codes = pd.concat([codes, translated_df])
-                logger.debug("CODES")
-                logger.debug(codes.head())
+                # get mapping
+                df_map = pd.read_parquet(map_path)

+                # do mapping
+                translated_df = pd.merge(
+                    source_df[source_code_type], df_map, how="left"
+                )

+                # normalise the output
+                translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
+                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

+                # add to list of codes
+                codes = pd.concat([codes, translated_df])
             else:
                 logger.warning(
                     f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                 )

-    logger.debug("FULL CONCATED")
-    logger.debug(codes.head())
     codes = codes.dropna()  # delete NaNs

-    logger.debug(f"FULL CONCATED {len(codes.index)}")
-    # Append to output if translated
+    # added concept set type to output if any translations
     if len(codes.index) > 0:
-        codes = pd.DataFrame({"CONCEPT": codes})
-        codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
+        codes["CONCEPT_SET"] = concept_name
     else:
         logger.debug(f"No codes converted with target code type {target_code_type}")

     return codes


 def sql_row_exist(conn, table, column, value):
@@ -664,7 +672,7 @@ def map(phen_dir, target_code_type):
             f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
         )

     if target_code_type is not None:
         map_target_code_type(phen_path, phenotype, target_code_type)
     else:
         for t in phenotype["map"]:
@@ -699,6 +707,13 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             codes_file_path,
         )

+        # create df with just the source code columns
+        source_column_names = list(concept_set["file"]["columns"].keys())
+        source_df = df[source_column_names]
+
+        logger.debug(source_df.columns)
+        logger.debug(source_df.head())
+
         logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
         if len(errors) > 0:
             code_errors.extend(errors)
@@ -718,26 +733,21 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             for cat, grp in df_grp:
                 if cat == concept_set["file"]["category"]:
                     grp = grp.drop(columns=[divide_col])  # delete categorical column
+                    source_df = grp[source_column_names]
                     trans_out = translate_codes(
-                        grp,
-                        source_code_types=concept_set["file"]["columns"],
+                        source_df,
                         target_code_type=target_code_type,
-                        concept_name=concept_set["name"]
+                        concept_name=concept_set["name"],
                     )
                     out = pd.concat([out, trans_out])
         else:
+            source_df = df[source_column_names]
             trans_out = translate_codes(
-                df,
-                source_code_types=concept_set["file"]["columns"],
-                target_code_type=target_code_type,
-                concept_name=concept_set["name"])
+                source_df,
+                target_code_type=target_code_type,
+                concept_name=concept_set["name"],
+            )
             out = pd.concat([out, trans_out])

-    logger.debug("TEST")
-    logger.debug(df.columns)
-    logger.debug(df.head)
-    logger.debug(out.columns)
-    logger.debug(out.head)

     if len(code_errors) > 0:
         logger.error(f"The map processing has {len(code_errors)} errors")
@@ -753,15 +763,54 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             f"No output after map processing, check config {str(config_path.resolve())}"
         )

-    # Final processing
+    # final processing
     out = out.reset_index(drop=True)
     out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
     out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])

+    out_count = len(out.index)
+    # added metadata
+    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
+    result_list = []
+    source_column_names = list(concept_set["file"]["columns"].keys())
+    for source_concept_type in source_column_names:
+        # Filter output based on the current source_concept_type
+        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
+        filtered_count = len(out_filtered_df.index)
+
+        # Remove the source type columns except the current type will leave the metadata and the join
+        remove_types = [
+            type for type in source_column_names if type != source_concept_type
+        ]
+        metadata_df = df.drop(columns=remove_types)
+        metadata_df = metadata_df.rename(
+            columns={source_concept_type: "SOURCE_CONCEPT"}
+        )
+        metadata_df_count = len(metadata_df.index)
+
+        # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
+        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
+        result_count = len(result.index)
+
+        logger.debug(
+            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
+        )
+        # Append the result to the result_list
+        result_list.append(result)
+
+    # Concatenate all the results into a single DataFrame
+    final_out = pd.concat(result_list, ignore_index=True)
+    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
+    logger.debug(
+        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
+    )
+
     # Save output to map directory
     output_filename = target_code_type + ".csv"
     map_path = phen_path / MAP_DIR / output_filename
-    out.to_csv(map_path, index=False)
+    final_out.to_csv(map_path, index=False)
     logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")

     # save concept sets as separate files
@@ -776,7 +825,7 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
     concept_set_path.mkdir(parents=True, exist_ok=True)

     # write each concept as a separate file
-    for name, concept in out.groupby("CONCEPT_SET"):
+    for name, concept in final_out.groupby("CONCEPT_SET"):
         concept = concept.sort_values(by="CONCEPT")  # sort rows
         concept = concept.dropna(how="all", axis=1)  # remove empty cols
         concept = concept.reindex(
...
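For reference, the revised translate_codes keeps the source concept alongside the target concept; when the source and target code types match, codes are copied rather than translated, but the source type is still recorded. A minimal sketch of that copy branch, using a placeholder code type name and made-up codes:

import pandas as pd

source_df = pd.DataFrame({"snomed": ["100000001", "100000002"]})  # placeholder codes
target_code_type = "snomed"  # placeholder code type, same as the source column

# mirror of the copy branch in the diff: source and target concept are identical,
# but the source concept and its type are retained so the starting point is not lost
copy_df = pd.DataFrame(
    {
        "SOURCE_CONCEPT": source_df[target_code_type],
        "SOURCE_CONCEPT_TYPE": target_code_type,
        "CONCEPT": source_df[target_code_type],
    }
)
print(copy_df)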