From 7ccf8657bdcd7118d7b4f45257ee4b9b156e5acb Mon Sep 17 00:00:00 2001
From: Michael Boniface <m.j.boniface@soton.ac.uk>
Date: Thu, 27 Feb 2025 19:12:23 +0000
Subject: [PATCH] fix: re-added metadata to phen output. This is now done
 after the translation, based on any column that is not defined as a code
 column in the file configuration. The previous implementation required the
 user to include which metadata columns to KEEP in the configuration file,
 which resulted in a long config. Of course we may want to exclude metadata
 columns we don't want in the final phenotype, but if we need that we'll
 have to implement exclusion. In addition, we now retain the source code
 before it is preprocessed so that we know the starting point, which was
 previously lost. Closes 28

---
 acmc/phen.py | 161 +++++++++++++++++++++++++++++++++------------------
 1 file changed, 105 insertions(+), 56 deletions(-)

diff --git a/acmc/phen.py b/acmc/phen.py
index 4a48fd3..4ef51e6 100644
--- a/acmc/phen.py
+++ b/acmc/phen.py
@@ -472,6 +472,7 @@ def process_actions(df, concept_set):
 
     return df
 
+
 # Perform QA Checks on columns individually and append to df
 def preprocess_source_concepts(df, concept_set, code_file_path):
     """Parses each column individually - Order and length will not be preserved!"""
@@ -479,8 +480,8 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
     code_errors = []  # list of errors from processing
 
     # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
-    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
-
+    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
+
     # Preprocess codes
     code_types = parse.CodeTypeParser().code_types
     for code_type in concept_set["file"]["columns"]:
@@ -498,8 +499,8 @@
         if len(errors) > 0:
             code_errors.extend(errors)
             logger.warning(f"Codes validation failed with {len(errors)} errors")
-
-        # add processed codes to df
+
+        # add processed codes to df
         new_col_name = f"{source_col_name}_SOURCE"
         df = df.rename(columns={source_col_name: new_col_name})
         process_codes = pd.DataFrame({code_type: codes}).join(df)
@@ -508,70 +509,77 @@
             ignore_index=True,
         )
 
-
-    logger.debug(out.head())
+    logger.debug(out.head())
 
     return out, code_errors
 
+
 def get_code_type_from_col_name(col_name):
     return col_name.split("_")[0]
 
-
+
 # Translate Df with multiple codes into single code type Series
-def translate_codes(df, source_code_types, target_code_type, concept_name):
-    codes = pd.Series([], dtype=str)
+def translate_codes(source_df, target_code_type, concept_name):
+    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
+    # codes = pd.DataFrame([], dtype=str)
+    codes = pd.DataFrame(
+        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
+    )
 
     # Convert codes to target type
    logger.info(f"Converting to target code type {target_code_type}")
-    for source_code_type, source_code_column in source_code_types.items():
-
+    for source_code_type in source_df.columns:
+
         # if target code type is the same as thet source code type, no translation, just appending source as target
-        if source_code_type == target_code_type:
-            codes = pd.concat([codes, df[source_code_type]])
+        if source_code_type == target_code_type:
+            copy_df = pd.DataFrame(
+                {
+                    "SOURCE_CONCEPT": source_df[source_code_type],
+                    "SOURCE_CONCEPT_TYPE": source_code_type,
"CONCEPT": source_df[source_code_type], + } + ) + codes = pd.concat([codes, copy_df]) logger.debug( - f"Target code type {target_code_type} is the same as source code type {len(df)}, copying codes rather than translating" - ) - else: + f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" + ) + else: # get the translation filename using source to target code types filename = f"{source_code_type}_to_{target_code_type}.parquet" map_path = trud.PROCESSED_PATH / filename # do the mapping if it exists if map_path.exists(): - # get column from processed df and rename the series to what's needed for parquet - - col = df[source_code_type] - df_map = pd.read_parquet(map_path) - # merge on corresponding codes and take target column - translated_df = pd.merge(col, df_map, how="left")[target_code_type] - logger.debug("TRANSLATE") - logger.debug(translated_df.head()) - - # TODO: BUG mask does not match column - codes = pd.concat([codes, translated_df]) - logger.debug("CODES") - logger.debug(codes.head()) - + # get mapping + df_map = pd.read_parquet(map_path) + + # do mapping + translated_df = pd.merge( + source_df[source_code_type], df_map, how="left" + ) + + # normalise the output + translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"] + translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type + + # add to list of codes + codes = pd.concat([codes, translated_df]) + else: logger.warning( f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" ) - logger.debug("FULL CONCATED") - logger.debug(codes.head()) codes = codes.dropna() # delete NaNs - logger.debug(f"FULL CONCATED {len(codes.index)}") - - # Append to output if translated + # added concept set type to output if any translations if len(codes.index) > 0: - codes = pd.DataFrame({"CONCEPT": codes}) - codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes)) + codes["CONCEPT_SET"] = concept_name else: logger.debug(f"No codes converted with target code type {target_code_type}") - return codes + return codes def sql_row_exist(conn, table, column, value): @@ -664,7 +672,7 @@ def map(phen_dir, target_code_type): f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" ) - if target_code_type is not None: + if target_code_type is not None: map_target_code_type(phen_path, phenotype, target_code_type) else: for t in phenotype["map"]: @@ -699,6 +707,13 @@ def map_target_code_type(phen_path, phenotype, target_code_type): codes_file_path, ) + # create df with just the source code columns + source_column_names = list(concept_set["file"]["columns"].keys()) + source_df = df[source_column_names] + + logger.debug(source_df.columns) + logger.debug(source_df.head()) + logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}") if len(errors) > 0: code_errors.extend(errors) @@ -718,26 +733,21 @@ def map_target_code_type(phen_path, phenotype, target_code_type): for cat, grp in df_grp: if cat == concept_set["file"]["category"]: grp = grp.drop(columns=[divide_col]) # delete categorical column + source_df = grp[source_column_names] trans_out = translate_codes( - grp, - source_code_types=concept_set["file"]["columns"], + source_df, target_code_type=target_code_type, - concept_name=concept_set["name"] + concept_name=concept_set["name"], ) out = pd.concat([out, trans_out]) else: + source_df = df[source_column_names] trans_out = translate_codes( - df, - 
-                source_code_types=concept_set["file"]["columns"],
-                target_code_type=target_code_type,
-                concept_name=concept_set["name"])
-            out = pd.concat([out, trans_out])
-    logger.debug("TEST")
-    logger.debug(df.columns)
-    logger.debug(df.head)
-
-    logger.debug(out.columns)
-    logger.debug(out.head)
+                source_df,
+                target_code_type=target_code_type,
+                concept_name=concept_set["name"],
+            )
+            out = pd.concat([out, trans_out])
 
     if len(code_errors) > 0:
         logger.error(f"The map processing has {len(code_errors)} errors")
@@ -753,15 +763,54 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             f"No output after map processing, check config {str(config_path.resolve())}"
         )
 
-    # Final processing
+    # final processing
     out = out.reset_index(drop=True)
     out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
     out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
+    out_count = len(out.index)
+    # added metadata
+    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
+    result_list = []
+    source_column_names = list(concept_set["file"]["columns"].keys())
+    for source_concept_type in source_column_names:
+
+        # Filter output based on the current source_concept_type
+        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
+        filtered_count = len(out_filtered_df.index)
+
+        # Remove the source type columns except the current type will leave the metadata and the join
+        remove_types = [
+            type for type in source_column_names if type != source_concept_type
+        ]
+        metadata_df = df.drop(columns=remove_types)
+        metadata_df = metadata_df.rename(
+            columns={source_concept_type: "SOURCE_CONCEPT"}
+        )
+        metadata_df_count = len(metadata_df.index)
+
+        # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
+        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
+        result_count = len(result.index)
+
+        logger.debug(
+            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
+        )
+
+        # Append the result to the result_list
+        result_list.append(result)
+
+    # Concatenate all the results into a single DataFrame
+    final_out = pd.concat(result_list, ignore_index=True)
+    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
+    logger.debug(
+        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
+    )
 
     # Save output to map directory
     output_filename = target_code_type + ".csv"
     map_path = phen_path / MAP_DIR / output_filename
-    out.to_csv(map_path, index=False)
+    final_out.to_csv(map_path, index=False)
     logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 
     # save concept sets as separate files
@@ -776,7 +825,7 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
     concept_set_path.mkdir(parents=True, exist_ok=True)
 
     # write each concept as a separate file
-    for name, concept in out.groupby("CONCEPT_SET"):
+    for name, concept in final_out.groupby("CONCEPT_SET"):
         concept = concept.sort_values(by="CONCEPT")  # sort rows
         concept = concept.dropna(how="all", axis=1)  # remove empty cols
         concept = concept.reindex(
-- 
GitLab
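
Reviewer note: the metadata re-join added in the last two hunks is easiest to follow in isolation. Below is a minimal standalone sketch of the same idea in plain pandas, not the patched function itself. The sample frame, the metadata columns ("description", "provenance") and the concept values are invented for illustration; only the column names SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT and CONCEPT_SET mirror what the patch produces.

import pandas as pd

# Source concept file: code columns (ICD10, SNOMED) plus whatever metadata
# columns the file happens to carry (hypothetical here).
df = pd.DataFrame(
    {
        "ICD10": ["E11", "I10"],
        "SNOMED": ["44054006", "38341003"],
        "description": ["Type 2 diabetes", "Hypertension"],
        "provenance": ["clinician list", "clinician list"],
    }
)

# Translated output in the normalised shape the patch introduces.
out = pd.DataFrame(
    {
        "SOURCE_CONCEPT": ["E11", "38341003"],
        "SOURCE_CONCEPT_TYPE": ["ICD10", "SNOMED"],
        "CONCEPT": ["E11", "I10"],
        "CONCEPT_SET": ["example-set", "example-set"],
    }
)

source_column_names = ["ICD10", "SNOMED"]  # code columns from the file config

result_list = []
for source_concept_type in source_column_names:
    # rows that were translated from this particular source code type
    out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]

    # drop the other code columns; keep this one (renamed) plus the metadata
    remove_types = [t for t in source_column_names if t != source_concept_type]
    metadata_df = df.drop(columns=remove_types).rename(
        columns={source_concept_type: "SOURCE_CONCEPT"}
    )

    # left join re-attaches the metadata to each translated concept
    result_list.append(
        pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
    )

final_out = pd.concat(result_list, ignore_index=True).drop_duplicates(
    subset=["CONCEPT_SET", "CONCEPT"]
)
print(final_out)

Running this shows each translated concept carrying the metadata row it originated from, which is the behaviour described in the commit message: every column not declared as a code column in the file configuration is kept, with no KEEP list needed in the config.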