Skip to content
Snippets Groups Projects
Commit 4d8cbf8a authored by Jakub Dylag's avatar Jakub Dylag
Browse files

publish output to db

parent a4c7db06
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,7 @@ def raise_(ex): ...@@ -21,7 +21,7 @@ def raise_(ex):
def log_invalid_code(codes, mask, code_type=None, file_path=None, cause=None): def log_invalid_code(codes, mask, code_type=None, file_path=None, cause=None):
print("ERROR WITH CODES", file_path, codes[~mask]) # print("ERROR WITH CODES", file_path, codes[~mask])
errors = pd.DataFrame([]) errors = pd.DataFrame([])
errors["CODE"] = codes[~mask].astype(str) errors["CODE"] = codes[~mask].astype(str)
......
...@@ -4,6 +4,7 @@ import numpy as np ...@@ -4,6 +4,7 @@ import numpy as np
# import pathlib # import pathlib
import json import json
import os import os
import sqlite3
from base import log_invalid_code from base import log_invalid_code
from base import bcolors from base import bcolors
...@@ -109,10 +110,11 @@ def convert_codes(df, target, no_translate): ...@@ -109,10 +110,11 @@ def convert_codes(df, target, no_translate):
translated = pd.merge(col, translated = pd.merge(col,
df_map, df_map,
how='left')[target] #merge on corresponding codes and take target column how='left')[target] #merge on corresponding codes and take target column
log_invalid_code(col, #TODO: BUG mask does not match column
~translated.isna(), # log_invalid_code(col,
code_type=col_name, # ~translated.isna(),
cause=f"Translation to {target}") #log codes with no translation # code_type=col_name,
# cause=f"Translation to {target}") #log codes with no translation
codes = pd.concat([codes, translated]) #merge to output codes = pd.concat([codes, translated]) #merge to output
else: else:
print(f"No mapping from {col_name} to {target}") print(f"No mapping from {col_name} to {target}")
...@@ -140,6 +142,96 @@ def map_file(df, target_code_type, out, concepts, meta_columns=[], no_translate= ...@@ -140,6 +142,96 @@ def map_file(df, target_code_type, out, concepts, meta_columns=[], no_translate=
out = pd.concat([out, codes]) out = pd.concat([out, codes])
return out return out
def sql_row_exist(conn, table, column, value):
# Execute and check if a result exists
cur = conn.cursor()
query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;"
cur.execute(query, (value,))
exists = cur.fetchone() is not None
return exists
#Setup SQLite3 Database for OMOP
def omop_setup(db_path):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
#TODO: Check if DB populated with necessary VOCABULARY
#TODO: populate VOCABULARY with ATHENA download
#create meldb VOCABULARY
meldb_version='v3.2.10'
meldb_description = 'Multidisciplinary Ecosystem to study Lifecourse Determinants and Prevention of Early-onset Burdensome Multimorbidity'
meldb_reference = 'https://www.it-innovation.soton.ac.uk/projects/meldb'
df_test = pd.DataFrame([{
"vocabulary_id": 'MELDB',
"vocabulary_name": meldb_description,
"vocabulary_reference": meldb_reference,
"vocabulary_version": meldb_version,
# "vocabulary_concept_id": 0,
}])
df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
cur.execute("""
CREATE TABLE CONCEPT_SET (
concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
atlas_id INTEGER, -- Unique identifier generated by ATLAS
concept_set_name TEXT, -- Optional name for the concept set
concept_set_description TEXT, -- Optional description for the concept set
vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
);""")
cur.execute("""
CREATE TABLE CONCEPT_SET_ITEM (
concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
);""")
conn.close()
def omop_publish_concept_sets(out, db_path, vocab_output, vocab_type):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
for concept_set_name, grp in out.groupby("MELDB_concept"):
#Create Concept_Set
if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', 'MELDB');")
else:
print("concept_set", concept_set_name, "already exists")
#TODO: ask to remove old concept_set?
#Get Concept_set_Id
query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
cur.execute(query, (concept_set_name, vocab_output, ))
concept_set_id = cur.fetchone()[0]
#Get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
concept_codes = "'"+"', '".join(list(grp["code"].astype(str)))+"'"
query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
cur.execute(query, (vocab_type, ))
df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
if not len(grp) == len(df_out):
print("ERROR: Some", vocab_type, "Codes do not exist in OMOP Database")
#Create Concept_set_item
df_out["concept_set_id"] = concept_set_id
df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
conn.close()
# def omop_reset(db_path):
# cur.execute("DROP TABLE CONCEPT_SET;")
# cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
def run_all(mapping_file, target_code_type, def run_all(mapping_file, target_code_type,
no_translate=False, no_verify=False, no_translate=False, no_verify=False,
log_errors_path="MELD_errors.csv", log_errors_path="MELD_errors.csv",
...@@ -258,16 +350,35 @@ def run_all(mapping_file, target_code_type, ...@@ -258,16 +350,35 @@ def run_all(mapping_file, target_code_type,
summary_df = summary_df.drop_duplicates() #remove duplicates summary_df = summary_df.drop_duplicates() #remove duplicates
out = out.merge(summary_df, how="left", on='MELDB_concept') out = out.merge(summary_df, how="left", on='MELDB_concept')
# export as CSV to /output # Save Output File
print(bcolors.HEADER, "---"*5, "OUTPUT", "---"*5, bcolors.ENDC) print(bcolors.HEADER, "---"*5, "OUTPUT", "---"*5, bcolors.ENDC)
print(out) print(out)
if output_path == "atlas":
#Export to DB
db_path = "codes/omop_54.sqlite"
vocab_output = "MELDB"
vocab_types = {
"read2_code": "Read",
"read3_code": None,
"icd10_code": "ICD10CM",
"snomed_code": "SNOMED",
"opcs4_code": "OPCS4",
"atc_code": "ATC",
"med_code": None,
"cprd_code": None,
}
omop_publish_concept_sets(out, db_path, vocab_output, vocab_types[target_code_type])
else:
# export as CSV to /output
out.to_csv(output_path, index=False) out.to_csv(output_path, index=False)
print("saved to", output_path) print("saved to", output_path)
#Remove Duplicates from Error file # Save Error File
if os.path.exists(log_errors_path): if os.path.exists(log_errors_path):
error_df = pd.read_csv(log_errors_path) error_df = pd.read_csv(log_errors_path)
error_df = error_df.drop_duplicates() error_df = error_df.drop_duplicates() #Remove Duplicates from Error file
error_df = error_df.sort_values(by=["SOURCE", "CODE_TYPE", "CODE"]) error_df = error_df.sort_values(by=["SOURCE", "CODE_TYPE", "CODE"])
error_df.to_csv(log_errors_path, index=False) error_df.to_csv(log_errors_path, index=False)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment