acmc.omop

omop.py module

This module provides functionality to manage OMOP vocabularies.

  1"""
  2omop.py module
  3
  4This module provides functionality to manage OMOP vocabularies.
  5"""
  6
  7import os
  8import argparse
  9import sqlite3
 10import pandas as pd
 11import logging
 12import zipfile
 13import shutil
 14import json
 15import yaml
 16from pathlib import Path
 17from acmc import util, logging_config
 18
 19# setup logging
 20_logger = logging_config.setup_logger()
 21
 22VOCAB_PATH = Path("./vocab/omop")
 23"""Default OMOP vocabulary path"""
 24
 25OMOP_CDM_Version = "54"
 26"""Supported OMOP CDM version number"""
 27
 28OMOP_DB_FILENAME = f"omop_{OMOP_CDM_Version}.sqlite"
 29"""Default OMOP sqllite database filename"""
 30
 31DB_PATH = VOCAB_PATH / OMOP_DB_FILENAME
 32"""Default OMOP sqllite database path"""
 33
 34VERSION_FILE = "omop_version.yml"
 35"""Default OMOP version file"""
 36
 37VERSION_PATH = VOCAB_PATH / VERSION_FILE
 38"""Default OMOP version path"""
 39
 40EXPORT_FILE = f"omop_{OMOP_CDM_Version}_export.sqlite"
 41"""Default OMOP export database filename"""
 42
 43vocabularies = {
 44    "source": "OHDSI Athena",
 45    "url": "https://athena.ohdsi.org/vocabulary/list",
 46    "cdm_version": OMOP_CDM_Version,
 47    "version": "",
 48    "vocabularies": [
 49        {"id": 1, "name": "SNOMED"},  # No license required
 50        {"id": 2, "name": "ICD9CM"},  # No license required
 51        {"id": 17, "name": "Readv2"},  # No license required
 52        {"id": 21, "name": "ATC"},  # No license required
 53        {"id": 55, "name": "OPCS4"},  # No license required
 54        {"id": 57, "name": "HES Specialty"},  # No license required
 55        {"id": 70, "name": "ICD10CM"},  # No license required
 56        {"id": 75, "name": "dm+d"},  # No license required
 57        {"id": 144, "name": "UK Biobank"},  # No license required
 58        {"id": 154, "name": "NHS Ethnic Category"},  # No license required
 59        {"id": 155, "name": "NHS Place of Service"},  # No license required
 60    ],
 61    "tables": [],
 62}
 63"""Required OMOP vocabularies definition"""
 64
 65omop_vocab_types = {
 66    "read2": "Read",
 67    "read3": None,
 68    "icd10": "ICD10CM",
 69    "snomed": "SNOMED",
 70    "opcs4": "OPCS4",
 71    "atc": "ATC",
 72    "med": None,
 73    "cprd": None,
 74}
 75"""Type mappings from acmc medical coding types to OMOP vocabulary types"""
 76
 77
 78def install(omop_zip_file: str, version: str):
 79    """ "Installs the OMOP release csv files in a file-based sql database
 80
 81    Args:
 82        omop_zip_file (str): vocabularies zip file distributed by OHDSI Athena
 83        version (str): version of the vocabularies distributed by OHDSI Athena
 84
 85    Raises:
 86        ValueError: if the zip file does not exist
 87        ValueError: if the file is not a zip file
 88        Exception: if error reading omop csv files
 89    """
 90    _logger.info(f"Installing OMOP from zip file: {omop_zip_file}")
 91    omop_zip_path = Path(omop_zip_file)
 92
 93    # Check if the file exists and is a ZIP file
 94    if not omop_zip_path.exists():
 95        msg = f"{omop_zip_path} does not exist."
 96        _logger.error(msg)
 97        raise ValueError(msg)
 98
 99    if not zipfile.is_zipfile(omop_zip_path):
