acmc.omop
OMOP Module
This module provides functionality to manage OMOP vocabularies.
"""
OMOP Module
================
This module provides functionality to manage OMOP vocabularies.
"""

import os
import argparse
import sqlite3
import pandas as pd
import logging
import zipfile
import shutil
import json
import yaml
from pathlib import Path

from acmc import util, logging_config

# setup logging
logger = logging_config.setup_logger()

# constants
VOCAB_PATH = Path("./vocab/omop")
OMOP_CDM_Version = "54"
OMOP_DB_FILENAME = f"omop_{OMOP_CDM_Version}.sqlite"
DB_PATH = VOCAB_PATH / OMOP_DB_FILENAME
VERSION_FILE = "omop_version.yaml"
VERSION_PATH = VOCAB_PATH / VERSION_FILE
EXPORT_FILE = f"omop_{OMOP_CDM_Version}_export.sqlite"

# Metadata written to the version file by write_version_file(); "version" and
# "tables" are filled in at install time.
vocabularies = {
    "source": "OHDSI Athena",
    "url": "https://athena.ohdsi.org/vocabulary/list",
    "cdm_version": OMOP_CDM_Version,
    "version": "",
    "vocabularies": [
        {"id": 1, "name": "SNOMED"},  # No license required
        {"id": 2, "name": "ICD9CM"},  # No license required
        {"id": 17, "name": "Readv2"},  # No license required
        {"id": 21, "name": "ATC"},  # No license required
        {"id": 55, "name": "OPCS4"},  # No license required
        {"id": 57, "name": "HES Specialty"},  # No license required
        {"id": 70, "name": "ICD10CM"},  # No license required
        {"id": 75, "name": "dm+d"},  # No license required
        {"id": 144, "name": "UK Biobank"},  # No license required
        {"id": 154, "name": "NHS Ethnic Category"},  # No license required
        {"id": 155, "name": "NHS Place of Service"},  # No license required
    ],
    "tables": [],
}

# Maps a map-file stem (code type) to the OMOP vocabulary_id used to look up
# its concept codes; None means there is no OMOP vocabulary for that type.
omop_vocab_types = {
    "read2": "Read",
    "read3": None,
    "icd10": "ICD10CM",
    "snomed": "SNOMED",
    "opcs4": "OPCS4",
    "atc": "ATC",
    "med": None,
    "cprd": None,
}


# Populate SQLite3 Database with default OMOP CONCEPTS
def install(omop_zip_file: str, version: str):
    """Install the OMOP release csv files in a file-based sql database.

    The zip archive is extracted into ``VOCAB_PATH``; every extracted
    ``*.csv`` file is read as tab-delimited text and written to a table named
    after the file stem in ``DB_PATH``. The version file is written last.

    Args:
        omop_zip_file: Path of the OMOP release zip file.
        version: Version label recorded in the version file.

    Raises:
        ValueError: If the path does not exist or is not a valid zip file.
        Exception: If a csv file cannot be read or written to the database.
    """
    logger.info(f"Installing OMOP from zip file: {omop_zip_file}")
    omop_zip_path = Path(omop_zip_file)

    # Check if the file exists and is a ZIP file
    if not omop_zip_path.exists():
        msg = f"{omop_zip_path} does not exist."
        logger.error(msg)
        raise ValueError(msg)
    if not zipfile.is_zipfile(omop_zip_path):
        msg = f"Error: {omop_zip_path} is not a valid ZIP file."
        logger.error(msg)
        raise ValueError(msg)

    # check codes directory exists and if not create it
    if not VOCAB_PATH.exists():
        VOCAB_PATH.mkdir(parents=True)
        logger.debug(f"OMOP directory '{VOCAB_PATH}' created.")
    else:
        # removing existing OMOP files
        for file in list(VOCAB_PATH.glob("*.csv")):
            file.unlink()
            logger.debug(f"Deleted OMOP csv file: {file}")

    # Extract ZIP contents
    with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
        zip_ref.extractall(VOCAB_PATH)
        logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")

    # Collected in a fresh list so the metadata reflects exactly this install
    # (the previous code appended to a temporary copy, so nothing was kept).
    installed_tables = []

    # connect to database, if it does not exist it will be created
    conn = sqlite3.connect(DB_PATH)
    try:
        # sorted so the table order in the version file is deterministic
        csv_files = sorted(VOCAB_PATH.glob("*.csv"))
        total_tables_count = len(csv_files)
        for table_count, filename in enumerate(csv_files, start=1):
            logger.info(
                f"Processing {table_count} of {total_tables_count} tables: {filename}"
            )
            try:
                # read the CSV file with the specified delimiter
                df = pd.read_csv(filename, delimiter="\t", low_memory=False)

                # export Table to sqlite db
                df.to_sql(filename.stem, conn, if_exists="replace", index=False)
            except Exception as e:
                # keep the original exception type but preserve the cause
                raise Exception(f"Error reading file {filename}: {e}") from e

            # add to the metadata
            installed_tables.append(filename.stem)
    finally:
        # always release the database handle, even when a table fails to load
        conn.close()

    vocabularies["tables"] = installed_tables

    # write version file
    write_version_file(version)

    logger.info("OMOP installation completed")


