diff --git a/README.md b/README.md
index f9efe2b86eb95acf255f5a30631c663ca2264b26..090dd7bc7f1c30e3f6eba01b4bacdb78baeeaaad 100644
--- a/README.md
+++ b/README.md
@@ -69,6 +69,21 @@ MELD-B refers to various diagnostic code formats included in target datasets.
 - The convertion Tables will be saved as `.parquet` tables in the folder `maps/processed/`.
 - NHS TRUD defines one-way mappings and does <b>NOT ADVISE</b> reversing the mappings. If you still wish to reverse these into two-way mappings, duplicate the given `.parquet` table and reverse the filename (e.g. `read2_code_to_snomed_code.parquet` to `snomed_code_to_read2_code.parquet`)
+4. Populate the SQLite3 database with OMOP vocabularies. These can be downloaded from https://athena.ohdsi.org/vocabulary/list.
+   - Install the following vocabularies by ticking the box next to each:
+     - 1-SNOMED
+     - 2-ICD9CM
+     - 17-Readv2
+     - 21-ATC
+     - 55-OPCS4
+     - 57-HES Specialty
+     - 70-ICD10CM
+     - 75-dm+d
+     - 144-UK Biobank
+     - 154-NHS Ethnic Category
+     - 155-NHS Place of Service
+   - Use the command `python omop_api.py --install <INSERT PATH>` to load the vocabularies into the database (insert your own path to the unzipped download folder).
+
 ### JSON phenotype mapping
 
 Mappings from Imported Code Lists to Outputted MELD-B Concept's Code list are defined in JSON format within `PHEN_assign_v3.json`.
diff --git a/main.py b/main.py
index 41a835552d4b3a316df906cfb8a5b4100caaa33c..f1c082a512f3f2f6c7831ecc5c9f24ddb12015e3 100644
--- a/main.py
+++ b/main.py
@@ -18,7 +18,9 @@ from parse import Atc_code
 from parse import Med_code
 from parse import code_types
 from parse import omop_vocab_types
-
+from omop_api import db_path
+from omop_api import omop_publish_concept_sets
+from omop_api import omop_setup
 
 pd.set_option('mode.chained_assignment', None)
 
@@ -142,7 +144,7 @@ def map_file(df, target_code_type, out, concepts, meta_columns=[], no_translate=
             codes["CONCEPT_SET"] = np.repeat(concept.strip(), len(codes))
             out = pd.concat([out, codes])
     return out
- 
+
 def sql_row_exist(conn, table, column, value):
     # Execute and check if a result exists
     cur = conn.cursor()
@@ -152,87 +154,6 @@ def sql_row_exist(conn, table, column, value):
     return exists
 
-
-#Setup SQLite3 Database for OMOP
-def omop_setup(db_path):
-    conn = sqlite3.connect(db_path)
-    cur = conn.cursor()
-
-    #TODO: Check if DB populated with necessary VOCABULARY
-
-    #TODO: populate VOCABULARY with ATHENA download
-
-    #create meldb VOCABULARY
-    meldb_version='v3.2.10'
-    meldb_description = 'Multidisciplinary Ecosystem to study Lifecourse Determinants and Prevention of Early-onset Burdensome Multimorbidity'
-    meldb_reference = 'https://www.it-innovation.soton.ac.uk/projects/meldb'
-    df_test = pd.DataFrame([{
-        "vocabulary_id": 'MELDB',
-        "vocabulary_name": meldb_description,
-        "vocabulary_reference": meldb_reference,
-        "vocabulary_version": meldb_version,
-        # "vocabulary_concept_id": 0,
-    }])
-    df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
-
-    cur.execute("""
-    CREATE TABLE CONCEPT_SET (
-        concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
-        atlas_id INTEGER, -- Unique identifier generated by ATLAS
-        concept_set_name TEXT, -- Optional name for the concept set
-        concept_set_description TEXT, -- Optional description for the concept set
-        vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
-        FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
-    );""")
-
-    cur.execute("""
-    CREATE TABLE CONCEPT_SET_ITEM (
-        concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
-        concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
-        concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
-        FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
-        FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
-    );""")
-
-    conn.close()
-
-def omop_publish_concept_sets(out, db_path, vocab_output, vocab_type):
-    conn = sqlite3.connect(db_path)
-    cur = conn.cursor()
-
-    for concept_set_name, grp in out.groupby("CONCEPT_SET"):
-        #Create Concept_Set
-        if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
-            cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', 'MELDB');")
-        else:
-            print("concept_set", concept_set_name, "already exists")
-            #TODO: ask to remove old concept_set?
-
-        #Get Concept_set_Id
-        query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
-        cur.execute(query, (concept_set_name, vocab_output, ))
-        concept_set_id = cur.fetchone()[0]
-
-        #Get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
-        concept_codes = "'"+"', '".join(list(grp["CONCEPT"].astype(str)))+"'"
-        query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
-        cur.execute(query, (vocab_type, ))
-        df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
-
-        if not len(grp) == len(df_out):
-            print("ERROR: Some", vocab_type, "Codes do not exist in OMOP Database")
-
-        #Create Concept_set_item
-        df_out["concept_set_id"] = concept_set_id
-        df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
-
-    conn.close()
-
-# def omop_reset(db_path):
-#     cur.execute("DROP TABLE CONCEPT_SET;")
-#     cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
-
 def run_all(mapping_file, target_code_type,
             no_translate=False, no_verify=False,
             log_errors_path="MELD_errors.csv",
@@ -242,6 +163,7 @@ def run_all(mapping_file, target_code_type,
     if mapping_file.endswith(".json"):
         mapping = json.load(open(mapping_file,'rb'))
         folders = mapping["codes"]
+        summary_config = mapping["concept_sets"]
     else:
         raise Exception("Unsupported filetype provided for source file")
 
@@ -334,7 +256,6 @@ def run_all(mapping_file, target_code_type,
     out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 
     #Merge with Concept Types in Summary Excel File
-    summary_config = mapping["concepts"]
     if "excel_sheet" in summary_config:
         summary_df = read_table_file(summary_config["file"], excel_sheet=summary_config["excel_sheet"])
     else:
@@ -346,8 +267,9 @@ def run_all(mapping_file, target_code_type,
     else:
         summary_cols_all += v
 
+    output_version = summary_config["version"]
     summary_df = summary_df[summary_cols_all] #select all relevant columns
-    summary_df = summary_df.rename(columns={summary_config["columns"]["concept_name"]: "CONCEPT_SET"})
+    summary_df = summary_df.rename(columns={summary_config["columns"]["concept_set_name"]: "CONCEPT_SET"})
     summary_df = summary_df.drop_duplicates() #remove duplicates
     out = out.merge(summary_df, how="left", on='CONCEPT_SET')
 
@@ -355,11 +277,17 @@ def run_all(mapping_file, target_code_type,
     print(bcolors.HEADER, "---"*5, "OUTPUT", "---"*5, bcolors.ENDC)
     print(out)
     if output_path == "atlas":
-        #Export to DB
-        db_path = "codes/omop_54.sqlite"
-        vocab_output = "MELDB" #TODO: parameterise output name
+
+        vocab_id = summary_config["omop"]["vocabulary_id"]
+        vocab_version = summary_config["version"]
+        vocab_name = summary_config["omop"]["vocabulary_name"]
+        vocab_reference = summary_config["omop"]["vocabulary_reference"]
-        omop_publish_concept_sets(out, db_path, vocab_output, omop_vocab_types[target_code_type])
+        #Create New OMOP Vocabulary
+        omop_setup(db_path, vocab_id, vocab_version, vocab_name, vocab_reference)
+
+        #Export to DB
+        omop_publish_concept_sets(out, db_path, vocab_id, omop_vocab_types[target_code_type], vocab_version)
     else:
         # export as CSV to /output
         out.to_csv(output_path, index=False)
@@ -386,7 +314,7 @@ if __name__ == '__main__':
     parser.add_argument("--no-translate", action='store_true', help="Do not translate code types")
     parser.add_argument("--no-verify", action='store_true', help="Do not verify codes are correct")
     parser.add_argument("--output", type=str, help="File Location to save output csv to")
-    parser.add_argument("--error-log", type=str, help="File Location to save output csv to")
+    parser.add_argument("--error-log", type=str, help="File Location to save error log csv to")
 
     args = parser.parse_args()
     config = vars(args)
diff --git a/omop_api.py b/omop_api.py
new file mode 100644
index 0000000000000000000000000000000000000000..1e5d7a6b869c24a9e48fcd13ab9fe48af7146250
--- /dev/null
+++ b/omop_api.py
@@ -0,0 +1,192 @@
+import os
+import argparse
+import sqlite3
+import pandas as pd
+
+db_path = "codes/omop_54.sqlite"
+
+#Check if a row with the given value exists (also defined in main.py; duplicated here to avoid a circular import)
+def sql_row_exist(conn, table, column, value):
+    # Execute and check if a result exists
+    cur = conn.cursor()
+    cur.execute(f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;", (value,))
+    return cur.fetchone() is not None
+
+#Populate SQLite3 Database with default OMOP CONCEPTS
+def omop_install(db_path, folder_path):
+    conn = sqlite3.connect(db_path)
+
+    # Check if the folder exists
+    if not os.path.isdir(folder_path):
+        raise Exception(f"Error: The folder '{folder_path}' does not exist.")
+
+    # Iterate through files in the folder
+    for filename in os.listdir(folder_path):
+        if filename.endswith(".csv"): # Check if the file is a CSV
+            file_path = os.path.join(folder_path, filename)
+            try:
+                print(f"Reading file: {file_path}")
+                # Athena exports are tab-delimited despite the .csv extension
+                df = pd.read_csv(file_path, delimiter="\t", low_memory=False)
+                table_name = os.path.splitext(os.path.basename(file_path))[0] #Get name of file
+
+                #Export Table to sqlite db
+                df.to_sql(table_name, conn, if_exists='replace', index=False)
+
+            except Exception as e:
+                raise Exception(f"Error reading file {file_path}: {e}")
+
+    conn.close()
+
+def table_exists(cursor, table_name):
+    # Query to check if the table exists
+    cursor.execute(
+        """
+        SELECT name
+        FROM sqlite_master
+        WHERE type='table' AND name=?
+        """,
+        (table_name,)
+    )
+
+    # Fetch the result
+    result = cursor.fetchone()
+
+    return result is not None
+
+def omop_vocab_exists(cursor, vocab_id):
+    # Query to check if the vocabulary exists
+    cursor.execute(
+        """
+        SELECT vocabulary_id
+        FROM VOCABULARY
+        WHERE vocabulary_id=?
+        """,
+        (vocab_id,)
+    )
+
+    # Fetch the result
+    result = cursor.fetchone()
+
+    return result is not None
+
+#Setup SQLite3 Database for OMOP
+def omop_setup(db_path, vocab_id, vocab_version, vocab_name, vocab_reference):
+    conn = sqlite3.connect(db_path)
+    cur = conn.cursor()
+
+    #Check if DB populated with necessary VOCABULARY
+    if not table_exists(cur, "VOCABULARY"):
+        raise Exception(f"Error: {db_path} is not yet populated with OMOP VOCABULARY. Please download from https://athena.ohdsi.org/.")
+
+    #Check if Vocabulary already exists
+    elif not omop_vocab_exists(cur, vocab_id):
+        #Create VOCABULARY
+        df_vocab = pd.DataFrame([{
+            "vocabulary_id": vocab_id,
+            "vocabulary_name": vocab_name,
+            "vocabulary_reference": vocab_reference,
+            "vocabulary_version": vocab_version,
+            # "vocabulary_concept_id": 0,
+        }])
+        df_vocab.to_sql("VOCABULARY", conn, if_exists='append', index=False)
+
+    #Check if CONCEPT_SET table exists
+    if not table_exists(cur, "CONCEPT_SET"):
+        cur.execute("""
+        CREATE TABLE CONCEPT_SET (
+            concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
+            atlas_id INTEGER, -- Unique identifier generated by ATLAS
+            concept_set_name TEXT, -- Optional name for the concept set
+            concept_set_description TEXT, -- Optional description for the concept set
+            vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
+            FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
+        );""")
+
+    #Check if CONCEPT_SET_ITEM table exists
+    if not table_exists(cur, "CONCEPT_SET_ITEM"):
+        cur.execute("""
+        CREATE TABLE CONCEPT_SET_ITEM (
+            concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
+            concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
+            concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
+            FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
+            FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
+        );""")
+
+    conn.commit()
+    conn.close()
+
+def omop_publish_concept_sets(out, db_path, vocab_output, vocab_type, output_version):
+    conn = sqlite3.connect(db_path)
+    cur = conn.cursor()
+
+    for concept_set_name, grp in out.groupby("CONCEPT_SET"):
+        #Create Concept_Set in the target vocabulary (parameterised; previously hard-coded to 'MELDB')
+        if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
+            cur.execute("INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
+                        (concept_set_name, vocab_output))
+        else:
+            print("concept_set", concept_set_name, "already exists")
+            #TODO: ask to remove old concept_set?
+
+        #Get Concept_set_Id
+        query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
+        cur.execute(query, (concept_set_name, vocab_output))
+        concept_set_id = cur.fetchone()[0]
+
+        #Get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED), via bound parameters
+        concept_codes = list(grp["CONCEPT"].astype(str))
+        placeholders = ", ".join("?" * len(concept_codes))
+        query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({placeholders});"
+        cur.execute(query, [vocab_type] + concept_codes)
+        df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
+
+        if len(grp) != len(df_out):
+            print("ERROR: Some", vocab_type, "Codes do not exist in OMOP Database")
+
+        #Create Concept_set_item
+        df_out["concept_set_id"] = concept_set_id
+        df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
+
+    conn.commit()
+    conn.close()
+
+def omop_clear(db_path):
+    conn = sqlite3.connect(db_path)
+    cur = conn.cursor()
+
+    cur.execute("DROP TABLE CONCEPT_SET;")
+    cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
+
+    conn.commit()
+    conn.close()
+
+def omop_reset(db_path):
+    #Delete the database file itself; run --install again to rebuild it
+    if os.path.isfile(db_path):
+        os.remove(db_path)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Installation of SQLite3 OMOP Database.",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    parser.add_argument("-i", "--install", type=str, help="Install OMOP Vocabularies in Database with download from athena.ohdsi.org")
+    parser.add_argument("--clear", action="store_true", help="Delete ALL CONCEPT_SETS from OMOP Database")
+    parser.add_argument("--delete", action="store_true", help="Delete ALL DATA from OMOP Database")
+
+    args = parser.parse_args()
+    config = vars(args)
+
+    if config["install"] is not None:
+        omop_install(db_path, config["install"])
+    elif config["clear"]:
+        omop_clear(db_path)
+    elif config["delete"]:
+        omop_reset(db_path)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
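
The hunks above read several new keys from the mapping JSON: `mapping["concept_sets"]`, `summary_config["version"]`, `summary_config["columns"]["concept_set_name"]`, and the `omop` sub-object. For reference, here is a minimal sketch of the `concept_sets` block that `PHEN_assign_v3.json` would need after this change. The key names come straight from the diff, but every value, including the summary file name and sheet name, is an illustrative assumption, not the project's actual configuration.

```python
import json

# Hypothetical concept_sets block for PHEN_assign_v3.json.
# Key names are those read by run_all(); all values are placeholders.
concept_sets = {
    "version": "v3.2.10",                    # written to VOCABULARY.vocabulary_version
    "file": "MELDB_concept_summary.xlsx",    # summary table passed to read_table_file()
    "excel_sheet": "concepts",               # optional; omit when the file is a CSV
    "columns": {
        "concept_set_name": "MELDB concept"  # renamed to CONCEPT_SET before the merge
    },
    "omop": {
        "vocabulary_id": "MELDB",
        "vocabulary_name": "Multidisciplinary Ecosystem to study Lifecourse Determinants "
                           "and Prevention of Early-onset Burdensome Multimorbidity",
        "vocabulary_reference": "https://www.it-innovation.soton.ac.uk/projects/meldb",
    },
}
print(json.dumps({"concept_sets": concept_sets}, indent=2))
```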
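To see how the pieces fit together without running the full pipeline, the sketch below drives the two `omop_api` functions directly, the same way `run_all()` does when `--output atlas` is selected. The two-row DataFrame and the SNOMED codes are made-up stand-ins for the real `out` table, and it assumes `codes/omop_54.sqlite` has already been populated via `python omop_api.py --install <INSERT PATH>`.

```python
import pandas as pd

from omop_api import db_path, omop_setup, omop_publish_concept_sets

# Stand-in for the `out` table built by run_all(); only the two columns
# used by omop_publish_concept_sets() are included, with made-up codes.
out = pd.DataFrame({
    "CONCEPT_SET": ["ABDOMINAL_PAIN", "ABDOMINAL_PAIN"],
    "CONCEPT": ["21522001", "116290004"],
})

# Register the custom vocabulary and create the CONCEPT_SET tables if needed.
omop_setup(db_path, "MELDB", "v3.2.10",
           "MELD-B concept sets",
           "https://www.it-innovation.soton.ac.uk/projects/meldb")

# Publish the concept sets; "SNOMED" mirrors omop_vocab_types[target_code_type].
omop_publish_concept_sets(out, db_path, "MELDB", "SNOMED", "v3.2.10")
```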
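Finally, a quick way to check what was published: join the two tables created by `omop_setup()` back to the standard OMOP `CONCEPT` table (loaded by `omop_install()` from the Athena download). The join columns follow the schema in the diff; the `'MELDB'` filter is again just an example vocabulary_id.

```python
import sqlite3

import pandas as pd

from omop_api import db_path

conn = sqlite3.connect(db_path)
# List every published concept set together with its resolved OMOP concepts.
df = pd.read_sql_query(
    """
    SELECT cs.concept_set_name, c.concept_code, c.concept_name
    FROM CONCEPT_SET cs
    JOIN CONCEPT_SET_ITEM csi ON csi.concept_set_id = cs.concept_set_id
    JOIN CONCEPT c ON c.concept_id = csi.concept_id
    WHERE cs.vocabulary_id = 'MELDB'
    ORDER BY cs.concept_set_name;
    """,
    conn,
)
conn.close()
print(df)
```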