From 4d8cbf8abf3744596b4ac3d40cead76e76a886d2 Mon Sep 17 00:00:00 2001
From: Jakub Dylag <jjd1c23@soton.ac.uk>
Date: Wed, 8 Jan 2025 15:51:58 +0000
Subject: [PATCH] publish output to db

---
 base.py |   2 +-
 main.py | 131 +++++++++++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 122 insertions(+), 11 deletions(-)

diff --git a/base.py b/base.py
index 9765727..58da8cd 100644
--- a/base.py
+++ b/base.py
@@ -21,7 +21,7 @@ def raise_(ex):
 
 
 def log_invalid_code(codes, mask, code_type=None, file_path=None, cause=None):
-    print("ERROR WITH CODES", file_path, codes[~mask])
+    # print("ERROR WITH CODES", file_path, codes[~mask])
 
     errors = pd.DataFrame([])
     errors["CODE"] = codes[~mask].astype(str)
diff --git a/main.py b/main.py
index b089cf6..e5439de 100644
--- a/main.py
+++ b/main.py
@@ -4,6 +4,7 @@ import numpy as np
 # import pathlib
 import json
 import os
+import sqlite3
 
 from base import log_invalid_code
 from base import bcolors
@@ -109,10 +110,11 @@ def convert_codes(df, target, no_translate):
             translated = pd.merge(col, df_map, how='left')[target] #merge on corresponding codes and take target column
 
-            log_invalid_code(col,
-                             ~translated.isna(),
-                             code_type=col_name,
-                             cause=f"Translation to {target}") #log codes with no translation
+            #TODO: BUG mask does not match column
+            # log_invalid_code(col,
+            #                  ~translated.isna(),
+            #                  code_type=col_name,
+            #                  cause=f"Translation to {target}") #log codes with no translation
 
             codes = pd.concat([codes, translated]) #merge to output
         else:
             print(f"No mapping from {col_name} to {target}")
@@ -140,6 +142,96 @@ def map_file(df, target_code_type, out, concepts, meta_columns=[], no_translate=
         out = pd.concat([out, codes])
     return out
 
+def sql_row_exist(conn, table, column, value):
+    # Execute and check if a result exists
+    cur = conn.cursor()
+    query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;"
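+    # NOTE: SQLite cannot bind table/column names as parameters, so they are
+    # interpolated here; pass trusted, internal identifiers only (the value
+    # itself is bound safely via the "?" placeholder).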
+    cur.execute(query, (value,))
+    exists = cur.fetchone() is not None
+
+    return exists
+
+
+#Setup SQLite3 Database for OMOP
+def omop_setup(db_path):
+    conn = sqlite3.connect(db_path)
+    cur = conn.cursor()
+
+    #TODO: Check if DB is already populated with the necessary VOCABULARY
+
+    #TODO: populate VOCABULARY with ATHENA download
+
+    #create meldb VOCABULARY
+    meldb_version = 'v3.2.10'
+    meldb_description = 'Multidisciplinary Ecosystem to study Lifecourse Determinants and Prevention of Early-onset Burdensome Multimorbidity'
+    meldb_reference = 'https://www.it-innovation.soton.ac.uk/projects/meldb'
+    df_test = pd.DataFrame([{
+        "vocabulary_id": 'MELDB',
+        "vocabulary_name": meldb_description,
+        "vocabulary_reference": meldb_reference,
+        "vocabulary_version": meldb_version,
+        # "vocabulary_concept_id": 0,
+    }])
+    df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
+
+    cur.execute("""
+    CREATE TABLE CONCEPT_SET (
+        concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
+        atlas_id INTEGER,                                 -- Unique identifier generated by ATLAS
+        concept_set_name TEXT,                            -- Optional name for the concept set
+        concept_set_description TEXT,                     -- Optional description for the concept set
+        vocabulary_id TEXT NOT NULL,                      -- Foreign key to VOCABULARY table
+        FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
+    );""")
+
+    cur.execute("""
+    CREATE TABLE CONCEPT_SET_ITEM (
+        concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
+        concept_set_id INTEGER NOT NULL,                       -- Foreign key to CONCEPT_SET table
+        concept_id INTEGER NOT NULL,                           -- Foreign key to CONCEPT table
+        FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
+        FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
+    );""")
+
+    conn.close()
+
+
+def omop_publish_concept_sets(out, db_path, vocab_output, vocab_type):
+    conn = sqlite3.connect(db_path)
+    cur = conn.cursor()
+
+    for concept_set_name, grp in out.groupby("MELDB_concept"):
+        #Create Concept_Set if it does not exist yet
+        if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
+            cur.execute("INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
+                        (concept_set_name, vocab_output))
+        else:
+            print("concept_set", concept_set_name, "already exists")
+            #TODO: ask to remove old concept_set?
+
+        #Get Concept_set_id
+        query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
+        cur.execute(query, (concept_set_name, vocab_output))
+        concept_set_id = cur.fetchone()[0]
+
+        #Get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
+        concept_codes = "'" + "', '".join(list(grp["code"].astype(str))) + "'"
+        query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
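+        # NOTE: the codes are inlined into the IN (...) clause, so a code
+        # containing a quote would break the statement; building one "?"
+        # placeholder per code and binding them would be a safer alternative.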
+        cur.execute(query, (vocab_type,))
+        df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
+
+        if len(grp) != len(df_out):
+            print("ERROR: Some", vocab_type, "codes do not exist in the OMOP database")
+
+        #Create Concept_set_item rows linking the set to each concept
+        df_out["concept_set_id"] = concept_set_id
+        df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
+
+    conn.commit() #persist the plain INSERTs; to_sql commits only its own writes
+    conn.close()
+
+
+# def omop_reset(db_path):
+#     cur.execute("DROP TABLE CONCEPT_SET;")
+#     cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
+
 def run_all(mapping_file, target_code_type,
             no_translate=False, no_verify=False,
             log_errors_path="MELD_errors.csv",
@@ -257,17 +349,36 @@
         summary_df = summary_df.rename(columns={summary_config["columns"]["concept_name"]: "MELDB_concept"})
         summary_df = summary_df.drop_duplicates() #remove duplicates
         out = out.merge(summary_df, how="left", on='MELDB_concept')
-
-    # export as CSV to /output
+
+    # Save Output File
     print(bcolors.HEADER, "---"*5, "OUTPUT", "---"*5, bcolors.ENDC)
     print(out)
-    out.to_csv(output_path, index=False)
-    print("saved to", output_path)
+    if output_path == "atlas":
+        #Export to DB
+        db_path = "codes/omop_54.sqlite"
+        vocab_output = "MELDB"
+
+        vocab_types = {
+            "read2_code": "Read",
+            "read3_code": None,
+            "icd10_code": "ICD10CM",
+            "snomed_code": "SNOMED",
+            "opcs4_code": "OPCS4",
+            "atc_code": "ATC",
+            "med_code": None,
+            "cprd_code": None,
+        }
+
+        omop_publish_concept_sets(out, db_path, vocab_output, vocab_types[target_code_type])
+    else:
+        # export as CSV to /output
+        out.to_csv(output_path, index=False)
+        print("saved to", output_path)
 
-    #Remove Duplicates from Error file
+    # Save Error File
     if os.path.exists(log_errors_path):
         error_df = pd.read_csv(log_errors_path)
-        error_df = error_df.drop_duplicates()
+        error_df = error_df.drop_duplicates() #Remove Duplicates from Error file
         error_df = error_df.sort_values(by=["SOURCE", "CODE_TYPE", "CODE"])
         error_df.to_csv(log_errors_path, index=False)
-- 
GitLab
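
A minimal usage sketch of the database path this patch adds (assumptions: the mapping-file name below is hypothetical, output_path is the keyword run_all uses for its destination, and codes/omop_54.sqlite already contains the OMOP CONCEPT table, e.g. from an ATHENA download):

    from main import omop_setup, run_all

    # One-off: register the MELDB vocabulary and create the concept-set tables.
    omop_setup("codes/omop_54.sqlite")

    # "atlas" routes the concept sets into the SQLite DB instead of a CSV file.
    run_all("concept_mapping.json", "snomed_code", output_path="atlas")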