Commit 42564803 authored by mjbonifa

fixed hard-coded path for codes mapping files in map.py

parent cb24de86
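In effect, the commit swaps the hard-coded "maps/processed/..." string for a path built from the shared trud.MAPS_PROCESSED_DIR constant. A minimal sketch of the pattern, using a stand-in constant because the trud module is not shown here:

    from pathlib import Path

    # Stand-in for trud.MAPS_PROCESSED_DIR (assumed to be a pathlib.Path in the real module)
    MAPS_PROCESSED_DIR = Path("maps") / "processed"

    def mapping_file_path(col_name, target):
        # Build the per-code-type mapping file path from the shared constant
        # instead of a hard-coded "maps/processed/..." string.
        return MAPS_PROCESSED_DIR / f"{col_name}_to_{target}.parquet"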
@@ -4,6 +4,8 @@ import numpy as np
import json
import os
import sqlite3
import sys
import trud
from pathlib import Path
from base import log_invalid_code
@@ -27,6 +29,11 @@ pd.set_option("mode.chained_assignment", None)
OUTPUT_PATH = Path('build') / 'phenotype_mapping.csv'
ERROR_PATH = Path('build') / 'errors.csv'
SPLIT_COL_ACTION = "split_col"
CODES_COL_ACTION = "codes_col"
DIVIDE_COL_ACTION = "divide_col"
COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
def read_table_file(path, excel_sheet=None):
"""
Load Code List File
@@ -105,7 +112,6 @@ def preprocess(
return out
# Translate Df with multiple codes into single code type Series
def convert_codes(df, target, translate):
codes = pd.Series([], dtype=str)
@@ -119,14 +125,14 @@ def convert_codes(df, target, translate):
if translate:
# Convert codes to target type
print(f"target type {target}")
for col_name in df.columns[df.columns != target]:
path_map = f"maps/processed/{col_name}_to_{target}.parquet"
if os.path.exists(path_map):
filename = f"{col_name}_to_{target}.parquet"
map_path = trud.MAPS_PROCESSED_DIR / filename
if map_path.exists():
col = df[col_name]
df_map = pd.read_parquet(path_map)
translated = pd.merge(col, df_map, how="left")[
target
] # merge on corresponding codes and take target column
df_map = pd.read_parquet(map_path)
translated = pd.merge(col, df_map, how="left")[target] # merge on corresponding codes and take target column
# TODO: BUG mask does not match column
# log_invalid_code(col,
# ~translated.isna(),
@@ -134,13 +140,12 @@
# cause=f"Translation to {target}") #log codes with no translation
codes = pd.concat([codes, translated]) # merge to output
else:
print(f"No mapping from {col_name} to {target}")
print(f"No mapping from {col_name} to {target}, file {str(map_path.resolve())} does not exist")
else:
print("NOT TRANSLATING")
print(f"NOT TRANSLATING {col_name}")
return codes
# Append file's codes to output Df with meldb concept
def map_file(df, target_code_type, out, concepts, meta_columns=[], translate=True):
# separate out meta_columns
@@ -158,6 +163,45 @@ def map_file(df, target_code_type, out, concepts, meta_columns=[], translate=True):
out = pd.concat([out, codes])
return out
def validate_config(codes_path, mapping):
concept_sets = mapping["concept_sets"]
concept_codes = mapping["codes"]
validation_errors = []
concept_set_names = []
for item in concept_sets['concept_set']:
concept_set_names.append(item['concept_set_name'])
for item in concept_codes:
# check concept codes path is a directory
concept_code_dir_path = codes_path / item['folder']
if not concept_code_dir_path.is_dir():
validation_errors.append(f"Folder directory {str(concept_code_dir_path.resolve())} is not a directory")
for file in item["files"]:
# check concept code file exists
concept_code_file_path = concept_code_dir_path / file['file']
if not concept_code_file_path.exists():
validation_errors.append(f"Coding file {str(concept_code_file_path.resolve())} does not exist")
# check columns specified are a supported medical coding type
for column in file['columns']:
if column not in code_types and column != 'metadata':
validation_errors.append(f"Column type {column} for file {concept_code_file_path} is not supported")
# check concept_set defined for the mapping
for concept_set_mapping in file['concept_set']:
if concept_set_mapping not in concept_set_names:
validation_errors.append(f"Concept set name {concept_set_mapping} for file {concept_code_file_path} does not exist in concept set list")
# check the actions are supported
if 'actions' in file:
for action in file['actions']:
if action not in COL_ACTIONS:
validation_errors.append(f"Action {action} is not supported")
return validation_errors
def sql_row_exist(conn, table, column, value):
# Execute and check if a result exists
@@ -180,11 +224,15 @@ def process(config_file, source_codes_dir, target_code_type, translate=True, ver
# Load configuration File
if config_path.suffix == ".json":
mapping = json.load(open(config_path, "rb"))
folders = mapping["codes"]
summary_config = mapping["concept_sets"]
validation_errors = validate_config(codes_path, mapping)
if len(validation_errors) > 0:
print(validation_errors)
raise Exception(f"Configuration file {str(config_path.resolve())} failed validation")
else:
raise Exception(f"Unsupported filetype for configuration file: {config_file}")
raise Exception(f"Unsupported configuration filetype: {str(config_path.resolve())}")
summary_config = mapping["concept_sets"]
folders = mapping["codes"]
out = pd.DataFrame([]) # Create Output File to append to
# Iterate JSON mapping file (OBJECT FORMAT)
@@ -237,14 +285,22 @@ def process(config_file, source_codes_dir, target_code_type, translate=True, ver
pass
# out = df
elif ("concept_set" in file) and isinstance(df, pd.core.frame.DataFrame):
out = map_file(df, target_code_type, out, concepts=file["concept_set"], meta_columns=meta_columns, translate=translate,)
out = map_file(df,
target_code_type, out,
concepts=file["concept_set"],
meta_columns=meta_columns,
translate=translate,)
elif ("concept_set_categories" in file) and isinstance(df, pd.core.groupby.generic.DataFrameGroupBy):
meta_columns.remove(divide_col) # delete categorical column
for cat, grp in df:
if (cat in file["concept_set_categories"].keys()): # check if category is mapped
grp = grp.drop(columns=[divide_col]) # delete categorical column
print("Category:", cat)
out = map_file(grp, target_code_type, out, concepts=file["concept_set_categories"][cat], meta_columns=meta_columns,)
out = map_file(grp,
target_code_type,
out,
concepts=file["concept_set_categories"][cat],
meta_columns=meta_columns,)
else:
print("Folder is empty")
@@ -261,9 +317,7 @@ def process(config_file, source_codes_dir, target_code_type, translate=True, ver
# Add Concept Set Definitions metadata
summary_df = pd.DataFrame(summary_config["concept_set"]) # transform to dataframe
if "metadata" in summary_df.columns:
summary_df = summary_df.join(
pd.json_normalize(summary_df["metadata"])
) # metadata to columns
summary_df = summary_df.join(pd.json_normalize(summary_df["metadata"])) # metadata to columns
summary_df = summary_df.drop(columns=["metadata"])
summary_df = summary_df.rename(columns={"concept_set_name": "CONCEPT_SET"})
summary_df = summary_df.drop_duplicates() # remove duplicates
@@ -283,13 +337,11 @@ def process(config_file, source_codes_dir, target_code_type, translate=True, ver
omop_setup(OMOP_DB_PATH, vocab_id, vocab_version, vocab_name, vocab_reference)
# Export to DB
omop_publish_concept_sets(
out,
omop_publish_concept_sets(out,
OMOP_DB_PATH,
vocab_id,
omop_vocab_types[target_code_type],
vocab_version,
)
vocab_version,)
else:
# export as CSV to /output
out.to_csv(output_path, index=False)