100        msg = f"Error: {omop_zip_path} is not a valid ZIP file."
101        _logger.error(msg)
102        raise ValueError(msg)
103
104    # check codes directory exists and if not create it
105    if not VOCAB_PATH.exists():
106        VOCAB_PATH.mkdir(parents=True)
107        _logger.debug(f"OMOP directory '{VOCAB_PATH}' created.")
108    else:
109        # removing existing OMOP files
110        csv_files = list(VOCAB_PATH.glob("*.csv"))
111        for file in csv_files:
112            file.unlink()
113            _logger.debug(f"Deleted OMOP csv file: {file}")
114
115    # Extract ZIP contents
116    with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
117        zip_ref.extractall(VOCAB_PATH)
118        _logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")
119
120    # connect to database, if it does not exist it will be created
121    conn = sqlite3.connect(DB_PATH)
122
123    # Iterate through files in the folder
124    csv_files = list(VOCAB_PATH.glob("*.csv"))
125    total_tables_count = len(csv_files)
126    table_count = 1
127    for filename in csv_files:
128        try:
129            _logger.info(
130                f"Processing {table_count} of {total_tables_count} tables: {filename}"
131            )
132            # read the CSV file with the specified delimiter
133            df = pd.read_csv(filename, delimiter="\t", low_memory=False)
134
135            # export Table to sqlite db
136            df.to_sql(filename.stem, conn, if_exists="replace", index=False)
137
138            # add to the metadata
139            list(vocabularies["tables"]).append(filename.stem)
140            table_count = table_count + 1
141        except Exception as e:
142            raise Exception(f"Error reading file {filename}: {e}")
143
144    conn.close()
145
146    # write version file
147    write_version_file(version)
148
149    _logger.info(f"OMOP installation completed")
150
151
152def write_version_file(version: str):
153    """Writes the OMOP vocaburaries and version to a file
154
155    Args:
156        version (str): version of the vocabularies distributed by OHDSI Athena
157    """
158    vocabularies["version"] = version
159    with open(VERSION_PATH, "w") as file:
160        yaml.dump(
161            vocabularies,
162            file,
163            Dumper=util.QuotedDumper,
164            default_flow_style=False,
165            sort_keys=False,
166            default_style='"',
167        )
168
169
170def clear(db_path: Path):
171    """Clears the OMOP sql database
172
173    Args:
174        db_path (Path): the path to the omop sqllite database
175
176    Raises:
177        FileNotFoundError: if the omop sqllite database does not exist
178    """
179    _logger.info(f"Clearing OMOP data from database")
180    if not db_path.is_file():
181        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")
182    conn = sqlite3.connect(db_path)
183    cur = conn.cursor()
184    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
185
186    # Fetch and print table names
187    tables = cur.fetchall()
188    _logger.debug("Tables in database:", [table[0] for table in tables])
189
190    # cur.execute("DROP TABLE CONCEPT_SET;")
191    # cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
192
193    conn.close()
194    _logger.info(f"OMOP database cleared")
195
196
197def delete(db_path: Path):
198    """Deletes the OMOP sql database
199
200    Args:
201        db_path (Path): the path to the omop sqllite database
202
203    Raises:
204        FileNotFoundError: if the omop sqllite database does not exist
205    """
206
207    _logger.info(f"Deleting OMOP database")
208    if not db_path.is_file():
209        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")
210
211    db_path.unlink()
212    _logger.info(f"OMOP database deleted")
213
214
215def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
216    """Query to check if the table exists
217
218    Args:
219        cursor (sqlite3.Cursor): a sqllite database cursor
220        table_name (str): the table name to check
221
222    Returns:
223        bool: true if table exists
224    """
225
226    cursor.execute(
227        """
228		SELECT name
229		FROM sqlite_master
230		WHERE type='table' AND name=?
231		""",
232        (table_name,),
233    )
234
235    # Fetch the result
236    result = cursor.fetchone()
237
238    return result is not None
239
240
241def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
242    """Query to check if the vocabulary exists
243
244    Args:
245        cursor (sqlite3.Cursor): a sqllite database cursor
246        vocab_id (str): the vocabulary id to check
247
248    Returns:
249        bool: true if vocabulary id exists
250    """
251
252    cursor.execute(
253        """
254		SELECT vocabulary_id 
255		FROM VOCABULARY
256		WHERE vocabulary_id=?
257		""",
258        (vocab_id,),
259    )
260
261    # Fetch the result
262    result = cursor.fetchone()
263
264    return result is not None
265
266
267def concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
268    """Query to check if the concept set exists
269
270    Args:
271        cursor (sqlite3.Cursor): a sqllite database cursor
272        concept_set_name (str): the concept set name to check
273
274    Returns:
275        bool: true if concept set exists
276    """
277
278    query = f"SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
279    cursor.execute(query, (concept_set_name,))
280
281    # 1 if exists, 0 otherwise
282    return cursor.fetchone()[0] == 1
283
284
285def export(
286    map_path: Path, export_path: Path, version: str, omop_metadata: dict
287) -> Path:
288    """Export concept sets to omop database in csv format
289
290    Args:
291        map_path (Path): path to the acmc map directory containing concept sets in csv format
292        export_path (Path): path to the directory where the omop database csv files are to be written
293        version (str): phenotype version for omop vocabulary version
294        omop_metadata (dict): phenotype omop metadata for omop vocabulary metadata
295
296    Returns:
297        Path: path to the exported sqllite database
298    """
299
300    _logger.debug(f"exporting with metadata {omop_metadata} at version {version}")
301
302    # copy the baseline omop database
303    export_db_path = export_path / EXPORT_FILE
304    shutil.copy(DB_PATH, export_db_path)
305
306    # connect to db
307    conn = sqlite3.connect(export_db_path)
308    cur = conn.cursor()
309
310    # Create VOCABULARY
311    df_test = pd.DataFrame(
312        [
313            {
314                "vocabulary_id": omop_metadata["vocabulary_id"],
315                "vocabulary_name": omop_metadata["vocabulary_name"],
316                "vocabulary_reference": omop_metadata["vocabulary_reference"],
317                "vocabulary_version": version,
318                # "vocabulary_concept_id": 0,
319            }
320        ]
321    )
322    df_test.to_sql("VOCABULARY", conn, if_exists="append", index=False)
323
324    # Create CONCEPT_SET
325    cur.execute(
326        """
327	CREATE TABLE CONCEPT_SET (
328		concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
329		atlas_id INTEGER,                                -- Unique identifier generated by ATLAS
330		concept_set_name TEXT,                           -- Optional name for the concept set
331		concept_set_description TEXT,                    -- Optional description for the concept set
332		vocabulary_id TEXT NOT NULL,                     -- Foreign key to VOCABULARY table
333		FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
334	);"""
335    )
336
337    # Create CONCEPT_SET_ITEM
338    cur.execute(
339        """
340	CREATE TABLE CONCEPT_SET_ITEM (
341		concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
342		concept_set_id INTEGER NOT NULL,                      -- Foreign key to CONCEPT_SET table
343		concept_id INTEGER NOT NULL,                          -- Foreign key to CONCEPT table
344		FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
345		FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
346	);"""
347    )
348
349    # read map files
350    map_files = list(map_path.glob("*.csv"))
351    total = len(map_files)
352    _logger.info(f"Exporting {total} map files")
353    for index, map_file in enumerate(map_files):
354        _logger.info(f"Processing {index+1} of {total}: {map_file}")
355        df = pd.read_csv(map_file)
356
357        for concept_set_name, grp in df.groupby("CONCEPT_SET"):
358            # create Concept_Set
359            if not concept_set_exist(cur, str(concept_set_name)):
360                cur.execute(
361                    f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');"
362                )
363            else:
364                _logger.debug(f"Concept_set {concept_set_name} already exists")
365                # TODO: ask to remove old concept_set?
366
367            # get Concept_set_Id
368            query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
369            target_code_type = map_file.stem
370            cur.execute(
371                query,
372                (
373                    concept_set_name,
374                    omop_metadata["vocabulary_id"],
375                ),
376            )
377            # FAILS HERE WITH NONE REUR
378            _logger.debug(f"target code type {target_code_type}")
379            _logger.debug(f"omop code type {omop_vocab_types[target_code_type]}")
380            concept_set_id = cur.fetchone()[0]
381            _logger.debug(f"concept set id {concept_set_id}")
382
383            # get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
384            concept_codes = "'" + "', '".join(list(grp["CONCEPT"].astype(str))) + "'"
385            query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
386            cur.execute(query, (omop_vocab_types[target_code_type],))
387            df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
388
389            if not len(grp) == len(df_out):
390                _logger.error(
391                    f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database"
392                )
393
394            # Create Concept_set_item
395            df_out["concept_set_id"] = concept_set_id
396            df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)
397
398    # Output all tables to CSV
399    # Get the list of all tables
400    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
401    tables = cur.fetchall()  # List of tables
402
403    # Export each table to a separate CSV file
404    for table in tables:
405        table_name = table[0]
406        # ignore SQLite's internal system table
407        if table_name != "sqlite_sequence":
408            df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
409            output_file = f"{table_name}.csv"
410            output_path = export_path / output_file
411            df.to_csv(output_path, index=False)  # Save as CSV
412            _logger.info(f"Exported {table_name} to {table_name}.csv")
413
414    conn.close()
415
416    _logger.debug(f"Created export db successfully")
417
418    return export_db_path
VOCAB_PATH = PosixPath('vocab/omop')