def write_version_file(version: str):
    """Write the OMOP vocabularies metadata and version to the version file.

    Stores ``version`` in the module-level ``vocabularies`` dict and dumps
    that dict as YAML to ``VERSION_PATH``.

    Args:
        version: Version label to record.
    """
    vocabularies["version"] = version
    with VERSION_PATH.open("w") as version_stream:
        yaml.dump(
            vocabularies,
            version_stream,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )


def clear(db_path: Path):
    """Clear the OMOP sql database.

    NOTE: no table is dropped at present. The function only lists the tables
    in the database at debug level; dropping CONCEPT_SET and CONCEPT_SET_ITEM
    is disabled.

    Args:
        db_path: Path of the SQLite database file.

    Raises:
        FileNotFoundError: If ``db_path`` is not an existing file.
    """
    logger.info("Clearing OMOP data from database")
    if not db_path.is_file():
        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")

        # Fetch and log table names; the list is passed as a %s argument
        # because logging applies msg % args when the record is emitted
        tables = cur.fetchall()
        logger.debug("Tables in database: %s", [table[0] for table in tables])

        # NOTE(review): dropping CONCEPT_SET / CONCEPT_SET_ITEM is disabled
    finally:
        conn.close()
    logger.info("OMOP database cleared")


def delete(db_path: Path):
    """Delete the OMOP sql database file.

    Args:
        db_path: Path of the SQLite database file to remove.

    Raises:
        FileNotFoundError: If ``db_path`` is not an existing file.
    """
    logger.info("Deleting OMOP database")
    if db_path.is_file():
        db_path.unlink()
        logger.info("OMOP database deleted")
    else:
        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")


