Commit 5b3b1d29 authored by mjbonifa

fix: started to add the metadata back in; the translation function is tricky as it's not clear currently how the joining of data frames and indexes actually works, because it's not explicit. #28
parent e08cfa07
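
The joining behaviour the message calls "not explicit" is pandas index alignment. Below is a minimal sketch with made-up data, not from the repo, of the pitfall in question: DataFrame.join aligns on index labels, so a series produced by dropna() re-attaches to the original rows by label rather than by position.

import pandas as pd

df = pd.DataFrame({"icd10": ["I10", None, "E11"], "desc": ["htn", "n/a", "t2dm"]})

# dropna() keeps the original labels 0 and 2 ...
codes = df["icd10"].dropna().str.strip()

# ... so join() lines the cleaned values up with the right rows by label,
# even though the series is now shorter than the frame
joined = pd.DataFrame({"code": codes}).join(df)
print(joined)
#   code icd10  desc
# 0  I10   I10   htn
# 2  E11   E11  t2dm

If ignore_index=True is later passed to pd.concat, those labels are discarded, which is one reason the alignment is easy to lose track of.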
@@ -46,6 +46,8 @@ DIVIDE_COL_ACTION = "divide_col"
 COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
 CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
+SOURCE_COL_SUFFIX = "_acmc_source"
+TARGET_COL_SUFFIX = "_acmc_target"
 
 # config.yaml schema
 CONFIG_SCHEMA = {
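
Note that the rename further down in this commit hardcodes a "_SOURCE" literal rather than these new constants. A hypothetical helper, not in the commit, showing how the constant could be used instead:

SOURCE_COL_SUFFIX = "_acmc_source"

def rename_source_col(df, source_col_name):
    # tag the raw source column with the shared suffix constant
    return df.rename(columns={source_col_name: f"{source_col_name}{SOURCE_COL_SUFFIX}"})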
@@ -470,31 +472,24 @@ def process_actions(df, concept_set):
     return df
 
 # Perform QA Checks on columns individually and append to df
-def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
+def preprocess_source_concepts(df, concept_set, code_file_path):
     """Parses each column individually - Order and length will not be preserved!"""
     out = pd.DataFrame([])  # create output df to append to
     code_errors = []  # list of errors from processing
 
-    # TODO: Is there a better way of processing this action as it's distributed across
-    # different parts of the programme.
-    if (
-        "actions" in concept_set["file"]
-        and "divide_col" in concept_set["file"]["actions"]
-    ):
-        divide_col_df = df[concept_set["file"]["actions"]["divide_col"]]
-    else:
-        divide_col_df = pd.DataFrame()
+    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
+    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 
     # Preprocess codes
     code_types = parse.CodeTypeParser().code_types
     for code_type in concept_set["file"]["columns"]:
         parser = code_types[code_type]
-        logger.info(f"Processing {code_type} codes...")
+        logger.info(f"Processing {code_type} codes for {code_file_path}")
 
-        # get code types
-        codes = df[concept_set["file"]["columns"][code_type]].dropna()
+        # get codes by column name
+        source_col_name = concept_set["file"]["columns"][code_type]
+        codes = df[source_col_name].dropna()
         codes = codes.astype(str)  # convert to string
         codes = codes.str.strip()  # remove excess spaces
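
Putting the new preprocessing path together: each configured column is cleaned, the raw column is renamed, and the cleaned series is joined back by index. A self-contained sketch of that pattern with assumed toy data and column names:

import pandas as pd

df = pd.DataFrame({"read2": [" C10E. ", None], "desc": ["t1dm", "n/a"]})
out = pd.DataFrame([])

# clean the configured source column
codes = df["read2"].dropna().astype(str).str.strip()

# keep the raw values under a renamed column, as the new code does
df = df.rename(columns={"read2": "read2_SOURCE"})

# join the cleaned codes back by index, then append to the output frame
processed = pd.DataFrame({"read2": codes}).join(df)
out = pd.concat([out, processed], ignore_index=True)
print(out)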
@@ -504,62 +499,79 @@ def preprocess_codes(df, concept_set, code_file_path, target_code_type=None):
         code_errors.extend(errors)
         logger.warning(f"Codes validation failed with {len(errors)} errors")
 
-        # append to output dataframe
+        # add processed codes to df
+        new_col_name = f"{source_col_name}_SOURCE"
+        df = df.rename(columns={source_col_name: new_col_name})
+        process_codes = pd.DataFrame({code_type: codes}).join(df)
         out = pd.concat(
-            [out, pd.DataFrame({code_type: codes}).join(divide_col_df)],
+            [out, process_codes],
            ignore_index=True,
         )
 
+    logger.debug(out.head())
     return out, code_errors
 
+def get_code_type_from_col_name(col_name):
+    return col_name.split("_")[0]
 
 # Translate Df with multiple codes into single code type Series
-def translate_codes(df, target_code_type):
+def translate_codes(df, source_code_types, target_code_type, concept_name):
     codes = pd.Series([], dtype=str)
 
     # Convert codes to target type
     logger.info(f"Converting to target code type {target_code_type}")
-    for col_name in df.columns:
+    for source_code_type, source_code_column in source_code_types.items():
         # if the target code type is the same as the source code type, no translation, just append source as target
-        if col_name == target_code_type:
+        if source_code_type == target_code_type:
+            codes = pd.concat([codes, df[source_code_type]])
             logger.debug(
-                f"Target code type {target_code_type} has source code types {len(df)}, copying rather than translating"
+                f"Target code type {target_code_type} is the same as source code type {len(df)}, copying codes rather than translating"
             )
-            codes = pd.concat([codes, df[target_code_type]])
         else:
-            filename = f"{col_name}_to_{target_code_type}.parquet"
+            # get the translation filename using source and target code types
+            filename = f"{source_code_type}_to_{target_code_type}.parquet"
             map_path = trud.PROCESSED_PATH / filename
 
+            # do the mapping if a translation file exists
             if map_path.exists():
-                col = df[col_name]
+                # get column from processed df and rename the series to what's needed for the parquet merge
+                col = df[source_code_type]
                 df_map = pd.read_parquet(map_path)
 
                 # merge on corresponding codes and take target column
-                translated = pd.merge(col, df_map, how="left")[target_code_type]
+                translated_df = pd.merge(col, df_map, how="left")[target_code_type]
+                logger.debug("TRANSLATE")
+                logger.debug(translated_df.head())
 
                 # TODO: BUG mask does not match column
-                codes = pd.concat([codes, translated])  # merge to output
+                codes = pd.concat([codes, translated_df])
+                logger.debug("CODES")
+                logger.debug(codes.head())
             else:
                 logger.warning(
-                    f"No mapping from {col_name} to {target_code_type}, file {str(map_path.resolve())} does not exist"
+                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                 )
 
-    return codes
+    logger.debug("FULL CONCATED")
+    logger.debug(codes.head())
 
-# Append file's codes to output Df with concept
-def map_file(df, target_code_type, out, concept_name):
-    # translate codes
-    codes = translate_codes(df, target_code_type)
     codes = codes.dropna()  # delete NaNs
-    logger.debug(f"FULL CONCATED {len(codes.index)}")
 
     # Append to output if translated
-    if len(codes) > 0:
+    if len(codes.index) > 0:
         codes = pd.DataFrame({"CONCEPT": codes})
         codes["CONCEPT_SET"] = np.repeat(concept_name.strip(), len(codes))
-        out = pd.concat([out, codes])
     else:
         logger.debug(f"No codes converted with target code type {target_code_type}")
 
-    return out
+    return codes
 
 def sql_row_exist(conn, table, column, value):
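
The translation step itself is a plain left merge against a prebuilt mapping table loaded from parquet (trud.PROCESSED_PATH in the real code). A standalone sketch with an invented two-row mapping:

import pandas as pd

# invented mapping table; the real one is read from a
# <source>_to_<target>.parquet file
df_map = pd.DataFrame(
    {"read2": ["C10E.", "H33.."], "snomed": ["44054006", "195967001"]}
)

# a named Series merges on its name, here the shared "read2" column
col = pd.Series(["C10E.", "H33..", "XXXX."], name="read2")

translated = pd.merge(col, df_map, how="left")["snomed"]
print(translated.tolist())  # ['44054006', '195967001', nan]

Unmatched codes come back as NaN, which is why the caller drops NaNs before building the concept set.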
@@ -662,7 +674,6 @@ def map(phen_dir, target_code_type):
 def map_target_code_type(phen_path, phenotype, target_code_type):
     logger.debug(f"Target coding format: {target_code_type}")
     codes_path = phen_path / CODES_DIR
 
     # Create output dataframe
@@ -680,21 +691,20 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
         # process structural actions
         df = process_actions(df, concept_set)
 
-        # Preprocessing & Validation Checks
-        logger.debug("Processing and validating code formats")
-        df, errors = preprocess_codes(
+        # preprocess and validate source concepts
+        logger.debug("Processing and validating source concept codes")
+        df, errors = preprocess_source_concepts(
             df,
             concept_set,
             codes_file_path,
-            target_code_type=target_code_type,
         )
-        logger.debug(f"Length of errors from preprocess {len(errors)}")
+        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
 
         if len(errors) > 0:
             code_errors.extend(errors)
         logger.debug(f"Length of code_errors {len(code_errors)}")
 
-        # Map
+        # Map source concept codes to target codes
         # if processing a source coding list with categorical data
         if (
             "actions" in concept_set["file"]
@@ -708,11 +718,26 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
             for cat, grp in df_grp:
                 if cat == concept_set["file"]["category"]:
                     grp = grp.drop(columns=[divide_col])  # delete categorical column
-                    out = map_file(
-                        grp, target_code_type, out, concept_name=concept_set["name"]
+                    trans_out = translate_codes(
+                        grp,
+                        source_code_types=concept_set["file"]["columns"],
+                        target_code_type=target_code_type,
+                        concept_name=concept_set["name"],
                     )
+                    out = pd.concat([out, trans_out])
         else:
-            out = map_file(df, target_code_type, out, concept_name=concept_set["name"])
+            trans_out = translate_codes(
+                df,
+                source_code_types=concept_set["file"]["columns"],
+                target_code_type=target_code_type,
+                concept_name=concept_set["name"],
+            )
+            out = pd.concat([out, trans_out])
 
+    logger.debug("TEST")
+    logger.debug(df.columns)
+    logger.debug(df.head)
+    logger.debug(out.columns)
+    logger.debug(out.head)
 
     if len(code_errors) > 0:
         logger.error(f"The map processing has {len(code_errors)} errors")
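
For categorical source lists the flow above groups on the divide column, keeps only the configured category, and translates that group. A toy sketch with invented column and category values:

import pandas as pd

df = pd.DataFrame(
    {"read2": ["C10E.", "H33..", "C10F."], "cat_col": ["1", "2", "1"]}
)

for cat, grp in df.groupby("cat_col"):
    if cat == "1":  # concept_set["file"]["category"] in the real code
        grp = grp.drop(columns=["cat_col"])
        print(grp)  # this group is what gets passed to translate_codes(...)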
@@ -742,11 +767,10 @@ def map_target_code_type(phen_path, phenotype, target_code_type):
     # save concept sets as separate files
     concept_set_path = phen_path / CSV_PATH / target_code_type
 
-    # empty the concept-set directory if it exists but keep the .git file
-    git_items = [".git", ".gitkeep"]
+    # empty the concept-set directory except for hidden files, e.g. .git
     if concept_set_path.exists():
         for item in concept_set_path.iterdir():
-            if item not in git_items:
+            if not item.name.startswith("."):
                 item.unlink()
     else:
         concept_set_path.mkdir(parents=True, exist_ok=True)
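
This change also fixes a latent bug: the old check compared Path objects against the strings in git_items, which never matched, so .git and .gitkeep would have been deleted as well. A sketch of the corrected behaviour as a standalone helper (name assumed):

from pathlib import Path

def empty_concept_set_dir(path: Path) -> None:
    """Delete visible files but keep hidden entries such as .gitkeep."""
    if path.exists():
        for item in path.iterdir():
            if not item.name.startswith("."):
                item.unlink()  # note: raises if the entry is a directory
    else:
        path.mkdir(parents=True, exist_ok=True)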