acmc.omop
omop.py module
This module provides functionality to manage OMOP vocabularies.
"""
omop.py module

This module provides functionality to manage OMOP vocabularies.
"""

import os
import argparse
import sqlite3
import pandas as pd
import logging
import zipfile
import shutil
import json
import yaml
from pathlib import Path
from acmc import util, logging_config

# setup logging
_logger = logging_config.setup_logger()

VOCAB_PATH = Path("./vocab/omop")
"""Default OMOP vocabulary path"""

OMOP_CDM_Version = "54"
"""Supported OMOP CDM version number"""

OMOP_DB_FILENAME = f"omop_{OMOP_CDM_Version}.sqlite"
"""Default OMOP SQLite database filename"""

DB_PATH = VOCAB_PATH / OMOP_DB_FILENAME
"""Default OMOP SQLite database path"""

VERSION_FILE = "omop_version.yml"
"""Default OMOP version file"""

VERSION_PATH = VOCAB_PATH / VERSION_FILE
"""Default OMOP version path"""

EXPORT_FILE = f"omop_{OMOP_CDM_Version}_export.sqlite"
"""Default OMOP export database filename"""

vocabularies = {
    "source": "OHDSI Athena",
    "url": "https://athena.ohdsi.org/vocabulary/list",
    "cdm_version": OMOP_CDM_Version,
    "version": "",
    "vocabularies": [
        {"id": 1, "name": "SNOMED"},  # No license required
        {"id": 2, "name": "ICD9CM"},  # No license required
        {"id": 17, "name": "Readv2"},  # No license required
        {"id": 21, "name": "ATC"},  # No license required
        {"id": 55, "name": "OPCS4"},  # No license required
        {"id": 57, "name": "HES Specialty"},  # No license required
        {"id": 70, "name": "ICD10CM"},  # No license required
        {"id": 75, "name": "dm+d"},  # No license required
        {"id": 144, "name": "UK Biobank"},  # No license required
        {"id": 154, "name": "NHS Ethnic Category"},  # No license required
        {"id": 155, "name": "NHS Place of Service"},  # No license required
    ],
    "tables": [],
}
"""Required OMOP vocabularies definition"""

omop_vocab_types = {
    "read2": "Read",
    "read3": None,
    "icd10": "ICD10CM",
    "snomed": "SNOMED",
    "opcs4": "OPCS4",
    "atc": "ATC",
    "med": None,
    "cprd": None,
}
"""Type mappings from acmc medical coding types to OMOP vocabulary types"""

# Upper bound on concept codes bound into a single "IN (...)" query; kept well
# below SQLite's bound-variable limit (999 in older builds).
_MAX_CODES_PER_QUERY = 500


def install(omop_zip_file: str, version: str):
    """Installs the OMOP release csv files in a file-based sql database

    The zip file is extracted into the default OMOP vocabulary directory, every
    extracted ``*.csv`` file (read as tab-delimited) is loaded into a table
    named after the file stem, the loaded table names are recorded in the
    vocabulary metadata and the version file is written.

    Args:
        omop_zip_file (str): vocabularies zip file distributed by OHDSI Athena
        version (str): version of the vocabularies distributed by OHDSI Athena

    Raises:
        ValueError: if the zip file does not exist
        ValueError: if the file is not a zip file
        Exception: if error reading omop csv files
    """
    _logger.info(f"Installing OMOP from zip file: {omop_zip_file}")
    omop_zip_path = Path(omop_zip_file)

    # Check if the file exists and is a ZIP file
    if not omop_zip_path.exists():
        msg = f"{omop_zip_path} does not exist."
        _logger.error(msg)
        raise ValueError(msg)

    if not zipfile.is_zipfile(omop_zip_path):
        msg = f"Error: {omop_zip_path} is not a valid ZIP file."
        _logger.error(msg)
        raise ValueError(msg)

    # check codes directory exists and if not create it
    if not VOCAB_PATH.exists():
        VOCAB_PATH.mkdir(parents=True)
        _logger.debug(f"OMOP directory '{VOCAB_PATH}' created.")
    else:
        # remove csv files left over from a previous installation
        for file in list(VOCAB_PATH.glob("*.csv")):
            file.unlink()
            _logger.debug(f"Deleted OMOP csv file: {file}")

    # Extract ZIP contents
    with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
        zip_ref.extractall(VOCAB_PATH)
        _logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")

    # connect to database, if it does not exist it will be created
    conn = sqlite3.connect(DB_PATH)
    installed_tables = []
    try:
        csv_files = list(VOCAB_PATH.glob("*.csv"))
        total_tables_count = len(csv_files)
        for table_count, filename in enumerate(csv_files, start=1):
            _logger.info(
                f"Processing {table_count} of {total_tables_count} tables: {filename}"
            )
            try:
                # read the CSV file with the specified delimiter
                df = pd.read_csv(filename, delimiter="\t", low_memory=False)

                # export table to sqlite db
                df.to_sql(filename.stem, conn, if_exists="replace", index=False)
            except Exception as e:
                raise Exception(f"Error reading file {filename}: {e}") from e
            installed_tables.append(filename.stem)
    finally:
        # always release the database handle, even when a file fails to load
        conn.close()

    # record the loaded tables in the metadata written to the version file
    vocabularies["tables"] = installed_tables

    # write version file
    write_version_file(version)

    _logger.info("OMOP installation completed")