Default OMOP vocabulary path

OMOP_CDM_Version = '54'

Supported OMOP CDM version number

OMOP_DB_FILENAME = 'omop_54.sqlite'

Default OMOP sqllite database filename

DB_PATH = PosixPath('vocab/omop/omop_54.sqlite')

Default OMOP sqllite database path

VERSION_FILE = 'omop_version.yml'

Default OMOP version file

VERSION_PATH = PosixPath('vocab/omop/omop_version.yml')

Default OMOP version path

EXPORT_FILE = 'omop_54_export.sqlite'

Default OMOP export database filename

vocabularies = {'source': 'OHDSI Athena', 'url': 'https://athena.ohdsi.org/vocabulary/list', 'cdm_version': '54', 'version': '', 'vocabularies': [{'id': 1, 'name': 'SNOMED'}, {'id': 2, 'name': 'ICD9CM'}, {'id': 17, 'name': 'Readv2'}, {'id': 21, 'name': 'ATC'}, {'id': 55, 'name': 'OPCS4'}, {'id': 57, 'name': 'HES Specialty'}, {'id': 70, 'name': 'ICD10CM'}, {'id': 75, 'name': 'dm+d'}, {'id': 144, 'name': 'UK Biobank'}, {'id': 154, 'name': 'NHS Ethnic Category'}, {'id': 155, 'name': 'NHS Place of Service'}], 'tables': []}