def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Return True when ``sqlite_master`` lists a table named ``table_name``."""
    lookup_sql = """
        SELECT name
        FROM sqlite_master
        WHERE type='table' AND name=?
        """
    match = cursor.execute(lookup_sql, (table_name,)).fetchone()
    return match is not None


def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
    """Return True when the VOCABULARY table has a row with ``vocab_id``."""
    lookup_sql = """
        SELECT vocabulary_id
        FROM VOCABULARY
        WHERE vocabulary_id=?
        """
    match = cursor.execute(lookup_sql, (vocab_id,)).fetchone()
    return match is not None


def concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
    """Return True when CONCEPT_SET has a row named ``concept_set_name``."""
    query = "SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
    cursor.execute(query, (concept_set_name,))

    # 1 if exists, 0 otherwise
    return cursor.fetchone()[0] == 1


def _lookup_concept_ids(cur, vocabulary_id, concept_codes, batch_size=500):
    """Return concept_id values in CONCEPT for the codes of one vocabulary."""
    concept_ids = []
    # batched so a large concept set stays under SQLite's bound-parameter limit
    for start in range(0, len(concept_codes), batch_size):
        batch = concept_codes[start : start + batch_size]
        placeholders = ", ".join("?" for _ in batch)
        cur.execute(
            "SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? "
            f"AND concept_code IN ({placeholders});",
            (vocabulary_id, *batch),
        )
        concept_ids.extend(row[0] for row in cur.fetchall())
    return concept_ids


def _export_tables_to_csv(conn, export_path):
    """Write every table except sqlite_sequence to <export_path>/<table>.csv."""
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
    for (table_name,) in cur.fetchall():
        # ignore SQLite's internal system table
        if table_name == "sqlite_sequence":
            continue
        # quote the identifier so unusual table names cannot break the query
        quoted = table_name.replace('"', '""')
        df = pd.read_sql_query(f'SELECT * FROM "{quoted}"', conn)
        df.to_csv(export_path / f"{table_name}.csv", index=False)
        logger.info(f"Exported {table_name} to {table_name}.csv")


def export(map_path: Path, export_path: Path, version: str, omop_metadata) -> Path:
    """Export concept-set map files into a copy of the OMOP database.

    Copies ``DB_PATH`` to ``export_path / EXPORT_FILE``, appends a VOCABULARY
    row built from ``omop_metadata``, creates CONCEPT_SET and CONCEPT_SET_ITEM,
    fills them from every ``*.csv`` in ``map_path`` (grouped by the
    ``CONCEPT_SET`` column, codes taken from the ``CONCEPT`` column), then
    writes each table to a csv file in ``export_path``.

    Args:
        map_path: Directory holding the map csv files; each file stem must be
            a key of ``omop_vocab_types``.
        export_path: Directory receiving the export database and csv files.
        version: Value stored as ``vocabulary_version``.
        omop_metadata: Mapping with ``vocabulary_id``, ``vocabulary_name`` and
            ``vocabulary_reference`` keys.

    Returns:
        Path of the export database.

    Raises:
        KeyError: If a map file stem is not a key of ``omop_vocab_types``.
        ValueError: If a concept set cannot be found after it was created.
    """
    logger.debug(f"exporting with metadata {omop_metadata} at version {version}")

    # copy the baseline omop database
    export_db_path = export_path / EXPORT_FILE
    shutil.copy(DB_PATH, export_db_path)

    vocabulary_id = omop_metadata["vocabulary_id"]

    # connect to db
    conn = sqlite3.connect(export_db_path)
    try:
        cur = conn.cursor()

        # Create VOCABULARY
        df_vocabulary = pd.DataFrame(
            [
                {
                    "vocabulary_id": vocabulary_id,
                    "vocabulary_name": omop_metadata["vocabulary_name"],
                    "vocabulary_reference": omop_metadata["vocabulary_reference"],
                    "vocabulary_version": version,
                }
            ]
        )
        df_vocabulary.to_sql("VOCABULARY", conn, if_exists="append", index=False)

        # Create CONCEPT_SET
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET (
                concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
                atlas_id INTEGER, -- Unique identifier generated by ATLAS
                concept_set_name TEXT, -- Optional name for the concept set
                concept_set_description TEXT, -- Optional description for the concept set
                vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
                FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
            );"""
        )

        # Create CONCEPT_SET_ITEM
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET_ITEM (
                concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
                concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
                concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
                FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
                FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
            );"""
        )

        # read map files
        map_files = list(map_path.glob("*.csv"))
        total = len(map_files)
        logger.info(f"Exporting {total} map files")
        for index, map_file in enumerate(map_files, start=1):
            logger.info(f"Processing {index} of {total}: {map_file}")
            df = pd.read_csv(map_file)

            # the vocabulary is fixed per map file, so resolve it once
            target_code_type = map_file.stem
            omop_code_type = omop_vocab_types[target_code_type]
            logger.debug(f"target code type {target_code_type}")
            logger.debug(f"omop code type {omop_code_type}")

            for group_key, grp in df.groupby("CONCEPT_SET"):
                # normalise to str so the same value is inserted and looked up
                # (group keys may be numpy scalars, which sqlite3 cannot bind)
                concept_set_name = str(group_key)

                # create Concept_Set
                if not concept_set_exist(cur, concept_set_name):
                    cur.execute(
                        "INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
                        (concept_set_name, vocabulary_id),
                    )
                else:
                    logger.debug(f"Concept_set {concept_set_name} already exists")
                    # TODO: ask to remove old concept_set?

                # get Concept_set_Id
                cur.execute(
                    "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;",
                    (concept_set_name, vocabulary_id),
                )
                row = cur.fetchone()
                if row is None:
                    raise ValueError(
                        f"Concept set '{concept_set_name}' not found for vocabulary '{vocabulary_id}'"
                    )
                concept_set_id = row[0]
                logger.debug(f"concept set id {concept_set_id}")

                # get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED);
                # de-duplicated so repeated codes do not trigger a false mismatch
                concept_codes = list(dict.fromkeys(grp["CONCEPT"].astype(str)))
                concept_ids = _lookup_concept_ids(cur, omop_code_type, concept_codes)

                if len(concept_codes) != len(concept_ids):
                    logger.error(
                        f"ERROR: Some {omop_code_type} Codes do not exist in OMOP Database"
                    )

                # Create Concept_set_item
                df_out = pd.DataFrame({"concept_id": concept_ids})
                df_out["concept_set_id"] = concept_set_id
                df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)

        # make sure pending inserts are persisted before reading them back
        conn.commit()

        # Output all tables to CSV
        _export_tables_to_csv(conn, export_path)
    finally:
        conn.close()

    logger.debug("Created export db successfully")

    return export_db_path
logger =
<Logger acmc_logger (INFO)>
VOCAB_PATH =
PosixPath('vocab/omop')
OMOP_CDM_Version =
'54'
OMOP_DB_FILENAME =
'omop_54.sqlite'
DB_PATH =
PosixPath('vocab/omop/omop_54.sqlite')
VERSION_FILE =
'omop_version.yaml'
VERSION_PATH =
PosixPath('vocab/omop/omop_version.yaml')
EXPORT_FILE =
'omop_54_export.sqlite'
vocabularies =
{'source': 'OHDSI Athena', 'url': 'https://athena.ohdsi.org/vocabulary/list', 'cdm_version': '54', 'version': '', 'vocabularies': [{'id': 1, 'name': 'SNOMED'}, {'id': 2, 'name': 'ICD9CM'}, {'id': 17, 'name': 'Readv2'}, {'id': 21, 'name': 'ATC'}, {'id': 55, 'name': 'OPCS4'}, {'id': 57, 'name': 'HES Specialty'}, {'id': 70, 'name': 'ICD10CM'}, {'id': 75, 'name': 'dm+d'}, {'id': 144, 'name': 'UK Biobank'}, {'id': 154, 'name': 'NHS Ethnic Category'}, {'id': 155, 'name': 'NHS Place of Service'}], 'tables': []}
omop_vocab_types =
{'read2': 'Read', 'read3': None, 'icd10': 'ICD10CM', 'snomed': 'SNOMED', 'opcs4': 'OPCS4', 'atc': 'ATC', 'med': None, 'cprd': None}
def
install(omop_zip_file: str, version: str):
def install(omop_zip_file: str, version: str):
    """Install the OMOP release csv files in a file-based sql database.

    The zip archive is extracted into ``VOCAB_PATH``; every extracted
    ``*.csv`` file is read as tab-delimited text and written to a table named
    after the file stem in ``DB_PATH``. The version file is written last.

    Args:
        omop_zip_file: Path of the OMOP release zip file.
        version: Version label recorded in the version file.

    Raises:
        ValueError: If the path does not exist or is not a valid zip file.
        Exception: If a csv file cannot be read or written to the database.
    """
    logger.info(f"Installing OMOP from zip file: {omop_zip_file}")
    omop_zip_path = Path(omop_zip_file)

    # Check if the file exists and is a ZIP file
    if not omop_zip_path.exists():
        msg = f"{omop_zip_path} does not exist."
        logger.error(msg)
        raise ValueError(msg)
    if not zipfile.is_zipfile(omop_zip_path):
        msg = f"Error: {omop_zip_path} is not a valid ZIP file."
        logger.error(msg)
        raise ValueError(msg)

    # check codes directory exists and if not create it
    if not VOCAB_PATH.exists():
        VOCAB_PATH.mkdir(parents=True)
        logger.debug(f"OMOP directory '{VOCAB_PATH}' created.")
    else:
        # removing existing OMOP files
        for file in list(VOCAB_PATH.glob("*.csv")):
            file.unlink()
            logger.debug(f"Deleted OMOP csv file: {file}")

    # Extract ZIP contents
    with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
        zip_ref.extractall(VOCAB_PATH)
        logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")

    # Collected in a fresh list so the metadata reflects exactly this install
    # (the previous code appended to a temporary copy, so nothing was kept).
    installed_tables = []

    # connect to database, if it does not exist it will be created
    conn = sqlite3.connect(DB_PATH)
    try:
        # sorted so the table order in the version file is deterministic
        csv_files = sorted(VOCAB_PATH.glob("*.csv"))
        total_tables_count = len(csv_files)
        for table_count, filename in enumerate(csv_files, start=1):
            logger.info(
                f"Processing {table_count} of {total_tables_count} tables: {filename}"
            )
            try:
                # read the CSV file with the specified delimiter
                df = pd.read_csv(filename, delimiter="\t", low_memory=False)

                # export Table to sqlite db
                df.to_sql(filename.stem, conn, if_exists="replace", index=False)
            except Exception as e:
                # keep the original exception type but preserve the cause
                raise Exception(f"Error reading file {filename}: {e}") from e

            # add to the metadata
            installed_tables.append(filename.stem)
    finally:
        # always release the database handle, even when a table fails to load
        conn.close()

    vocabularies["tables"] = installed_tables

    # write version file
    write_version_file(version)

    logger.info("OMOP installation completed")
Installs the OMOP release csv files in a file-based sql database
def
write_version_file(version: str):
def write_version_file(version: str):
    """Write the OMOP vocabularies metadata and version to the version file.

    Stores ``version`` in the module-level ``vocabularies`` dict and dumps
    that dict as YAML to ``VERSION_PATH``.

    Args:
        version: Version label to record.
    """
    vocabularies["version"] = version
    with VERSION_PATH.open("w") as version_stream:
        yaml.dump(
            vocabularies,
            version_stream,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
Writes the OMOP vocabularies and version to a file
def
clear(db_path: pathlib.Path):
def clear(db_path: Path):
    """Clear the OMOP sql database.

    NOTE: no table is dropped at present. The function only lists the tables
    in the database at debug level; dropping CONCEPT_SET and CONCEPT_SET_ITEM
    is disabled.

    Args:
        db_path: Path of the SQLite database file.

    Raises:
        FileNotFoundError: If ``db_path`` is not an existing file.
    """
    logger.info("Clearing OMOP data from database")
    if not db_path.is_file():
        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")

        # Fetch and log table names; the list is passed as a %s argument
        # because logging applies msg % args when the record is emitted
        tables = cur.fetchall()
        logger.debug("Tables in database: %s", [table[0] for table in tables])

        # NOTE(review): dropping CONCEPT_SET / CONCEPT_SET_ITEM is disabled
    finally:
        # release the handle even if the query fails
        conn.close()
    logger.info("OMOP database cleared")
Clears the OMOP sql database
def
delete(db_path: pathlib.Path):
def delete(db_path: Path):
    """Delete the OMOP sql database file.

    Args:
        db_path: Path of the SQLite database file to remove.

    Raises:
        FileNotFoundError: If ``db_path`` is not an existing file.
    """
    logger.info("Deleting OMOP database")
    if db_path.is_file():
        db_path.unlink()
        logger.info("OMOP database deleted")
    else:
        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")
Deletes the OMOP sql database
def
table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Return True when ``sqlite_master`` lists a table named ``table_name``.

    Args:
        cursor: Cursor on the database to inspect.
        table_name: Table name to look for.
    """
    lookup_sql = """
        SELECT name
        FROM sqlite_master
        WHERE type='table' AND name=?
        """
    # execute() returns the cursor, so the single-row fetch can be chained
    match = cursor.execute(lookup_sql, (table_name,)).fetchone()
    return match is not None