def write_version_file(version: str):
    """Writes the OMOP vocabularies and version to a file

    Stores the version in the module-level vocabulary metadata and dumps that
    metadata as YAML to the default OMOP version path.

    Args:
        version (str): version of the vocabularies distributed by OHDSI Athena
    """
    vocabularies["version"] = version
    dump_options = {
        "Dumper": util.QuotedDumper,
        "default_flow_style": False,
        "sort_keys": False,
        "default_style": '"',
    }
    with VERSION_PATH.open("w") as version_stream:
        yaml.dump(vocabularies, version_stream, **dump_options)


def clear(db_path: Path):
    """Clears the OMOP sql database

    NOTE: the statements that drop the CONCEPT_SET and CONCEPT_SET_ITEM tables
    are currently disabled, so this function only lists the tables found in
    the database (at debug level) and leaves the data untouched.

    Args:
        db_path (Path): the path to the omop SQLite database

    Raises:
        FileNotFoundError: if the omop SQLite database does not exist
    """
    _logger.info("Clearing OMOP data from database")
    if not db_path.is_file():
        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")

        # Fetch and log table names
        tables = cur.fetchall()
        _logger.debug("Tables in database: %s", [table[0] for table in tables])

        # Disabled: dropping the concept set tables
        # cur.execute("DROP TABLE CONCEPT_SET;")
        # cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
    finally:
        conn.close()

    _logger.info("OMOP database cleared")


def delete(db_path: Path):
    """Deletes the OMOP sql database

    Args:
        db_path (Path): the path to the omop SQLite database

    Raises:
        FileNotFoundError: if the omop SQLite database does not exist
    """
    _logger.info("Deleting OMOP database")

    db_file_present = db_path.is_file()
    if not db_file_present:
        missing_msg = f"Error: OMOP DB file '{db_path}' does not exist."
        raise FileNotFoundError(missing_msg)

    db_path.unlink()
    _logger.info("OMOP database deleted")


