diff --git a/acmc/phen.py b/acmc/phen.py
index 05ac0d1f96fbe6908faea5b9fca0d7040aa2c41e..4ef51e66887dd8eeefb70ef211bf03fb89854411 100644
--- a/acmc/phen.py
+++ b/acmc/phen.py
@@ -46,6 +46,8 @@ DIVIDE_COL_ACTION = "divide_col"
 COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
 
 CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
+SOURCE_COL_SUFFIX = "_acmc_source"
+TARGET_COL_SUFFIX = "_acmc_target"
 
 # config.yaml schema
 CONFIG_SCHEMA = {
@@ -472,29 +474,23 @@ def process_actions(df, concept_set):
 
 
 # Perform QA Checks on columns individually and append to df
-def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
+def preprocess_source_concepts(df, concept_set, code_file_path):
     """Parses each column individually - Order and length will not be preserved!"""
     out = pd.DataFrame([])  # create output df to append to
     code_errors = []  # list of errors from processing
 
-    # TODO: Is there a better way of processing this action as it's distributed across
-    # different parts of the programme.
-    if (
-        "actions" in concept_set["file"]
-        and "divide_col" in concept_set["file"]["actions"]
-    ):
-        divide_col_df = df[concept_set["file"]["actions"]["divide_col"]]
-    else:
-        divide_col_df = pd.DataFrame()
+    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
+    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 
     # Preprocess codes
     code_types = parse.CodeTypeParser().code_types
     for code_type in concept_set["file"]["columns"]:
         parser = code_types[code_type]
-        logger.info(f"Processing {code_type} codes...")
+        logger.info(f"Processing {code_type} codes for {code_file_path}")
 
-        # get code types
-        codes = df[concept_set["file"]["columns"][code_type]].dropna()
+        # get codes by column name
+        source_col_name = concept_set["file"]["columns"][code_type]
+        codes = df[source_col_name].dropna()
         codes = codes.astype(str)  # convert to string
         codes = codes.str.strip()  # remove excess spaces
 
@@ -504,62 +500,86 @@ def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
             code_errors.extend(errors)
             logger.warning(f"Codes validation failed with {len(errors)} errors")
 
-        # append to output dataframe
+        # add processed codes to df
+        new_col_name = f"{source_col_name}_SOURCE"
+        df = df.rename(columns={source_col_name: new_col_name})
+        process_codes = pd.DataFrame({code_type: codes}).join(df)
         out = pd.concat(
-            [out, pd.DataFrame({code_type: codes}).join(divide_col_df)],
+            [out, process_codes],
             ignore_index=True,
         )
 
+    logger.debug(out.head())
+
     return out, code_errors
 
 
+def get_code_type_from_col_name(col_name):
+    return col_name.split("_")[0]
+
+
 # Translate Df with multiple codes into single code type Series
-def translate_codes(df, target_code_type):
-    codes = pd.Series([], dtype=str)
+def translate_codes(source_df, target_code_type, concept_name):
+    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
+    # codes = pd.DataFrame([], dtype=str)
+    codes = pd.DataFrame(
+        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
+    )
 
     # Convert codes to target type
     logger.info(f"Converting to target code type {target_code_type}")
-    for col_name in df.columns:
+
+    for source_code_type in source_df.columns:
+
+        # if target code type is the same as thet source code type, no translation, just appending source as target
-        if col_name == target_code_type:
+        if source_code_type == target_code_type:
+            copy_df = pd.DataFrame(
+                {
+                    "SOURCE_CONCEPT": source_df[source_code_type],
+                    "SOURCE_CONCEPT_TYPE": source_code_type,
+                    "CONCEPT": source_df[source_code_type],
+                }
+            )
+            codes = pd.concat([codes, copy_df])
             logger.debug(
-                f"Target code type {target_code_type} has source code types {len(df)}, copying rather than translating"
+                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
             )
-            codes = pd.concat([codes, df[target_code_type]])
         else:
-            filename = f"{col_name}_to_{target_code_type}.parquet"
+            # get the translation filename using source to target code types
+            filename = f"{source_code_type}_to_{target_code_type}.parquet"
             map_path = trud.PROCESSED_PATH / filename
+
+            # do the mapping if it exists
             if map_path.exists():
-                col = df[col_name]
+                # get mapping
                 df_map = pd.read_parquet(map_path)
-                # merge on corresponding codes and take target column
-                translated = pd.merge(col, df_map, how="left")[target_code_type]
-                # TODO: BUG mask does not match column
-                codes = pd.concat([codes, translated])  # merge to output
-            else:
-                logger.warning(
-                    f"No mapping from {col_name} to {target_code_type}, file {str(map_path.resolve())} does not exist"
+
+                # do mapping
+                translated_df = pd.merge(
+                    source_df[source_code_type], df_map, how="left"
                 )
 
-    return codes
+                # normalise the output
+                translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
+                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 
+                # add to list of codes
+                codes = pd.concat([codes, translated_df])
 
-# Append file's codes to output Df with concept
-def map_file(df, target_code_type, out, concept_name):
+            else:
+                logger.warning(
+                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                 )
 
-    # translate codes
-    codes = translate_codes(df, target_code_type)
     codes = codes.dropna()  # delete NaNs
 
-    # Append to output if translated
-    if len(codes) > 0:
-        codes = pd.DataFrame({"CONCEPT": codes})
-        codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
-        out = pd.concat([out, codes])
+    # added concept set type to output if any translations
+    if len(codes.index) > 0:
+        codes["CONCEPT_SET"] = concept_name
     else:
         logger.debug(f"No codes converted with target code type {target_code_type}")
 
-    return out
+    return codes
 
 
 def sql_row_exist(conn, table, column, value):
@@ -662,7 +682,6 @@ def map(phen_dir, target_code_type):
 
 
 def map_target_code_type(phen_path, phenotype, target_code_type):
-    logger.debug(f"Target coding format: {target_code_type}")
     codes_path = phen_path / CODES_DIR
 
     # Create output dataframe
@@ -680,21 +699,27 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
         # process structural actions
         df = process_actions(df, concept_set)
 
-        # Preprocessing & Validation Checks
-        logger.debug("Processing and validating code formats")
-        df, errors = preprocess_codes(
+        # preprocessing and validate of source concepts
+        logger.debug("Processing and validating source concept codes")
+        df, errors = preprocess_source_concepts(
             df,
             concept_set,
             codes_file_path,
-            target_code_type=target_code_type,
         )
-        logger.debug(f"Length of errors from preprocess {len(errors)}")
+        # create df with just the source code columns
+        source_column_names = list(concept_set["file"]["columns"].keys())
+        source_df = df[source_column_names]
+
+        logger.debug(source_df.columns)
+        logger.debug(source_df.head())
+
+        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
         if len(errors) > 0:
             code_errors.extend(errors)
             logger.debug(f" Length of code_errors {len(code_errors)}")
 
-        # Map
+        # Map source concepts codes to target codes
         # if processing a source coding list with categorical data
         if (
             "actions" in concept_set["file"]
            and "divide_col" in concept_set["file"]["actions"]
@@ -708,11 +733,21 @@
             for cat, grp in df_grp:
                 if cat == concept_set["file"]["category"]:
                     grp = grp.drop(columns=[divide_col])  # delete categorical column
-                    out = map_file(
-                        grp, target_code_type, out, concept_name=concept_set["name"]
+                    source_df = grp[source_column_names]
+                    trans_out = translate_codes(
+                        source_df,
+                        target_code_type=target_code_type,
+                        concept_name=concept_set["name"],
                     )
+                    out = pd.concat([out, trans_out])
         else:
-            out = map_file(df, target_code_type, out, concept_name=concept_set["name"])
+            source_df = df[source_column_names]
+            trans_out = translate_codes(
+                source_df,
+                target_code_type=target_code_type,
+                concept_name=concept_set["name"],
+            )
+            out = pd.concat([out, trans_out])
 
     if len(code_errors) > 0:
         logger.error(f"The map processing has {len(code_errors)} errors")
@@ -728,31 +763,69 @@
             f"No output after map processing, check config {str(config_path.resolve())}"
         )
 
-    # Final processing
+    # final processing
     out = out.reset_index(drop=True)
     out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
     out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 
+    out_count = len(out.index)
+    # added metadata
+    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
+    result_list = []
+    source_column_names = list(concept_set["file"]["columns"].keys())
+    for source_concept_type in source_column_names:
+
+        # Filter output based on the current source_concept_type
+        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
+        filtered_count = len(out_filtered_df.index)
+
+        # Remove the source type columns except the current type will leave the metadata and the join
+        remove_types = [
+            type for type in source_column_names if type != source_concept_type
+        ]
+        metadata_df = df.drop(columns=remove_types)
+        metadata_df = metadata_df.rename(
+            columns={source_concept_type: "SOURCE_CONCEPT"}
+        )
+        metadata_df_count = len(metadata_df.index)
+
+        # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
+        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
+        result_count = len(result.index)
+
+        logger.debug(
+            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
+        )
+
+        # Append the result to the result_list
+        result_list.append(result)
+
+    # Concatenate all the results into a single DataFrame
+    final_out = pd.concat(result_list, ignore_index=True)
+    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
+    logger.debug(
+        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
+    )
+
     # Save output to map directory
     output_filename = target_code_type + ".csv"
     map_path = phen_path / MAP_DIR / output_filename
-    out.to_csv(map_path, index=False)
+    final_out.to_csv(map_path, index=False)
     logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 
     # save concept sets as separate files
     concept_set_path = phen_path / CSV_PATH / target_code_type
 
-    # empty the concept-set directory if it exists but keep the .git file
-    git_items = [".git", ".gitkeep"]
+    # empty the concept-set directory except for hiddle files, e.g. .git
    if concept_set_path.exists():
         for item in concept_set_path.iterdir():
-            if item not in git_items:
+            if not item.name.startswith("."):
                 item.unlink()
     else:
         concept_set_path.mkdir(parents=True, exist_ok=True)
 
     # write each concept as a separate file
-    for name, concept in out.groupby("CONCEPT_SET"):
+    for name, concept in final_out.groupby("CONCEPT_SET"):
         concept = concept.sort_values(by="CONCEPT")  # sort rows
         concept = concept.dropna(how="all", axis=1)  # remove empty cols
         concept = concept.reindex(
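
A minimal, self-contained sketch (not part of the diff above) of the per-source-type metadata join that the new block in map_target_code_type performs after translation. The source code type names "read2" and "snomed", the toy rows, and the "description" column are assumptions for illustration only; in acmc the source types and metadata come from the phenotype config and code files.

# Illustrative sketch only: toy data standing in for the preprocessed source file (df)
# and the translated concept output (out) produced by translate_codes.
import pandas as pd

# preprocessed source file: one column per source code type plus metadata columns (assumed names)
df = pd.DataFrame(
    {
        "read2": ["C10..", "C11.."],
        "snomed": ["44054006", "73211009"],
        "description": ["Type 2 diabetes", "Diabetes mellitus"],
    }
)

# translated output: one row per (source concept, target concept) pair
out = pd.DataFrame(
    {
        "SOURCE_CONCEPT": ["C10..", "44054006"],
        "SOURCE_CONCEPT_TYPE": ["read2", "snomed"],
        "CONCEPT": ["111552007", "73211009"],
        "CONCEPT_SET": ["diabetes", "diabetes"],
    }
)

source_column_names = ["read2", "snomed"]
result_list = []
for source_concept_type in source_column_names:
    # keep only the rows that were translated from this source code type
    out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]

    # drop the other source columns so only the metadata and the join key remain
    remove_types = [t for t in source_column_names if t != source_concept_type]
    metadata_df = df.drop(columns=remove_types).rename(
        columns={source_concept_type: "SOURCE_CONCEPT"}
    )

    # left join keeps every translated row and attaches its source-file metadata
    result_list.append(
        pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
    )

final_out = pd.concat(result_list, ignore_index=True)
print(final_out)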