Commit 51f46741 authored by mjbonifa

Merge branch '28-fix-add-metadata-from-codes-back-into-mapping' into 'dev'

fix: started to add the metadata back in, the translation function is tricky...

Closes #28

See merge request meldb/concepts-processing!26
parents e08cfa07 7ccf8657
@@ -46,6 +46,8 @@ DIVIDE_COL_ACTION = "divide_col"
COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
SOURCE_COL_SUFFIX = "_acmc_source"
TARGET_COL_SUFFIX = "_acmc_target"
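# (assumed naming convention) these suffixes tag columns carried through the
# pipeline, e.g. a "read2" source column could appear as "read2_acmc_source"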
# config.yaml schema
CONFIG_SCHEMA = {
@@ -472,29 +474,23 @@ def process_actions(df, concept_set):
# Perform QA Checks on columns individually and append to df
def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
def preprocess_source_concepts(df, concept_set, code_file_path):
"""Parses each column individually - Order and length will not be preserved!"""
out = pd.DataFrame([]) # create output df to append to
code_errors = [] # list of errors from processing
# TODO: Is there a better way of processing this action as it's distributed across
# different parts of the programme.
if (
"actions" in concept_set["file"]
and "divide_col" in concept_set["file"]["actions"]
):
divide_col_df = df[concept_set["file"]["actions"]["divide_col"]]
else:
divide_col_df = pd.DataFrame()
# remove unnamed columns due to extra commas, missing headers, or incorrect parsing
df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
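# (pandas names headerless columns "Unnamed: 0", "Unnamed: 1", ...)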
# Preprocess codes
code_types = parse.CodeTypeParser().code_types
for code_type in concept_set["file"]["columns"]:
parser = code_types[code_type]
logger.info(f"Processing {code_type} codes...")
logger.info(f"Processing {code_type} codes for {code_file_path}")
# get code types
codes = df[concept_set["file"]["columns"][code_type]].dropna()
# get codes by column name
source_col_name = concept_set["file"]["columns"][code_type]
codes = df[source_col_name].dropna()
codes = codes.astype(str) # convert to string
codes = codes.str.strip() # remove excess spaces
@@ -504,62 +500,86 @@ def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
code_errors.extend(errors)
logger.warning(f"Codes validation failed with {len(errors)} errors")
# append to output dataframe
# add processed codes to df
new_col_name = f"{source_col_name}_SOURCE"
df = df.rename(columns={source_col_name: new_col_name})
process_codes = pd.DataFrame({code_type: codes}).join(df)
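# join() aligns on the row index, so each processed code keeps the metadata
# from its original row in df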
out = pd.concat(
[out, pd.DataFrame({code_type: codes}).join(divide_col_df)],
[out, process_codes],
ignore_index=True,
)
logger.debug(out.head())
return out, code_errors
def get_code_type_from_col_name(col_name):
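# e.g. a column named "read2_SOURCE" (hypothetical name, following the
# "<column>_SOURCE" renaming in preprocess_source_concepts) yields "read2"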
return col_name.split("_")[0]
# Translate Df with multiple codes into single code type Series
def translate_codes(df, target_code_type):
codes = pd.Series([], dtype=str)
def translate_codes(source_df, target_code_type, concept_name):
"""Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
codes = pd.DataFrame(
columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
)
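# output schema: one row per code - SOURCE_CONCEPT (original code),
# SOURCE_CONCEPT_TYPE (its coding scheme), CONCEPT (code in the target scheme)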
# Convert codes to target type
logger.info(f"Converting to target code type {target_code_type}")
for col_name in df.columns:
for source_code_type in source_df.columns:
# if the target code type is the same as the source code type, no translation
# is needed, just append the source as the target
if col_name == target_code_type:
if source_code_type == target_code_type:
copy_df = pd.DataFrame(
{
"SOURCE_CONCEPT": source_df[source_code_type],
"SOURCE_CONCEPT_TYPE": source_code_type,
"CONCEPT": source_df[source_code_type],
}
)
codes = pd.concat([codes, copy_df])
logger.debug(
f"Target code type {target_code_type} has source code types {len(df)}, copying rather than translating"
f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
)
codes = pd.concat([codes, df[target_code_type]])
else:
filename = f"{col_name}_to_{target_code_type}.parquet"
# get the translation filename using source to target code types
filename = f"{source_code_type}_to_{target_code_type}.parquet"
map_path = trud.PROCESSED_PATH / filename
# do the mapping if it exists
if map_path.exists():
col = df[col_name]
# get mapping
df_map = pd.read_parquet(map_path)
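# assumed layout of the mapping parquet: one column per code type, e.g.
#   read2   | snomed
#   C10E.   | 44054006
# so the left merge joins on the shared source column name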
# merge on corresponding codes and take target column
translated = pd.merge(col, df_map, how="left")[target_code_type]
# TODO: BUG mask does not match column
codes = pd.concat([codes, translated]) # merge to output
else:
logger.warning(
f"No mapping from {col_name} to {target_code_type}, file {str(map_path.resolve())} does not exist"
# do mapping
translated_df = pd.merge(
source_df[source_code_type], df_map, how="left"
)
return codes
# normalise the output
translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
# add to list of codes
codes = pd.concat([codes, translated_df])
# Append file's codes to output Df with concept
def map_file(df, target_code_type, out, concept_name):
else:
logger.warning(
f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
)
# translate codes
codes = translate_codes(df, target_code_type)
codes = codes.dropna() # delete NaNs
# Append to output if translated
if len(codes) > 0:
codes = pd.DataFrame({"CONCEPT": codes})
codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
out = pd.concat([out, codes])
# add the concept set name to the output if any codes were translated
if len(codes.index) > 0:
codes["CONCEPT_SET"] = concept_name
else:
logger.debug(f"No codes converted with target code type {target_code_type}")
return out
return codes
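# Example usage (hypothetical code types and concept name):
#   source_df = df[["read2", "icd10"]]
#   concepts = translate_codes(source_df, target_code_type="snomed",
#                              concept_name="diabetes")
#   # concepts has columns SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT, CONCEPT_SET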
def sql_row_exist(conn, table, column, value):
@@ -662,7 +682,6 @@ def map(phen_dir, target_code_type):
def map_target_code_type(phen_path, phenotype, target_code_type):
logger.debug(f"Target coding format: {target_code_type}")
codes_path = phen_path / CODES_DIR
# Create output dataframe
@@ -680,21 +699,27 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
# process structural actions
df = process_actions(df, concept_set)
# Preprocessing & Validation Checks
logger.debug("Processing and validating code formats")
df, errors = preprocess_codes(
# preprocess and validate source concepts
logger.debug("Processing and validating source concept codes")
df, errors = preprocess_source_concepts(
df,
concept_set,
codes_file_path,
target_code_type=target_code_type,
)
logger.debug(f"Length of errors from preprocess {len(errors)}")
# create df with just the source code columns
source_column_names = list(concept_set["file"]["columns"].keys())
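# the keys of the "columns" config are code types (e.g. "read2", "icd10");
# after preprocessing, df holds one column per code type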
source_df = df[source_column_names]
logger.debug(source_df.columns)
logger.debug(source_df.head())
logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
if len(errors) > 0:
code_errors.extend(errors)
logger.debug(f" Length of code_errors {len(code_errors)}")
# Map
# Map source concepts codes to target codes
# if processing a source coding list with categorical data
if (
"actions" in concept_set["file"]
@@ -708,11 +733,21 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
for cat, grp in df_grp:
if cat == concept_set["file"]["category"]:
grp = grp.drop(columns=[divide_col]) # delete categorical column
out = map_file(
grp, target_code_type, out, concept_name=concept_set["name"]
source_df = grp[source_column_names]
trans_out = translate_codes(
source_df,
target_code_type=target_code_type,
concept_name=concept_set["name"],
)
out = pd.concat([out, trans_out])
else:
out = map_file(df, target_code_type, out, concept_name=concept_set["name"])
source_df = df[source_column_names]
trans_out = translate_codes(
source_df,
target_code_type=target_code_type,
concept_name=concept_set["name"],
)
out = pd.concat([out, trans_out])
if len(code_errors) > 0:
logger.error(f"The map processing has {len(code_errors)} errors")
@@ -728,31 +763,69 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
f"No output after map processing, check config {str(config_path.resolve())}"
)
# Final processing
# final processing
out = out.reset_index(drop=True)
out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
out_count = len(out.index)
# add metadata
# Loop over each source_concept_type and perform a left join on all columns apart from the source code columns
result_list = []
source_column_names = list(concept_set["file"]["columns"].keys())
for source_concept_type in source_column_names:
# Filter output based on the current source_concept_type
out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
filtered_count = len(out_filtered_df.index)
# Remove all source type columns except the current one, leaving the metadata and the join column
remove_types = [
type for type in source_column_names if type != source_concept_type
]
metadata_df = df.drop(columns=remove_types)
metadata_df = metadata_df.rename(
columns={source_concept_type: "SOURCE_CONCEPT"}
)
metadata_df_count = len(metadata_df.index)
# Perform a left join with metadata_df on SOURCE_CONCEPT to add the metadata
result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
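# note: a left join can duplicate rows if a SOURCE_CONCEPT appears more than
# once in metadata_df; the drop_duplicates on CONCEPT_SET/CONCEPT below
# removes such repeats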
result_count = len(result.index)
logger.debug(
f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
)
# Append the result to the result_list
result_list.append(result)
# Concatenate all the results into a single DataFrame
final_out = pd.concat(result_list, ignore_index=True)
final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
logger.debug(
f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
)
# Save output to map directory
output_filename = target_code_type + ".csv"
map_path = phen_path / MAP_DIR / output_filename
out.to_csv(map_path, index=False)
final_out.to_csv(map_path, index=False)
logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
# save concept sets as separate files
concept_set_path = phen_path / CSV_PATH / target_code_type
# empty the concept-set directory if it exists but keep the .git file
git_items = [".git", ".gitkeep"]
# empty the concept-set directory except for hidden files, e.g. .git
if concept_set_path.exists():
for item in concept_set_path.iterdir():
if item not in git_items:
if not item.name.startswith("."):
item.unlink()
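# note: unlink() assumes a flat directory of files; it raises if it
# encounters a subdirectory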
else:
concept_set_path.mkdir(parents=True, exist_ok=True)
# write each concept as a separate file
for name, concept in out.groupby("CONCEPT_SET"):
for name, concept in final_out.groupby("CONCEPT_SET"):
concept = concept.sort_values(by="CONCEPT") # sort rows
concept = concept.dropna(how="all", axis=1) # remove empty cols
concept = concept.reindex(
......