def
vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
    """Return True when the VOCABULARY table has a row with ``vocab_id``.

    Args:
        cursor: Cursor on a database that contains a VOCABULARY table.
        vocab_id: Value matched against the ``vocabulary_id`` column.
    """
    lookup_sql = """
        SELECT vocabulary_id
        FROM VOCABULARY
        WHERE vocabulary_id=?
        """
    # execute() returns the cursor, so the single-row fetch can be chained
    match = cursor.execute(lookup_sql, (vocab_id,)).fetchone()
    return match is not None
def
concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
def
export( map_path: pathlib.Path, export_path: pathlib.Path, version: str, omop_metadata) -> pathlib.Path:
def _lookup_concept_ids(cur, vocabulary_id, concept_codes, batch_size=500):
    """Return concept_id values in CONCEPT for the codes of one vocabulary."""
    concept_ids = []
    # batched so a large concept set stays under SQLite's bound-parameter limit
    for start in range(0, len(concept_codes), batch_size):
        batch = concept_codes[start : start + batch_size]
        placeholders = ", ".join("?" for _ in batch)
        cur.execute(
            "SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? "
            f"AND concept_code IN ({placeholders});",
            (vocabulary_id, *batch),
        )
        concept_ids.extend(row[0] for row in cur.fetchall())
    return concept_ids