Required OMOP vocabularies definition

omop_vocab_types = {'read2': 'Read', 'read3': None, 'icd10': 'ICD10CM', 'snomed': 'SNOMED', 'opcs4': 'OPCS4', 'atc': 'ATC', 'med': None, 'cprd': None}

Type mappings from acmc medical coding types to OMOP vocabulary types

def install(omop_zip_file: str, version: str):
 79def install(omop_zip_file: str, version: str):
 80    """ "Installs the OMOP release csv files in a file-based sql database
 81
 82    Args:
 83        omop_zip_file (str): vocabularies zip file distributed by OHDSI Athena
 84        version (str): version of the vocabularies distributed by OHDSI Athena
 85
 86    Raises:
 87        ValueError: if the zip file does not exist
 88        ValueError: if the file is not a zip file
 89        Exception: if error reading omop csv files
 90    """
 91    _logger.info(f"Installing OMOP from zip file: {omop_zip_file}")
 92    omop_zip_path = Path(omop_zip_file)
 93
 94    # Check if the file exists and is a ZIP file
 95    if not omop_zip_path.exists():
 96        msg = f"{omop_zip_path} does not exist."
 97        _logger.error(msg)
 98        raise ValueError(msg)
 99
100    if not zipfile.is_zipfile(omop_zip_path):
101        msg = f"Error: {omop_zip_path} is not a valid ZIP file."
102        _logger.error(msg)
103        raise ValueError(msg)
104
105    # check codes directory exists and if not create it
106    if not VOCAB_PATH.exists():
107        VOCAB_PATH.mkdir(parents=True)
108        _logger.debug(f"OMOP directory '{VOCAB_PATH}' created.")
109    else:
110        # removing existing OMOP files
111        csv_files = list(VOCAB_PATH.glob("*.csv"))
112        for file in csv_files:
113            file.unlink()
114            _logger.debug(f"Deleted OMOP csv file: {file}")
115
116    # Extract ZIP contents
117    with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
118        zip_ref.extractall(VOCAB_PATH)
119        _logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")
120
121    # connect to database, if it does not exist it will be created
122    conn = sqlite3.connect(DB_PATH)
123
124    # Iterate through files in the folder
125    csv_files = list(VOCAB_PATH.glob("*.csv"))
126    total_tables_count = len(csv_files)
127    table_count = 1
128    for filename in csv_files:
129        try:
130            _logger.info(
131                f"Processing {table_count} of {total_tables_count} tables: {filename}"
132            )
133            # read the CSV file with the specified delimiter
134            df = pd.read_csv(filename, delimiter="\t", low_memory=False)
135
136            # export Table to sqlite db
137            df.to_sql(filename.stem, conn, if_exists="replace", index=False)
138
139            # add to the metadata
140            list(vocabularies["tables"]).append(filename.stem)
141            table_count = table_count + 1
142        except Exception as e:
143            raise Exception(f"Error reading file {filename}: {e}")
144
145    conn.close()
146
147    # write version file
148    write_version_file(version)
149
150    _logger.info(f"OMOP installation completed")

