Skip to content
Snippets Groups Projects
Commit 5b98129c authored by Jakub Dylag's avatar Jakub Dylag
Browse files

omop installation script

parent 850bd595
No related branches found
No related tags found
No related merge requests found
......@@ -69,6 +69,21 @@ MELD-B refers to various diagnostic code formats included in target datasets.
- The convertion Tables will be saved as `.parquet` tables in the folder `maps/processed/`.
- NHS TRUD defines one-way mappings and does <b>NOT ADVISE</b> reversing the mappings. If you still wish to reverse these into two-way mappings, duplicate the given `.parquet` table and reverse the filename (e.g. `read2_code_to_snomed_code.parquet` to `snomed_code_to_read2_code.parquet`)
4. Populate the SQLite3 database with OMOP Vocabularies. These can be download from https://athena.ohdsi.org/vocabulary/list.
- Install the following vocabularies by ticking the box:
- 1-SNOMED
- 2-ICD9CM
- 17-Readv2
- 21-ATC
- 55-OPCS4
- 57-HES Specialty
- 70-ICD10CM
- 75-dm+d
- 144-UK Biobank
- 154-NHS Ethnic Category
- 155-NHS Place of Service
- Use the command `python omop_api.py --install <INSERT PATH>` to load vocabularies into database (insert your own path to unzipped download folder).
### JSON phenotype mapping
Mappings from Imported Code Lists to Outputted MELD-B Concept's Code list are defined in JSON format within `PHEN_assign_v3.json`.
......
......@@ -18,7 +18,9 @@ from parse import Atc_code
from parse import Med_code
from parse import code_types
from parse import omop_vocab_types
from omop_api import db_path
from omop_api import omop_publish_concept_sets
from omop_api import omop_setup
pd.set_option('mode.chained_assignment', None)
......@@ -142,7 +144,7 @@ def map_file(df, target_code_type, out, concepts, meta_columns=[], no_translate=
codes["CONCEPT_SET"] = np.repeat(concept.strip(), len(codes))
out = pd.concat([out, codes])
return out
def sql_row_exist(conn, table, column, value):
# Execute and check if a result exists
cur = conn.cursor()
......@@ -152,87 +154,6 @@ def sql_row_exist(conn, table, column, value):
return exists
#Setup SQLite3 Database for OMOP
def omop_setup(db_path):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
#TODO: Check if DB populated with necessary VOCABULARY
#TODO: populate VOCABULARY with ATHENA download
#create meldb VOCABULARY
meldb_version='v3.2.10'
meldb_description = 'Multidisciplinary Ecosystem to study Lifecourse Determinants and Prevention of Early-onset Burdensome Multimorbidity'
meldb_reference = 'https://www.it-innovation.soton.ac.uk/projects/meldb'
df_test = pd.DataFrame([{
"vocabulary_id": 'MELDB',
"vocabulary_name": meldb_description,
"vocabulary_reference": meldb_reference,
"vocabulary_version": meldb_version,
# "vocabulary_concept_id": 0,
}])
df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
cur.execute("""
CREATE TABLE CONCEPT_SET (
concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
atlas_id INTEGER, -- Unique identifier generated by ATLAS
concept_set_name TEXT, -- Optional name for the concept set
concept_set_description TEXT, -- Optional description for the concept set
vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
);""")
cur.execute("""
CREATE TABLE CONCEPT_SET_ITEM (
concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
);""")
conn.close()
def omop_publish_concept_sets(out, db_path, vocab_output, vocab_type):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
for concept_set_name, grp in out.groupby("CONCEPT_SET"):
#Create Concept_Set
if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', 'MELDB');")
else:
print("concept_set", concept_set_name, "already exists")
#TODO: ask to remove old concept_set?
#Get Concept_set_Id
query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
cur.execute(query, (concept_set_name, vocab_output, ))
concept_set_id = cur.fetchone()[0]
#Get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
concept_codes = "'"+"', '".join(list(grp["CONCEPT"].astype(str)))+"'"
query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
cur.execute(query, (vocab_type, ))
df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
if not len(grp) == len(df_out):
print("ERROR: Some", vocab_type, "Codes do not exist in OMOP Database")
#Create Concept_set_item
df_out["concept_set_id"] = concept_set_id
df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
conn.close()
# def omop_reset(db_path):
# cur.execute("DROP TABLE CONCEPT_SET;")
# cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
def run_all(mapping_file, target_code_type,
no_translate=False, no_verify=False,
log_errors_path="MELD_errors.csv",
......@@ -242,6 +163,7 @@ def run_all(mapping_file, target_code_type,
if mapping_file.endswith(".json"):
mapping = json.load(open(mapping_file,'rb'))
folders = mapping["codes"]
summary_config = mapping["concept_sets"]
else:
raise Exception("Unsupported filetype provided for source file")
......@@ -334,7 +256,6 @@ def run_all(mapping_file, target_code_type,
out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
#Merge with Concept Types in Summary Excel File
summary_config = mapping["concepts"]
if "excel_sheet" in summary_config:
summary_df = read_table_file(summary_config["file"], excel_sheet=summary_config["excel_sheet"])
else:
......@@ -346,8 +267,9 @@ def run_all(mapping_file, target_code_type,
else:
summary_cols_all += v
output_version = summary_config["version"]
summary_df = summary_df[summary_cols_all] #select all relevant columns
summary_df = summary_df.rename(columns={summary_config["columns"]["concept_name"]: "CONCEPT_SET"})
summary_df = summary_df.rename(columns={summary_config["columns"]["concept_set_name"]: "CONCEPT_SET"})
summary_df = summary_df.drop_duplicates() #remove duplicates
out = out.merge(summary_df, how="left", on='CONCEPT_SET')
......@@ -355,11 +277,17 @@ def run_all(mapping_file, target_code_type,
print(bcolors.HEADER, "---"*5, "OUTPUT", "---"*5, bcolors.ENDC)
print(out)
if output_path == "atlas":
#Export to DB
db_path = "codes/omop_54.sqlite"
vocab_output = "MELDB" #TODO: parameterise output name
vocab_id = summary_config["omop"]["vocabulary_id"]
vocab_version = summary_config["version"]
vocab_name = summary_config["omop"]["vocabulary_name"]
vocab_reference = summary_config["omop"]["vocabulary_reference"]
omop_publish_concept_sets(out, db_path, vocab_output, omop_vocab_types[target_code_type])
#Create New OMOP Vocabulary
omop_setup(db_path, vocab_id, vocab_version, vocab_name, vocab_reference)
#Export to DB
omop_publish_concept_sets(out, db_path, vocab_id, omop_vocab_types[target_code_type], vocab_version)
else:
# export as CSV to /output
out.to_csv(output_path, index=False)
......@@ -386,7 +314,7 @@ if __name__ == '__main__':
parser.add_argument("--no-translate", action='store_true', help="Do not translate code types")
parser.add_argument("--no-verify", action='store_true', help="Do not verify codes are correct")
parser.add_argument("--output", type=str, help="File Location to save output csv to")
parser.add_argument("--error-log", type=str, help="File Location to save output csv to")
parser.add_argument("--error-log", type=str, help="File Location to save error log csv to")
args = parser.parse_args()
config = vars(args)
......
import os
import argparse
import sqlite3
import pandas as pd
db_path = "codes/omop_54.sqlite"
#Populate SQLite3 Database with default OMOP CONCEPTS
def omop_install (db_path, folder_path):
conn = sqlite3.connect(db_path)
# Check if the folder exists
if not os.path.isdir(folder_path):
raise Exception(f"Error: The folder '{folder_path}' does not exist.")
# Iterate through files in the folder
for filename in os.listdir(folder_path):
if filename.endswith(".csv"): # Check if the file is a CSV
file_path = os.path.join(folder_path, filename)
try:
print(f"Reading file: {file_path}")
# Read the CSV file with the specified delimiter
df = pd.read_csv(file_path, delimiter="\t", low_memory=False)
table_name = os.path.splitext(os.path.basename(file_path))[0] #Get name of file
#Export Table to sqlite db
df.to_sql(table_name, conn, if_exists='replace', index=False)
except Exception as e:
raise Exception(f"Error reading file {file_path}: {e}")
conn.close()
def table_exists(cursor, table_name):
# Query to check if the table exists
cursor.execute(
"""
SELECT name
FROM sqlite_master
WHERE type='table' AND name=?
""",
(table_name,)
)
# Fetch the result
result = cursor.fetchone()
return result is not None
def omop_vocab_exists(cursor, vocab_id):
# Query to check if the table exists
cursor.execute(
"""
SELECT vocabulary_id
FROM VOCABULARY
WHERE vocabulary_id=?
""",
(vocab_id,)
)
# Fetch the result
result = cursor.fetchone()
return result is not None
#Setup SQLite3 Database for OMOP
def omop_setup(db_path, vocab_id, vocab_version, vocab_name, vocab_reference):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
#Check if DB populated with necessary VOCABULARY
if not table_exists(cur, "VOCABULARY"):
raise Exception(f"Error {db_path} is not yet populated with OMOP VOCABULARY. Please download from https://athena.ohdsi.org/.")
#Check if Vocabulary already exists
elif not omop_vocab_exists(cur, vocab_id):
#Create VOCABULARY
df_test = pd.DataFrame([{
"vocabulary_id": vocab_id,
"vocabulary_name": vocab_name,
"vocabulary_reference": vocab_reference,
"vocabulary_version": vocab_version,
# "vocabulary_concept_id": 0,
}])
df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
#Check if CONCEPT_SET table exists
if not table_exists(cur, "CONCEPT_SET"):
cur.execute("""
CREATE TABLE CONCEPT_SET (
concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
atlas_id INTEGER, -- Unique identifier generated by ATLAS
concept_set_name TEXT, -- Optional name for the concept set
concept_set_description TEXT, -- Optional description for the concept set
vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
);""")
#Check if CONCEPT_SET_ITEM table exists
if not table_exists(cur, "CONCEPT_SET_ITEM"):
cur.execute("""
CREATE TABLE CONCEPT_SET_ITEM (
concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
);""")
conn.close()
def omop_publish_concept_sets(out, db_path, vocab_output, vocab_type, output_version):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
for concept_set_name, grp in out.groupby("CONCEPT_SET"):
#Create Concept_Set
if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', 'MELDB');")
else:
print("concept_set", concept_set_name, "already exists")
#TODO: ask to remove old concept_set?
#Get Concept_set_Id
query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
cur.execute(query, (concept_set_name, vocab_output, ))
concept_set_id = cur.fetchone()[0]
#Get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
concept_codes = "'"+"', '".join(list(grp["CONCEPT"].astype(str)))+"'"
query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
cur.execute(query, (vocab_type, ))
df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
if not len(grp) == len(df_out):
print("ERROR: Some", vocab_type, "Codes do not exist in OMOP Database")
#Create Concept_set_item
df_out["concept_set_id"] = concept_set_id
df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
conn.close()
def omop_clear(db_path):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("DROP TABLE CONCEPT_SET;")
cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
conn.close()
def main():
parser = argparse.ArgumentParser(
description="Installation of SQLite3 OMOP Database.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("-i", "--install", type=str, help="Install OMOP Vocabularies in Database with download from athena.ohdsi.org")
parser.add_argument("--clear", action="store_true", help="Delete ALL CONCEPT_SETS from OMOP Database")
parser.add_argument("--delete", action="store_true", help="Delete ALL DATA from OMOP Database")
args = parser.parse_args()
config = vars(args)
if config["install"] is not None:
omop_install(db_path, config["install"])
elif config["clear"]:
omop_clear(db_path)
elif config["delete"]:
omop_reset(db_path)
if __name__ == "__main__":
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment