diff --git a/acmc/phen.py b/acmc/phen.py
index 4a48fd3053c285bf101fb37af7dfe4a17929bfa7..4ef51e66887dd8eeefb70ef211bf03fb89854411 100644
--- a/acmc/phen.py
+++ b/acmc/phen.py
@@ -472,6 +472,7 @@ def process_actions(df, concept_set):
 
     return df
 
 
+# Perform QA Checks on columns individually and append to df
 def preprocess_source_concepts(df, concept_set, code_file_path):
     """Parses each column individually - Order and length will not be preserved!"""
@@ -479,8 +480,8 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
     code_errors = []  # list of errors from processing
 
     # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
-    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
-    
+    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
+
     # Preprocess codes
     code_types = parse.CodeTypeParser().code_types
     for code_type in concept_set["file"]["columns"]:
@@ -498,8 +499,8 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
         if len(errors) > 0:
             code_errors.extend(errors)
             logger.warning(f"Codes validation failed with {len(errors)} errors")
-        
-        # add processed codes to df 
+
+        # add processed codes to df
         new_col_name = f"{source_col_name}_SOURCE"
         df = df.rename(columns={source_col_name: new_col_name})
         process_codes = pd.DataFrame({code_type: codes}).join(df)
@@ -508,70 +509,77 @@ def preprocess_source_concepts(df, concept_set, code_file_path):
             ignore_index=True,
         )
 
-
-    logger.debug(out.head())
+    logger.debug(out.head())
 
     return out, code_errors
 
+
 def get_code_type_from_col_name(col_name):
     return col_name.split("_")[0]
 
-    
+
 # Translate Df with multiple codes into single code type Series
-def translate_codes(df, source_code_types, target_code_type, concept_name):
-    codes = pd.Series([], dtype=str)
+def translate_codes(source_df, target_code_type, concept_name):
+    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""
+    # codes = pd.DataFrame([], dtype=str)
+    codes = pd.DataFrame(
+        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
+    )
 
     # Convert codes to target type
     logger.info(f"Converting to target code type {target_code_type}")
-    for source_code_type, source_code_column in source_code_types.items():
-
+    for source_code_type in source_df.columns:
+
         # if target code type is the same as thet source code type, no translation, just appending source as target
-        if source_code_type == target_code_type:
-            codes = pd.concat([codes, df[source_code_type]])
+        if source_code_type == target_code_type:
+            copy_df = pd.DataFrame(
+                {
+                    "SOURCE_CONCEPT": source_df[source_code_type],
+                    "SOURCE_CONCEPT_TYPE": source_code_type,
+                    "CONCEPT": source_df[source_code_type],
+                }
+            )
+            codes = pd.concat([codes, copy_df])
             logger.debug(
-                f"Target code type {target_code_type} is the same as source code type {len(df)}, copying codes rather than translating"
-            )
-        else:
+                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
+            )
+        else:
             # get the translation filename using source to target code types
             filename = f"{source_code_type}_to_{target_code_type}.parquet"
             map_path = trud.PROCESSED_PATH / filename
 
             # do the mapping if it exists
             if map_path.exists():
-                # get column from processed df and rename the series to what's needed for parquet
-
-                col = df[source_code_type]
-                df_map = pd.read_parquet(map_path)
-                # merge on corresponding codes and take target column
-                translated_df = pd.merge(col, df_map, how="left")[target_code_type]
-                logger.debug("TRANSLATE")
-                logger.debug(translated_df.head())
-
-                # TODO: BUG mask does not match column
-                codes = pd.concat([codes, translated_df])
-                logger.debug("CODES")
-                logger.debug(codes.head())
-
+                # get mapping
+                df_map = pd.read_parquet(map_path)
+
+                # do mapping
+                translated_df = pd.merge(
+                    source_df[source_code_type], df_map, how="left"
+                )
+
+                # normalise the output
+                translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
+                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
+
+                # add to list of codes
+                codes = pd.concat([codes, translated_df])
+
             else:
                 logger.warning(
                     f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                 )
 
-    logger.debug("FULL CONCATED")
-    logger.debug(codes.head())
     codes = codes.dropna()  # delete NaNs
-    logger.debug(f"FULL CONCATED {len(codes.index)}")
-
-    # Append to output if translated
+    # added concept set type to output if any translations
     if len(codes.index) > 0:
-        codes = pd.DataFrame({"CONCEPT": codes})
-        codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
+        codes["CONCEPT_SET"] = concept_name
     else:
         logger.debug(f"No codes converted with target code type {target_code_type}")
 
-    return codes
+    return codes
 
 
 def sql_row_exist(conn, table, column, value):
@@ -664,7 +672,7 @@ def map(phen_dir, target_code_type):
             f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
         )
 
-    if target_code_type is not None:
+    if target_code_type is not None:
         map_target_code_type(phen_path, phenotype, target_code_type)
     else:
         for t in phenotype["map"]:
@@ -699,6 +707,13 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             codes_file_path,
         )
 
+        # create df with just the source code columns
+        source_column_names = list(concept_set["file"]["columns"].keys())
+        source_df = df[source_column_names]
+
+        logger.debug(source_df.columns)
+        logger.debug(source_df.head())
+
         logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
         if len(errors) > 0:
             code_errors.extend(errors)
@@ -718,26 +733,21 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             for cat, grp in df_grp:
                 if cat == concept_set["file"]["category"]:
                     grp = grp.drop(columns=[divide_col])  # delete categorical column
+                    source_df = grp[source_column_names]
                     trans_out = translate_codes(
-                        grp,
-                        source_code_types=concept_set["file"]["columns"],
+                        source_df,
                         target_code_type=target_code_type,
-                        concept_name=concept_set["name"]
+                        concept_name=concept_set["name"],
                     )
                     out = pd.concat([out, trans_out])
         else:
+            source_df = df[source_column_names]
             trans_out = translate_codes(
-                df,
-                source_code_types=concept_set["file"]["columns"],
-                target_code_type=target_code_type,
-                concept_name=concept_set["name"])
-            out = pd.concat([out, trans_out])
-            logger.debug("TEST")
-            logger.debug(df.columns)
-            logger.debug(df.head)
-
-            logger.debug(out.columns)
-            logger.debug(out.head)
+                source_df,
+                target_code_type=target_code_type,
+                concept_name=concept_set["name"],
+            )
+            out = pd.concat([out, trans_out])
 
     if len(code_errors) > 0:
         logger.error(f"The map processing has {len(code_errors)} errors")
@@ -753,15 +763,54 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             f"No output after map processing, check config {str(config_path.resolve())}"
        )
 
-    # Final processing
+    # final processing
     out = out.reset_index(drop=True)
     out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
     out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
+
+    out_count = len(out.index)
+    # added metadata
+    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
+    result_list = []
+    source_column_names = list(concept_set["file"]["columns"].keys())
+    for source_concept_type in source_column_names:
+
+        # Filter output based on the current source_concept_type
+        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
+        filtered_count = len(out_filtered_df.index)
+
+        # Remove all source type columns except the current one, leaving the metadata and the join column
+        remove_types = [
+            type for type in source_column_names if type != source_concept_type
+        ]
+        metadata_df = df.drop(columns=remove_types)
+        metadata_df = metadata_df.rename(
+            columns={source_concept_type: "SOURCE_CONCEPT"}
+        )
+        metadata_df_count = len(metadata_df.index)
+
+        # Perform the left join with metadata_df on SOURCE_CONCEPT to add the metadata
+        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
+        result_count = len(result.index)
+
+        logger.debug(
+            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
+        )
+
+        # Append the result to the result_list
+        result_list.append(result)
+
+    # Concatenate all the results into a single DataFrame
+    final_out = pd.concat(result_list, ignore_index=True)
+    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
+    logger.debug(
+        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
+    )
+
     # Save output to map directory
     output_filename = target_code_type + ".csv"
     map_path = phen_path / MAP_DIR / output_filename
-    out.to_csv(map_path, index=False)
+    final_out.to_csv(map_path, index=False)
     logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 
     # save concept sets as separate files
@@ -776,7 +825,7 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
     concept_set_path.mkdir(parents=True, exist_ok=True)
 
     # write each concept as a separate file
-    for name, concept in out.groupby("CONCEPT_SET"):
+    for name, concept in final_out.groupby("CONCEPT_SET"):
         concept = concept.sort_values(by="CONCEPT")  # sort rows
         concept = concept.dropna(how="all", axis=1)  # remove empty cols
         concept = concept.reindex(
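
For reviewers, a minimal usage sketch of the refactored translate_codes (not part of the patch). The code types read2/snomed, the example codes and the concept name are invented for illustration; it assumes the function is importable as acmc.phen.translate_codes and that the relevant <source>_to_<target>.parquet mapping files have already been built under trud.PROCESSED_PATH.

    import pandas as pd

    from acmc import phen

    # Hypothetical source coding list: one column per source code type, as built by
    # the new `source_df = df[source_column_names]` step in map_target_code_type.
    source_df = pd.DataFrame(
        {
            "read2": ["C10E.", "C10F."],   # translated via read2_to_<target>.parquet if it exists
            "snomed": ["44054006", None],  # same as the target type, so copied rather than translated
        }
    )

    # Returns one row per code with SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT and
    # CONCEPT_SET columns; codes with no mapping come back as NaN and are dropped.
    codes = phen.translate_codes(
        source_df,
        target_code_type="snomed",
        concept_name="DIABETES",
    )
    print(codes.head())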