"Installs the OMOP release csv files in a file-based sql database

Arguments:
  • omop_zip_file (str): vocabularies zip file distributed by OHDSI Athena
  • version (str): version of the vocabularies distributed by OHDSI Athena
Raises:
  • ValueError: if the zip file does not exist
  • ValueError: if the file is not a zip file
  • Exception: if error reading omop csv files
def write_version_file(version: str):
153def write_version_file(version: str):
154    """Writes the OMOP vocaburaries and version to a file
155
156    Args:
157        version (str): version of the vocabularies distributed by OHDSI Athena
158    """
159    vocabularies["version"] = version
160    with open(VERSION_PATH, "w") as file:
161        yaml.dump(
162            vocabularies,
163            file,
164            Dumper=util.QuotedDumper,
165            default_flow_style=False,
166            sort_keys=False,
167            default_style='"',
168        )

Writes the OMOP vocaburaries and version to a file

Arguments:
  • version (str): version of the vocabularies distributed by OHDSI Athena
def clear(db_path: pathlib.Path):
171def clear(db_path: Path):
172    """Clears the OMOP sql database
173
174    Args:
175        db_path (Path): the path to the omop sqllite database
176
177    Raises:
178        FileNotFoundError: if the omop sqllite database does not exist
179    """
180    _logger.info(f"Clearing OMOP data from database")
181    if not db_path.is_file():
182        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")
183    conn = sqlite3.connect(db_path)
184    cur = conn.cursor()
185    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
186
187    # Fetch and print table names
188    tables = cur.fetchall()
189    _logger.debug("Tables in database:", [table[0] for table in tables])
190
191    # cur.execute("DROP TABLE CONCEPT_SET;")
192    # cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
193
194    conn.close()
195    _logger.info(f"OMOP database cleared")

Clears the OMOP sql database

Arguments:
  • db_path (Path): the path to the omop sqllite database
Raises:
  • FileNotFoundError: if the omop sqllite database does not exist