def _export_tables_to_csv(conn, export_path):
    """Write every table except sqlite_sequence to <export_path>/<table>.csv."""
    cur = conn.cursor()
    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
    for (table_name,) in cur.fetchall():
        # ignore SQLite's internal system table
        if table_name == "sqlite_sequence":
            continue
        # quote the identifier so unusual table names cannot break the query
        quoted = table_name.replace('"', '""')
        df = pd.read_sql_query(f'SELECT * FROM "{quoted}"', conn)
        df.to_csv(export_path / f"{table_name}.csv", index=False)
        logger.info(f"Exported {table_name} to {table_name}.csv")


def export(map_path: Path, export_path: Path, version: str, omop_metadata) -> Path:
    """Export concept-set map files into a copy of the OMOP database.

    Copies ``DB_PATH`` to ``export_path / EXPORT_FILE``, appends a VOCABULARY
    row built from ``omop_metadata``, creates CONCEPT_SET and CONCEPT_SET_ITEM,
    fills them from every ``*.csv`` in ``map_path`` (grouped by the
    ``CONCEPT_SET`` column, codes taken from the ``CONCEPT`` column), then
    writes each table to a csv file in ``export_path``.

    Args:
        map_path: Directory holding the map csv files; each file stem must be
            a key of ``omop_vocab_types``.
        export_path: Directory receiving the export database and csv files.
        version: Value stored as ``vocabulary_version``.
        omop_metadata: Mapping with ``vocabulary_id``, ``vocabulary_name`` and
            ``vocabulary_reference`` keys.

    Returns:
        Path of the export database.

    Raises:
        KeyError: If a map file stem is not a key of ``omop_vocab_types``.
        ValueError: If a concept set cannot be found after it was created.
    """
    logger.debug(f"exporting with metadata {omop_metadata} at version {version}")

    # copy the baseline omop database
    export_db_path = export_path / EXPORT_FILE
    shutil.copy(DB_PATH, export_db_path)

    vocabulary_id = omop_metadata["vocabulary_id"]

    # connect to db
    conn = sqlite3.connect(export_db_path)
    try:
        cur = conn.cursor()

        # Create VOCABULARY
        df_vocabulary = pd.DataFrame(
            [
                {
                    "vocabulary_id": vocabulary_id,
                    "vocabulary_name": omop_metadata["vocabulary_name"],
                    "vocabulary_reference": omop_metadata["vocabulary_reference"],
                    "vocabulary_version": version,
                }
            ]
        )
        df_vocabulary.to_sql("VOCABULARY", conn, if_exists="append", index=False)

        # Create CONCEPT_SET
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET (
                concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
                atlas_id INTEGER, -- Unique identifier generated by ATLAS
                concept_set_name TEXT, -- Optional name for the concept set
                concept_set_description TEXT, -- Optional description for the concept set
                vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
                FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
            );"""
        )

        # Create CONCEPT_SET_ITEM
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET_ITEM (
                concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
                concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
                concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
                FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
                FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
            );"""
        )

        # read map files
        map_files = list(map_path.glob("*.csv"))
        total = len(map_files)
        logger.info(f"Exporting {total} map files")
        for index, map_file in enumerate(map_files, start=1):
            logger.info(f"Processing {index} of {total}: {map_file}")
            df = pd.read_csv(map_file)

            # the vocabulary is fixed per map file, so resolve it once
            target_code_type = map_file.stem
            omop_code_type = omop_vocab_types[target_code_type]
            logger.debug(f"target code type {target_code_type}")
            logger.debug(f"omop code type {omop_code_type}")

            for group_key, grp in df.groupby("CONCEPT_SET"):
                # normalise to str so the same value is inserted and looked up
                # (group keys may be numpy scalars, which sqlite3 cannot bind)
                concept_set_name = str(group_key)

                # create Concept_Set; bound parameters keep names containing
                # quotes from breaking (or injecting into) the statement
                if not concept_set_exist(cur, concept_set_name):
                    cur.execute(
                        "INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
                        (concept_set_name, vocabulary_id),
                    )
                else:
                    logger.debug(f"Concept_set {concept_set_name} already exists")
                    # TODO: ask to remove old concept_set?

                # get Concept_set_Id
                cur.execute(
                    "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;",
                    (concept_set_name, vocabulary_id),
                )
                row = cur.fetchone()
                if row is None:
                    # previously surfaced as an opaque TypeError on None[0]
                    raise ValueError(
                        f"Concept set '{concept_set_name}' not found for vocabulary '{vocabulary_id}'"
                    )
                concept_set_id = row[0]
                logger.debug(f"concept set id {concept_set_id}")

                # get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED);
                # de-duplicated so repeated codes do not trigger a false mismatch
                concept_codes = list(dict.fromkeys(grp["CONCEPT"].astype(str)))
                concept_ids = _lookup_concept_ids(cur, omop_code_type, concept_codes)

                if len(concept_codes) != len(concept_ids):
                    logger.error(
                        f"ERROR: Some {omop_code_type} Codes do not exist in OMOP Database"
                    )

                # Create Concept_set_item
                df_out = pd.DataFrame({"concept_id": concept_ids})
                df_out["concept_set_id"] = concept_set_id
                df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)

        # make sure pending inserts are persisted before reading them back
        conn.commit()

        # Output all tables to CSV
        _export_tables_to_csv(conn, export_path)
    finally:
        # release the handle even when a map file fails to process
        conn.close()

    logger.debug("Created export db successfully")

    return export_db_path