diff --git a/acmc/phen.py b/acmc/phen.py
index 05ac0d1f96fbe6908faea5b9fca0d7040aa2c41e..4a48fd3053c285bf101fb37af7dfe4a17929bfa7 100644
--- a/acmc/phen.py
+++ b/acmc/phen.py
@@ -46,6 +46,8 @@ DIVIDE_COL_ACTION = "divide_col"
 COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
 CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
+SOURCE_COL_SUFFIX = "_acmc_source"
+TARGET_COL_SUFFIX = "_acmc_target"
 
 # config.yaml schema
 CONFIG_SCHEMA = {
@@ -470,31 +472,24 @@ def process_actions(df, concept_set):
     return df
 
 
-# Perform QA Checks on columns individually and append to df
-def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
+def preprocess_source_concepts(df, concept_set, code_file_path):
     """Parses each column individually - Order and length will not be preserved!"""
     out = pd.DataFrame([])  # create output df to append to
     code_errors = []  # list of errors from processing
 
-    # TODO: Is there a better way of processing this action as it's distributed across
-    # different parts of the programme.
-    if (
-        "actions" in concept_set["file"]
-        and "divide_col" in concept_set["file"]["actions"]
-    ):
-        divide_col_df = df[concept_set["file"]["actions"]["divide_col"]]
-    else:
-        divide_col_df = pd.DataFrame()
-
+    # remove unnamed columns caused by extra commas, missing headers, or incorrect parsing
+    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
+
     # Preprocess codes
     code_types = parse.CodeTypeParser().code_types
     for code_type in concept_set["file"]["columns"]:
         parser = code_types[code_type]
-        logger.info(f"Processing {code_type} codes...")
+        logger.info(f"Processing {code_type} codes for {code_file_path}")
 
-        # get code types
-        codes = df[concept_set["file"]["columns"][code_type]].dropna()
+        # get codes by column name
+        source_col_name = concept_set["file"]["columns"][code_type]
+        codes = df[source_col_name].dropna()
         codes = codes.astype(str)  # convert to string
         codes = codes.str.strip()  # remove excess spaces
@@ -503,63 +498,80 @@ def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
         if len(errors) > 0:
             code_errors.extend(errors)
             logger.warning(f"Codes validation failed with {len(errors)} errors")
-
-        # append to output dataframe
+
+        # add processed codes to df
+        new_col_name = f"{source_col_name}_SOURCE"
+        df = df.rename(columns={source_col_name: new_col_name})
+        process_codes = pd.DataFrame({code_type: codes}).join(df)
+
         out = pd.concat(
-            [out, pd.DataFrame({code_type: codes}).join(divide_col_df)],
+            [out, process_codes],
             ignore_index=True,
         )
+
+    logger.debug(out.head())
+
     return out, code_errors
 
 
+def get_code_type_from_col_name(col_name):
+    return col_name.split("_")[0]
+
+
 # Translate Df with multiple codes into single code type Series
-def translate_codes(df, target_code_type):
+def translate_codes(df, source_code_types, target_code_type, concept_name):
     codes = pd.Series([], dtype=str)
 
     # Convert codes to target type
     logger.info(f"Converting to target code type {target_code_type}")
-    for col_name in df.columns:
+
+    for source_code_type, source_code_column in source_code_types.items():
+
         # if the target code type is the same as the source code type, no translation is needed, just append source as target
-        if col_name == target_code_type:
+        if source_code_type == target_code_type:
+            codes = pd.concat([codes, df[source_code_type]])
             logger.debug(
-                f"Target code type {target_code_type} has source code types {len(df)}, copying rather than translating"
-            )
-            codes = pd.concat([codes, df[target_code_type]])
-        else:
-            filename = f"{col_name}_to_{target_code_type}.parquet"
+                f"Target code type {target_code_type} is the same as the source code type, copying {len(df)} codes rather than translating"
+            )
+        else:
+            # build the translation map filename from the source and target code types
+            filename = f"{source_code_type}_to_{target_code_type}.parquet"
             map_path = trud.PROCESSED_PATH / filename
+
+            # do the mapping if a translation map file exists
             if map_path.exists():
-                col = df[col_name]
-                df_map = pd.read_parquet(map_path)
+                # get the source code column from the processed df
+                col = df[source_code_type]
+                df_map = pd.read_parquet(map_path)
 
                 # merge on corresponding codes and take target column