def delete(db_path: pathlib.Path):
198def delete(db_path: Path):
199    """Deletes the OMOP sql database
200
201    Args:
202        db_path (Path): the path to the omop sqllite database
203
204    Raises:
205        FileNotFoundError: if the omop sqllite database does not exist
206    """
207
208    _logger.info(f"Deleting OMOP database")
209    if not db_path.is_file():
210        raise FileNotFoundError(f"Error: OMOP DB file '{db_path}' does not exist.")
211
212    db_path.unlink()
213    _logger.info(f"OMOP database deleted")

Deletes the OMOP sql database

Arguments:
  • db_path (Path): the path to the omop sqllite database
Raises:
  • FileNotFoundError: if the omop sqllite database does not exist
def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
216def table_exists(cursor: sqlite3.Cursor, table_name: str) -> bool:
217    """Query to check if the table exists
218
219    Args:
220        cursor (sqlite3.Cursor): a sqllite database cursor
221        table_name (str): the table name to check
222
223    Returns:
224        bool: true if table exists
225    """
226
227    cursor.execute(
228        """
229		SELECT name
230		FROM sqlite_master
231		WHERE type='table' AND name=?
232		""",
233        (table_name,),
234    )
235
236    # Fetch the result
237    result = cursor.fetchone()
238
239    return result is not None

Query to check if the table exists

Arguments:
  • cursor (sqlite3.Cursor): a sqllite database cursor
  • table_name (str): the table name to check
Returns:

bool: true if table exists

def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
242def vocab_exists(cursor: sqlite3.Cursor, vocab_id: str) -> bool:
243    """Query to check if the vocabulary exists
244
245    Args:
246        cursor (sqlite3.Cursor): a sqllite database cursor
247        vocab_id (str): the vocabulary id to check
248
249    Returns:
250        bool: true if vocabulary id exists
251    """
252
253    cursor.execute(
254        """
255		SELECT vocabulary_id 
256		FROM VOCABULARY
257		WHERE vocabulary_id=?
258		""",
259        (vocab_id,),
260    )
261
262    # Fetch the result
263    result = cursor.fetchone()
264
265    return result is not None

Query to check if the vocabulary exists

Arguments:
  • cursor (sqlite3.Cursor): a sqllite database cursor
  • vocab_id (str): the vocabulary id to check
Returns:

bool: true if vocabulary id exists

def concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
268def concept_set_exist(cursor: sqlite3.Cursor, concept_set_name: str) -> bool:
269    """Query to check if the concept set exists
270
271    Args:
272        cursor (sqlite3.Cursor): a sqllite database cursor
273        concept_set_name (str): the concept set name to check
274
275    Returns:
276        bool: true if concept set exists
277    """
278
279    query = f"SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
280    cursor.execute(query, (concept_set_name,))
281
282    # 1 if exists, 0 otherwise
283    return cursor.fetchone()[0] == 1

Query to check if the concept set exists

Arguments:
  • cursor (sqlite3.Cursor): a sqllite database cursor
  • concept_set_name (str): the concept set name to check
Returns:

bool: true if concept set exists