def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Query to check if the table exists

    Args:
        cursor (sqlite3.Cursor): a SQLite database cursor
        table_name (str): the table name to check

    Returns:
        bool: true if table exists
    """
    lookup_sql = """
        SELECT name
        FROM sqlite_master
        WHERE type='table' AND name=?
        """
    matching_row = cursor.execute(lookup_sql, (table_name,)).fetchone()
    if matching_row is None:
        return False
    return True


def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
    """Query to check if the vocabulary exists

    Args:
        cursor (sqlite3.Cursor): a SQLite database cursor
        vocab_id (str): the vocabulary id to check

    Returns:
        bool: true if vocabulary id exists
    """
    lookup_sql = """
        SELECT vocabulary_id
        FROM VOCABULARY
        WHERE vocabulary_id=?
        """
    matching_row = cursor.execute(lookup_sql, (vocab_id,)).fetchone()
    if matching_row is None:
        return False
    return True


def concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
    """Query to check if the concept set exists

    Args:
        cursor (sqlite3.Cursor): a SQLite database cursor
        concept_set_name (str): the concept set name to check

    Returns:
        bool: true if concept set exists
    """
    exists_sql = "SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
    params = (concept_set_name,)

    # EXISTS always yields exactly one row holding 1 (found) or 0 (not found)
    exists_flag = cursor.execute(exists_sql, params).fetchone()[0]
    return exists_flag == 1


def _lookup_concept_ids(cur: sqlite3.Cursor, vocabulary_id, concept_codes: list) -> list:
    """Return the OMOP concept ids matching the given codes in one vocabulary"""
    concept_ids = []
    for start in range(0, len(concept_codes), _MAX_CODES_PER_QUERY):
        batch = concept_codes[start : start + _MAX_CODES_PER_QUERY]
        placeholders = ", ".join("?" * len(batch))
        cur.execute(
            f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({placeholders});",
            (vocabulary_id, *batch),
        )
        concept_ids.extend(row[0] for row in cur.fetchall())
    return concept_ids


def export(
    map_path: Path, export_path: Path, version: str, omop_metadata: dict
) -> Path:
    """Export concept sets to omop database in csv format

    Copies the baseline OMOP database into the export directory, appends the
    phenotype vocabulary to VOCABULARY, creates the CONCEPT_SET and
    CONCEPT_SET_ITEM tables, fills them from the map csv files (the file stem
    selects the OMOP vocabulary via ``omop_vocab_types``) and finally dumps
    every table of the export database to ``<table>.csv``.

    Args:
        map_path (Path): path to the acmc map directory containing concept sets in csv format
        export_path (Path): path to the directory where the omop database csv files are to be written
        version (str): phenotype version for omop vocabulary version
        omop_metadata (dict): phenotype omop metadata for omop vocabulary metadata

    Returns:
        Path: path to the exported SQLite database

    Raises:
        KeyError: if a map file stem is not a key of ``omop_vocab_types`` or a
            required key is missing from ``omop_metadata``
    """
    _logger.debug(f"exporting with metadata {omop_metadata} at version {version}")

    # copy the baseline omop database
    export_db_path = export_path / EXPORT_FILE
    shutil.copy(DB_PATH, export_db_path)

    # connect to db
    conn = sqlite3.connect(export_db_path)
    try:
        cur = conn.cursor()
        vocabulary_id = omop_metadata["vocabulary_id"]

        # Create VOCABULARY
        vocabulary_df = pd.DataFrame(
            [
                {
                    "vocabulary_id": vocabulary_id,
                    "vocabulary_name": omop_metadata["vocabulary_name"],
                    "vocabulary_reference": omop_metadata["vocabulary_reference"],
                    "vocabulary_version": version,
                }
            ]
        )
        vocabulary_df.to_sql("VOCABULARY", conn, if_exists="append", index=False)

        # Create CONCEPT_SET
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET (
                concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
                atlas_id INTEGER, -- Unique identifier generated by ATLAS
                concept_set_name TEXT, -- Optional name for the concept set
                concept_set_description TEXT, -- Optional description for the concept set
                vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
                FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
            );"""
        )

        # Create CONCEPT_SET_ITEM
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET_ITEM (
                concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
                concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
                concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
                FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
                FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
            );"""
        )

        # read map files
        map_files = list(map_path.glob("*.csv"))
        total = len(map_files)
        _logger.info(f"Exporting {total} map files")
        for index, map_file in enumerate(map_files):
            _logger.info(f"Processing {index+1} of {total}: {map_file}")
            df = pd.read_csv(map_file)

            # the map file name identifies the acmc code type of its concepts
            target_code_type = map_file.stem
            omop_code_type = omop_vocab_types[target_code_type]
            _logger.debug(f"target code type {target_code_type}")
            _logger.debug(f"omop code type {omop_code_type}")

            for concept_set_name, grp in df.groupby("CONCEPT_SET"):
                # group keys may be non-string scalars; SQLite needs plain str
                set_name = str(concept_set_name)

                # create Concept_Set
                if not concept_set_exist(cur, set_name):
                    cur.execute(
                        "INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
                        (set_name, vocabulary_id),
                    )
                else:
                    _logger.debug(f"Concept_set {set_name} already exists")
                    # TODO: ask to remove old concept_set?

                # get Concept_set_Id
                cur.execute(
                    "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;",
                    (set_name, vocabulary_id),
                )
                concept_set_id = cur.fetchone()[0]
                _logger.debug(f"concept set id {concept_set_id}")

                # get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
                concept_codes = list(grp["CONCEPT"].astype(str))
                concept_ids = _lookup_concept_ids(cur, omop_code_type, concept_codes)
                df_out = pd.DataFrame({"concept_id": concept_ids})

                if len(grp) != len(df_out):
                    _logger.error(
                        f"ERROR: Some {omop_code_type} Codes do not exist in OMOP Database"
                    )

                # Create Concept_set_item
                df_out["concept_set_id"] = concept_set_id
                df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)

        # make sure every pending insert is persisted before reading back
        conn.commit()

        # Output all tables to CSV
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cur.fetchall()

        # Export each table to a separate CSV file
        for table in tables:
            table_name = table[0]
            # ignore SQLite's internal system table
            if table_name != "sqlite_sequence":
                # quote the identifier so unusual table names stay valid SQL
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                df = pd.read_sql_query(f"SELECT * FROM {quoted_name}", conn)
                output_path = export_path / f"{table_name}.csv"
                df.to_csv(output_path, index=False)
                _logger.info(f"Exported {table_name} to {table_name}.csv")
    finally:
        conn.close()

    _logger.debug("Created export db successfully")

    return export_db_path
Default OMOP vocabulary path
Supported OMOP CDM version number
Default OMOP SQLite database filename
Default OMOP SQLite database path
Default OMOP version file
Default OMOP version path
Default OMOP export database filename
Required OMOP vocabularies definition
Type mappings from acmc medical coding types to OMOP vocabulary types
def install(omop_zip_file: str, version: str):
    """Installs the OMOP release csv files in a file-based sql database

    The zip file is extracted into the default OMOP vocabulary directory, every
    extracted ``*.csv`` file (read as tab-delimited) is loaded into a table
    named after the file stem, the loaded table names are recorded in the
    vocabulary metadata and the version file is written.

    Args:
        omop_zip_file (str): vocabularies zip file distributed by OHDSI Athena
        version (str): version of the vocabularies distributed by OHDSI Athena

    Raises:
        ValueError: if the zip file does not exist
        ValueError: if the file is not a zip file
        Exception: if error reading omop csv files
    """
    _logger.info(f"Installing OMOP from zip file: {omop_zip_file}")
    omop_zip_path = Path(omop_zip_file)

    # Check if the file exists and is a ZIP file
    if not omop_zip_path.exists():
        msg = f"{omop_zip_path} does not exist."
        _logger.error(msg)
        raise ValueError(msg)

    if not zipfile.is_zipfile(omop_zip_path):
        msg = f"Error: {omop_zip_path} is not a valid ZIP file."
        _logger.error(msg)
        raise ValueError(msg)

    # check codes directory exists and if not create it
    if not VOCAB_PATH.exists():
        VOCAB_PATH.mkdir(parents=True)
        _logger.debug(f"OMOP directory '{VOCAB_PATH}' created.")
    else:
        # remove csv files left over from a previous installation
        for file in list(VOCAB_PATH.glob("*.csv")):
            file.unlink()
            _logger.debug(f"Deleted OMOP csv file: {file}")

    # Extract ZIP contents
    with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
        zip_ref.extractall(VOCAB_PATH)
        _logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")

    # connect to database, if it does not exist it will be created
    conn = sqlite3.connect(DB_PATH)
    installed_tables = []
    try:
        csv_files = list(VOCAB_PATH.glob("*.csv"))
        total_tables_count = len(csv_files)
        for table_count, filename in enumerate(csv_files, start=1):
            _logger.info(
                f"Processing {table_count} of {total_tables_count} tables: {filename}"
            )
            try:
                # read the CSV file with the specified delimiter
                df = pd.read_csv(filename, delimiter="\t", low_memory=False)

                # export table to sqlite db
                df.to_sql(filename.stem, conn, if_exists="replace", index=False)
            except Exception as e:
                # chain the cause so the original traceback is preserved
                raise Exception(f"Error reading file {filename}: {e}") from e
            installed_tables.append(filename.stem)
    finally:
        # always release the database handle, even when a file fails to load
        conn.close()

    # Record the loaded tables in the metadata written to the version file.
    # (Appending to ``list(vocabularies["tables"])`` only modified a copy.)
    vocabularies["tables"] = installed_tables

    # write version file
    write_version_file(version)

    _logger.info("OMOP installation completed")
"Installs the OMOP release csv files in a file-based sql database
Arguments:
- omop_zip_file (str): vocabularies zip file distributed by OHDSI Athena
- version (str): version of the vocabularies distributed by OHDSI Athena
Raises:
- ValueError: if the zip file does not exist
- ValueError: if the file is not a zip file
- Exception: if error reading omop csv files
def write_version_file(version: str):
    """Writes the OMOP vocabularies and version to a file

    Stores the version in the module-level vocabulary metadata and dumps that
    metadata as YAML to the default OMOP version path.

    Args:
        version (str): version of the vocabularies distributed by OHDSI Athena
    """
    vocabularies["version"] = version

    # keep insertion order and block style in the generated YAML
    dump_options = {
        "Dumper": util.QuotedDumper,
        "default_flow_style": False,
        "sort_keys": False,
        "default_style": '"',
    }
    with VERSION_PATH.open("w") as version_stream:
        yaml.dump(vocabularies, version_stream, **dump_options)
Writes the OMOP vocabularies and version to a file
Arguments:
- version (str): version of the vocabularies distributed by OHDSI Athena
def clear(db_path: Path):
    """Clears the OMOP sql database

    NOTE: the statements that drop the CONCEPT_SET and CONCEPT_SET_ITEM tables
    are currently disabled, so this function only lists the tables found in
    the database (at debug level) and leaves the data untouched.

    Args:
        db_path (Path): the path to the omop SQLite database

    Raises:
        FileNotFoundError: if the omop SQLite database does not exist
    """
    _logger.info("Clearing OMOP data from database")
    if not db_path.is_file():
        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")

        # Fetch and log table names; the list is passed as a lazy %-argument
        # (an argument without a placeholder makes logging fail to format)
        tables = cur.fetchall()
        _logger.debug("Tables in database: %s", [table[0] for table in tables])

        # Disabled: dropping the concept set tables
        # cur.execute("DROP TABLE CONCEPT_SET;")
        # cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
    finally:
        # release the database handle even if the query fails
        conn.close()

    _logger.info("OMOP database cleared")
Clears the OMOP sql database
Arguments:
- db_path (Path): the path to the omop SQLite database
Raises:
- FileNotFoundError: if the omop SQLite database does not exist
def delete(db_path: Path):
    """Deletes the OMOP sql database

    Removes the database file from disk after checking that it is present.

    Args:
        db_path (Path): the path to the omop SQLite database

    Raises:
        FileNotFoundError: if the omop SQLite database does not exist
    """
    _logger.info("Deleting OMOP database")

    db_file_present = db_path.is_file()
    if not db_file_present:
        missing_msg = f"Error: OMOP DB file '{db_path}' does not exist."
        raise FileNotFoundError(missing_msg)

    db_path.unlink()
    _logger.info("OMOP database deleted")
Deletes the OMOP sql database
Arguments:
- db_path (Path): the path to the omop SQLite database
Raises:
- FileNotFoundError: if the omop SQLite database does not exist
def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
    """Query to check if the table exists

    Looks the name up in ``sqlite_master`` among entries of type ``table``.

    Args:
        cursor (sqlite3.Cursor): a SQLite database cursor
        table_name (str): the table name to check

    Returns:
        bool: true if table exists
    """
    lookup_sql = """
        SELECT name
        FROM sqlite_master
        WHERE type='table' AND name=?
        """
    # execute() returns the cursor itself, so the fetch can be chained
    matching_row = cursor.execute(lookup_sql, (table_name,)).fetchone()
    if matching_row is None:
        return False
    return True
Query to check if the table exists
Arguments:
- cursor (sqlite3.Cursor): a SQLite database cursor
- table_name (str): the table name to check
Returns:
bool: true if table exists
def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
    """Query to check if the vocabulary exists

    Looks for a row of the VOCABULARY table with the given vocabulary id.

    Args:
        cursor (sqlite3.Cursor): a SQLite database cursor
        vocab_id (str): the vocabulary id to check

    Returns:
        bool: true if vocabulary id exists
    """
    lookup_sql = """
        SELECT vocabulary_id
        FROM VOCABULARY
        WHERE vocabulary_id=?
        """
    # execute() returns the cursor itself, so the fetch can be chained
    matching_row = cursor.execute(lookup_sql, (vocab_id,)).fetchone()
    if matching_row is None:
        return False
    return True
Query to check if the vocabulary exists
Arguments:
- cursor (sqlite3.Cursor): a SQLite database cursor
- vocab_id (str): the vocabulary id to check
Returns:
bool: true if vocabulary id exists
def concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
    """Query to check if the concept set exists

    Matches on the concept set name only, in the CONCEPT_SET table.

    Args:
        cursor (sqlite3.Cursor): a SQLite database cursor
        concept_set_name (str): the concept set name to check

    Returns:
        bool: true if concept set exists
    """
    exists_sql = "SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
    params = (concept_set_name,)

    # EXISTS always yields exactly one row holding 1 (found) or 0 (not found)
    exists_flag = cursor.execute(exists_sql, params).fetchone()[0]
    return exists_flag == 1
Query to check if the concept set exists
Arguments:
- cursor (sqlite3.Cursor): a SQLite database cursor
- concept_set_name (str): the concept set name to check
Returns:
bool: true if concept set exists
# Upper bound on concept codes bound into a single "IN (...)" query; kept well
# below SQLite's bound-variable limit (999 in older builds).
_MAX_CODES_PER_QUERY = 500


def _lookup_concept_ids(cur: sqlite3.Cursor, vocabulary_id, concept_codes: list) -> list:
    """Return the OMOP concept ids matching the given codes in one vocabulary"""
    concept_ids = []
    for start in range(0, len(concept_codes), _MAX_CODES_PER_QUERY):
        batch = concept_codes[start : start + _MAX_CODES_PER_QUERY]
        placeholders = ", ".join("?" * len(batch))
        cur.execute(
            f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({placeholders});",
            (vocabulary_id, *batch),
        )
        concept_ids.extend(row[0] for row in cur.fetchall())
    return concept_ids


def export(
    map_path: Path, export_path: Path, version: str, omop_metadata: dict
) -> Path:
    """Export concept sets to omop database in csv format

    Copies the baseline OMOP database into the export directory, appends the
    phenotype vocabulary to VOCABULARY, creates the CONCEPT_SET and
    CONCEPT_SET_ITEM tables, fills them from the map csv files (the file stem
    selects the OMOP vocabulary via ``omop_vocab_types``) and finally dumps
    every table of the export database to ``<table>.csv``.

    Args:
        map_path (Path): path to the acmc map directory containing concept sets in csv format
        export_path (Path): path to the directory where the omop database csv files are to be written
        version (str): phenotype version for omop vocabulary version
        omop_metadata (dict): phenotype omop metadata for omop vocabulary metadata

    Returns:
        Path: path to the exported SQLite database

    Raises:
        KeyError: if a map file stem is not a key of ``omop_vocab_types`` or a
            required key is missing from ``omop_metadata``
    """
    _logger.debug(f"exporting with metadata {omop_metadata} at version {version}")

    # copy the baseline omop database
    export_db_path = export_path / EXPORT_FILE
    shutil.copy(DB_PATH, export_db_path)

    # connect to db
    conn = sqlite3.connect(export_db_path)
    try:
        cur = conn.cursor()
        vocabulary_id = omop_metadata["vocabulary_id"]

        # Create VOCABULARY
        vocabulary_df = pd.DataFrame(
            [
                {
                    "vocabulary_id": vocabulary_id,
                    "vocabulary_name": omop_metadata["vocabulary_name"],
                    "vocabulary_reference": omop_metadata["vocabulary_reference"],
                    "vocabulary_version": version,
                }
            ]
        )
        vocabulary_df.to_sql("VOCABULARY", conn, if_exists="append", index=False)

        # Create CONCEPT_SET
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET (
                concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
                atlas_id INTEGER, -- Unique identifier generated by ATLAS
                concept_set_name TEXT, -- Optional name for the concept set
                concept_set_description TEXT, -- Optional description for the concept set
                vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
                FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
            );"""
        )

        # Create CONCEPT_SET_ITEM
        cur.execute(
            """
            CREATE TABLE CONCEPT_SET_ITEM (
                concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
                concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
                concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
                FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
                FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
            );"""
        )

        # read map files
        map_files = list(map_path.glob("*.csv"))
        total = len(map_files)
        _logger.info(f"Exporting {total} map files")
        for index, map_file in enumerate(map_files):
            _logger.info(f"Processing {index+1} of {total}: {map_file}")
            df = pd.read_csv(map_file)

            # the map file name identifies the acmc code type of its concepts
            target_code_type = map_file.stem
            omop_code_type = omop_vocab_types[target_code_type]
            _logger.debug(f"target code type {target_code_type}")
            _logger.debug(f"omop code type {omop_code_type}")

            for concept_set_name, grp in df.groupby("CONCEPT_SET"):
                # group keys may be non-string scalars; SQLite needs plain str
                set_name = str(concept_set_name)

                # create Concept_Set (parameterized so quotes in names are safe)
                if not concept_set_exist(cur, set_name):
                    cur.execute(
                        "INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
                        (set_name, vocabulary_id),
                    )
                else:
                    _logger.debug(f"Concept_set {set_name} already exists")
                    # TODO: ask to remove old concept_set?

                # get Concept_set_Id
                cur.execute(
                    "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;",
                    (set_name, vocabulary_id),
                )
                concept_set_id = cur.fetchone()[0]
                _logger.debug(f"concept set id {concept_set_id}")

                # get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
                concept_codes = list(grp["CONCEPT"].astype(str))
                concept_ids = _lookup_concept_ids(cur, omop_code_type, concept_codes)
                df_out = pd.DataFrame({"concept_id": concept_ids})

                if len(grp) != len(df_out):
                    _logger.error(
                        f"ERROR: Some {omop_code_type} Codes do not exist in OMOP Database"
                    )

                # Create Concept_set_item
                df_out["concept_set_id"] = concept_set_id
                df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)

        # make sure every pending insert is persisted before reading back
        conn.commit()

        # Output all tables to CSV
        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cur.fetchall()

        # Export each table to a separate CSV file
        for table in tables:
            table_name = table[0]
            # ignore SQLite's internal system table
            if table_name != "sqlite_sequence":
                # quote the identifier so unusual table names stay valid SQL
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                df = pd.read_sql_query(f"SELECT * FROM {quoted_name}", conn)
                output_path = export_path / f"{table_name}.csv"
                df.to_csv(output_path, index=False)
                _logger.info(f"Exported {table_name} to {table_name}.csv")
    finally:
        # release the database handle even when the export fails part-way
        conn.close()

    _logger.debug("Created export db successfully")

    return export_db_path
Export concept sets to omop database in csv format
Arguments:
- map_path (Path): path to the acmc map directory containing concept sets in csv format
- export_path (Path): path to the directory where the omop database csv files are to be written
- version (str): phenotype version for omop vocabulary version
- omop_metadata (dict): phenotype omop metadata for omop vocabulary metadata
Returns:
Path: path to the exported SQLite database