-                translated = pd.merge(col, df_map, how="left")[target_code_type]
+                translated_df = pd.merge(col, df_map, how="left")[target_code_type]
+
+                logger.debug("TRANSLATE")
+                logger.debug(translated_df.head())
 
                 # TODO: BUG mask does not match column
-                codes = pd.concat([codes, translated])  # merge to output
+                codes = pd.concat([codes, translated_df])
+
+                logger.debug("CODES")
+                logger.debug(codes.head())
+
             else:
                 logger.warning(
-                    f"No mapping from {col_name} to {target_code_type}, file {str(map_path.resolve())} does not exist"
+                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                 )
 
+    logger.debug("FULL CONCATENATED")
+    logger.debug(codes.head())
-    return codes
-
-
-# Append file's codes to output Df with concept
-def map_file(df, target_code_type, out, concept_name):
-
-    # translate codes
-    codes = translate_codes(df, target_code_type)
     codes = codes.dropna()  # delete NaNs
 
+    logger.debug(f"FULL CONCATENATED {len(codes.index)}")
+
     # Append to output if translated
-    if len(codes) > 0:
+    if len(codes.index) > 0:
         codes = pd.DataFrame({"CONCEPT": codes})
-        codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
-        out = pd.concat([out, codes])
+        codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
     else:
         logger.debug(f"No codes converted with target code type {target_code_type}")
 
-    return out
+    return codes
 
 
 def sql_row_exist(conn, table, column, value):
@@ -652,7 +664,7 @@ def map(phen_dir, target_code_type):
             f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
         )
 
-    if target_code_type is not None: 
+    if target_code_type is not None:
         map_target_code_type(phen_path, phenotype, target_code_type)
     else:
         for t in phenotype["map"]:
@@ -662,7 +674,6 @@ def map(phen_dir, target_code_type):
 
 def map_target_code_type(phen_path, phenotype, target_code_type):
-
     logger.debug(f"Target coding format: {target_code_type}")
     codes_path = phen_path / CODES_DIR
 
     # Create output dataframe
@@ -680,21 +691,20 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
         # process structural actions
         df = process_actions(df, concept_set)
 
-        # Preprocessing & Validation Checks
-        logger.debug("Processing and validating code formats")
-        df, errors = preprocess_codes(
+        # preprocess and validate source concepts
+        logger.debug("Processing and validating source concept codes")
+        df, errors = preprocess_source_concepts(
             df,
             concept_set,
             codes_file_path,
-            target_code_type=target_code_type,
         )
 
-        logger.debug(f"Length of errors from preprocess {len(errors)}")
+        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
         if len(errors) > 0:
             code_errors.extend(errors)
         logger.debug(f" Length of code_errors {len(code_errors)}")
 
-        # Map
+        # Map source concept codes to target codes
         # if processing a source coding list with categorical data
         if (
             "actions" in concept_set["file"]
@@ -708,11 +718,26 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             for cat, grp in df_grp:
                 if cat == concept_set["file"]["category"]:
                     grp = grp.drop(columns=[divide_col])  # delete categorical column
-                    out = map_file(
-                        grp, target_code_type, out, concept_name=concept_set["name"]
+                    trans_out = translate_codes(
+                        grp,
+                        source_code_types=concept_set["file"]["columns"],
+                        target_code_type=target_code_type,
+                        concept_name=concept_set["name"],
                     )
+                    out = pd.concat([out, trans_out])
         else:
-            out = map_file(df, target_code_type, out, concept_name=concept_set["name"])
+            trans_out = translate_codes(
+                df,
+                source_code_types=concept_set["file"]["columns"],
+                target_code_type=target_code_type,
+                concept_name=concept_set["name"],
+            )
+            out = pd.concat([out, trans_out])
+
+            logger.debug("TEST")
+            logger.debug(df.columns)
+            logger.debug(df.head())
+
+            logger.debug(out.columns)
+            logger.debug(out.head())
 
     if len(code_errors) > 0:
         logger.error(f"The map processing has {len(code_errors)} errors")
@@ -742,11 +767,10 @@
 
     # save concept sets as separate files
     concept_set_path = phen_path / CSV_PATH / target_code_type
 
-    # empty the concept-set directory if it exists but keep the .git file
-    git_items = [".git", ".gitkeep"]
+    # empty the concept-set directory except for hidden files, e.g. .git
     if concept_set_path.exists():
         for item in concept_set_path.iterdir():
-            if item not in git_items:
+            if not item.name.startswith("."):
                 item.unlink()
     else:
         concept_set_path.mkdir(parents=True, exist_ok=True)