def export( map_path: pathlib.Path, export_path: pathlib.Path, version: str, omop_metadata: dict) -> pathlib.Path:
286def export(
287    map_path: Path, export_path: Path, version: str, omop_metadata: dict
288) -> Path:
289    """Export concept sets to omop database in csv format
290
291    Args:
292        map_path (Path): path to the acmc map directory containing concept sets in csv format
293        export_path (Path): path to the directory where the omop database csv files are to be written
294        version (str): phenotype version for omop vocabulary version
295        omop_metadata (dict): phenotype omop metadata for omop vocabulary metadata
296
297    Returns:
298        Path: path to the exported sqllite database
299    """
300
301    _logger.debug(f"exporting with metadata {omop_metadata} at version {version}")
302
303    # copy the baseline omop database
304    export_db_path = export_path / EXPORT_FILE
305    shutil.copy(DB_PATH, export_db_path)
306
307    # connect to db
308    conn = sqlite3.connect(export_db_path)
309    cur = conn.cursor()
310
311    # Create VOCABULARY
312    df_test = pd.DataFrame(
313        [
314            {
315                "vocabulary_id": omop_metadata["vocabulary_id"],
316                "vocabulary_name": omop_metadata["vocabulary_name"],
317                "vocabulary_reference": omop_metadata["vocabulary_reference"],
318                "vocabulary_version": version,
319                # "vocabulary_concept_id": 0,
320            }
321        ]
322    )
323    df_test.to_sql("VOCABULARY", conn, if_exists="append", index=False)
324
325    # Create CONCEPT_SET
326    cur.execute(
327        """
328	CREATE TABLE CONCEPT_SET (
329		concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
330		atlas_id INTEGER,                                -- Unique identifier generated by ATLAS
331		concept_set_name TEXT,                           -- Optional name for the concept set
332		concept_set_description TEXT,                    -- Optional description for the concept set
333		vocabulary_id TEXT NOT NULL,                     -- Foreign key to VOCABULARY table
334		FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
335	);"""
336    )
337
338    # Create CONCEPT_SET_ITEM
339    cur.execute(
340        """
341	CREATE TABLE CONCEPT_SET_ITEM (
342		concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
343		concept_set_id INTEGER NOT NULL,                      -- Foreign key to CONCEPT_SET table
344		concept_id INTEGER NOT NULL,                          -- Foreign key to CONCEPT table
345		FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
346		FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
347	);"""
348    )
349
350    # read map files
351    map_files = list(map_path.glob("*.csv"))
352    total = len(map_files)
353    _logger.info(f"Exporting {total} map files")
354    for index, map_file in enumerate(map_files):
355        _logger.info(f"Processing {index+1} of {total}: {map_file}")
356        df = pd.read_csv(map_file)
357
358        for concept_set_name, grp in df.groupby("CONCEPT_SET"):
359            # create Concept_Set
360            if not concept_set_exist(cur, str(concept_set_name)):
361                cur.execute(
362                    f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');"
363                )
364            else:
365                _logger.debug(f"Concept_set {concept_set_name} already exists")
366                # TODO: ask to remove old concept_set?
367
368            # get Concept_set_Id
369            query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
370            target_code_type = map_file.stem
371            cur.execute(
372                query,
373                (
374                    concept_set_name,
375                    omop_metadata["vocabulary_id"],
376                ),
377            )
378            # FAILS HERE WITH NONE REUR
379            _logger.debug(f"target code type {target_code_type}")
380            _logger.debug(f"omop code type {omop_vocab_types[target_code_type]}")
381            concept_set_id = cur.fetchone()[0]
382            _logger.debug(f"concept set id {concept_set_id}")
383
384            # get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
385            concept_codes = "'" + "', '".join(list(grp["CONCEPT"].astype(str))) + "'"
386            query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
387            cur.execute(query, (omop_vocab_types[target_code_type],))
388            df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
389
390            if not len(grp) == len(df_out):
391                _logger.error(
392                    f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database"
393                )
394
395            # Create Concept_set_item
396            df_out["concept_set_id"] = concept_set_id
397            df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)
398
399    # Output all tables to CSV
400    # Get the list of all tables
401    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
402    tables = cur.fetchall()  # List of tables
403
404    # Export each table to a separate CSV file
405    for table in tables:
406        table_name = table[0]
407        # ignore SQLite's internal system table
408        if table_name != "sqlite_sequence":
409            df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
410            output_file = f"{table_name}.csv"
411            output_path = export_path / output_file
412            df.to_csv(output_path, index=False)  # Save as CSV
413            _logger.info(f"Exported {table_name} to {table_name}.csv")
414
415    conn.close()
416
417    _logger.debug(f"Created export db successfully")
418
419    return export_db_path

Export concept sets to omop database in csv format

Arguments:
  • map_path (Path): path to the acmc map directory containing concept sets in csv format
  • export_path (Path): path to the directory where the omop database csv files are to be written
  • version (str): phenotype version for omop vocabulary version
  • omop_metadata (dict): phenotype omop metadata for omop vocabulary metadata
Returns:

Path: path to the exported sqllite database