From 28aecd75c988f879a8c4f73c97719efcbe10f2eb Mon Sep 17 00:00:00 2001
From: Michael Boniface <m.j.boniface@soton.ac.uk>
Date: Tue, 18 Feb 2025 16:25:01 +0000
Subject: [PATCH] added OMOP versioning

---
 README.md                              |   6 +
 acmc.py                                |   9 +-
 examples/{config.json => config1.json} |   0
 omop.py                                | 210 ++++++++++++++-----------
 phen.py                                |  26 +--
 tests/test_commands.py                 |   6 +
 trud.py                                |   5 +-
 7 files changed, 158 insertions(+), 104 deletions(-)
 rename examples/{config.json => config1.json} (100%)

diff --git a/README.md b/README.md
index 0029a7a..e151958 100644
--- a/README.md
+++ b/README.md
@@ -54,6 +54,12 @@ The tool supports verification and mapping across diagnostic coding formats belo

 ## Notes

+OMOP
+
+Content of your package
+
+Vocabularies release version: v20240830
+
 Linux/macOS:

 ```
diff --git a/acmc.py b/acmc.py
index 8d36f9d..f242887 100644
--- a/acmc.py
+++ b/acmc.py
@@ -14,7 +14,7 @@ def trud_install(args):

 def omop_install(args):
     """Handle the `omop install` command."""
-    omop.install(omop.OMOP_DB_PATH, args.omop_folder)
+    omop.install(args.omop_dir, args.version)

 def omop_clear(args):
     """Handle the `omop clear` command."""
@@ -80,7 +80,8 @@ def main():

     # omop install
     omop_install_parser = omop_subparsers.add_parser("install", help="Install OMOP codes within database")
-    omop_install_parser.add_argument("-f", "--omop-folder", required=True, help="Path to extracted OMOP downloads folder")
+    omop_install_parser.add_argument("-d", "--omop-dir", required=True, help="Directory path to extracted OMOP downloads")
+    omop_install_parser.add_argument("-v", "--version", required=True, help="OMOP vocabularies release version")
     omop_install_parser.set_defaults(func=omop_install)

     # omop clear
@@ -112,7 +113,7 @@ def main():
     phen_map_parser.add_argument("-t", "--target-coding", required=True, choices=['read2', 'read3', 'icd10', 'snomed', 'opcs4'], help="Specify the target coding (read2, read3, icd10, snomed, opcs4)")
     # phen map flags
     phen_map_parser.add_argument("-tr", "--translate", action="store_true", default=False, help="Translate code types")
-    phen_map_parser.add_argument("-v", "--verify", action="store_true", default=False, help="Verify codes")
+    phen_map_parser.add_argument("-ve", "--verify", action="store_true", default=False, help="Verify codes")
     phen_map_parser.set_defaults(func=phen_map)

     # phen publish
@@ -124,7 +125,7 @@ def main():
     phen_copy_parser = phen_subparsers.add_parser("copy", help="Publish phenotype configuration")
     phen_copy_parser.add_argument("-d", "--phen-dir", type=str, default=str(phen.DEFAULT_PHEN_PATH.resolve()), help="Phenotype directory")
     phen_copy_parser.add_argument("-td", "--target-dir", type=str, default=str(BUILD_PATH.resolve()), help="Target directory for the copy")
-    phen_copy_parser.add_argument("-ve", "--version", type=str, default=None, help="Version to copy")
+    phen_copy_parser.add_argument("-v", "--version", type=str, default=None, help="Phenotype version to copy")
     phen_copy_parser.set_defaults(func=phen_copy)

     # phen diff
diff --git a/examples/config.json b/examples/config1.json
similarity index 100%
rename from examples/config.json
rename to examples/config1.json
diff --git a/omop.py b/omop.py
index e7ec110..ca1319d 100644
--- a/omop.py
+++ b/omop.py
@@ -2,108 +2,142 @@ import os
 import argparse
 import sqlite3
 import pandas as pd
-
+import json
 from pathlib import Path

 OMOP_DB_DIR = Path('./build/omop')
 OMOP_DB_PATH = OMOP_DB_DIR / 'omop_54.sqlite'
+VERSION_FILE = 'omop_version.json'
+VERSION_PATH = OMOP_DB_DIR / VERSION_FILE
+
+vocabularies = {
+    "source": "OHDSI Athena",
+    "url": "https://athena.ohdsi.org/vocabulary/list",
+    "version": "",
+    "vocabularies": [
+        { "id": 1, "name": "SNOMED"},
+        { "id": 2, "name": "ICD9CM"},
+        { "id": 17, "name": "Readv2"},
+        { "id": 21, "name": "ATC"},
+        { "id": 55, "name": "OPCS4"},
+        { "id": 57, "name": "HES Specialty"},
+        { "id": 70, "name": "ICD10CM"},
+        { "id": 75, "name": "dm+d"},
+        { "id": 144, "name": "UK Biobank"},
+        { "id": 154, "name": "NHS Ethnic Category"},
+        { "id": 155, "name": "NHS Place of Service"}
+    ],
+    "model": []
+}

 #Populate SQLite3 Database with default OMOP CONCEPTS
-def install (db_path, omop_install_folder):
-    print(f"Installing OMOP database from {omop_install_folder}")
-
-    # check folder for omop install files is a directory
-    omop_install_path = Path(omop_install_folder)
-    if not omop_install_path.is_dir():
-        raise NotADirectoryError(f"Error: '{omop_install_path}' for OMOP installation files is not a directory")
-
-    # check codes directory exists and if not create it
-    if not OMOP_DB_DIR.exists():
-        OMOP_DB_DIR.mkdir(parents=True)
-        print(f"Codes directory '{OMOP_DB_DIR}' created.")
-
-    # connect to database, if it does not exist it will be created
-    conn = sqlite3.connect(OMOP_DB_PATH)
-    # Iterate through files in the folder
-    for filename in os.listdir(omop_install_folder):
-        if filename.endswith(".csv"): # Check if the file is a CSV
-            file_path = os.path.join(omop_install_folder, filename)
-            try:
-                print(f"Reading file: {file_path}")
-                # Read the CSV file with the specified delimiter
-                df = pd.read_csv(file_path, delimiter="\t", low_memory=False)
-                table_name = os.path.splitext(os.path.basename(file_path))[0] #Get name of file
-
-                #Export Table to sqlite db
-                df.to_sql(table_name, conn, if_exists='replace', index=False)
-
-            except Exception as e:
-                raise Exception(f"Error reading file {file_path}: {e}")
-
-    conn.close()
-    print(f"OMOP installation completed")
-
+def install (omop_install_folder, version, db_path=OMOP_DB_PATH):
+    """Installs the OMOP release csv files in a file-based sql database"""
+    print(f"Installing OMOP database from {omop_install_folder}")
+
+    # check folder for omop install files is a directory
+    omop_install_path = Path(omop_install_folder)
+    if not omop_install_path.is_dir():
+        raise NotADirectoryError(f"Error: '{omop_install_path}' for OMOP installation files is not a directory")
+
+    # check codes directory exists and if not create it
+    if not OMOP_DB_DIR.exists():
+        OMOP_DB_DIR.mkdir(parents=True)
+        print(f"OMOP directory '{OMOP_DB_DIR}' created.")
+
+    # connect to database, if it does not exist it will be created
+    conn = sqlite3.connect(OMOP_DB_PATH)
+    # Iterate through files in the folder
+    for filename in os.listdir(omop_install_folder):
+        if filename.endswith(".csv"): # Check if the file is a CSV
+            file_path = os.path.join(omop_install_folder, filename)
+            try:
+                print(f"Reading file: {file_path}")
+                # read the CSV file with the specified delimiter
+                df = pd.read_csv(file_path, delimiter="\t", low_memory=False)
+                table_name = os.path.splitext(os.path.basename(file_path))[0] #Get name of file
+
+                # export Table to sqlite db
+                df.to_sql(table_name, conn, if_exists='replace', index=False)
+
+                # add to the metadata
+                vocabularies["model"].append(filename)
+            except Exception as e:
+                raise Exception(f"Error reading file {file_path}: {e}")
+    conn.close()
+
+    # write version file
+    write_version_file(version)
+
+    print(f"OMOP installation completed")
+
+def write_version_file(version):
+    """Writes the OMOP vocabularies and version to a file"""
+    vocabularies['version'] = version
+    with open(VERSION_PATH, "w", encoding="utf-8") as f:
+        json.dump(vocabularies, f, indent=4)
+
 def clear(db_path):
-    print(f"Clearing OMOP data from database")
-    omop_db_path = Path(db_path)
-    if not omop_db_path.is_file():
-        raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.")
-
-    conn = sqlite3.connect(db_path)
-    cur = conn.cursor()
+    """Clears the OMOP sql database"""
+    print(f"Clearing OMOP data from database")
+    omop_db_path = Path(db_path)
+    if not omop_db_path.is_file():
+        raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.")
+    conn = sqlite3.connect(db_path)
+    cur = conn.cursor()
+    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
+
+    # Fetch and print table names
+    tables = cur.fetchall()
+    print("Tables in database:", [table[0] for table in tables])

-    cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
+    #cur.execute("DROP TABLE CONCEPT_SET;")
+    #cur.execute("DROP TABLE CONCEPT_SET_ITEM;")

-    # Fetch and print table names
-    tables = cur.fetchall()
-    print("Tables in database:", [table[0] for table in tables])
-
-    #cur.execute("DROP TABLE CONCEPT_SET;")
-    #cur.execute("DROP TABLE CONCEPT_SET_ITEM;")
-
-    conn.close()
-    print(f"OMOP database cleared")
+    conn.close()
+    print(f"OMOP database cleared")

 def delete(db_path):
-    print(f"Deleting OMOP database")
-    omop_db_path = Path(db_path)
-    if not omop_db_path.is_file():
-        raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.")
-
-    omop_db_path.unlink()
-    print(f"OMOP database deleted")
-
+    """Deletes the OMOP sql database"""
+    print(f"Deleting OMOP database")
+    omop_db_path = Path(db_path)
+    if not omop_db_path.is_file():
+        raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.")
+
+    omop_db_path.unlink()
+    print(f"OMOP database deleted")
+
 def table_exists(cursor, table_name):
-    # Query to check if the table exists
-    cursor.execute(
-        """
-        SELECT name
-        FROM sqlite_master
-        WHERE type='table' AND name=?
-        """,
-        (table_name,)
-    )
-
-    # Fetch the result
-    result = cursor.fetchone()
-
-    return result is not None
+    # Query to check if the table exists
+    cursor.execute(
+        """
+        SELECT name
+        FROM sqlite_master
+        WHERE type='table' AND name=?
+        """,
+        (table_name,)
+    )
+
+    # Fetch the result
+    result = cursor.fetchone()
+
+    return result is not None

 def vocab_exists(cursor, vocab_id):
-    # Query to check if the table exists
-    cursor.execute(
-        """
-        SELECT vocabulary_id
-        FROM VOCABULARY
-        WHERE vocabulary_id=?
-        """,
-        (vocab_id,)
-    )
-
-    # Fetch the result
-    result = cursor.fetchone()
-
-    return result is not None
+    # Query to check if the table exists
+    cursor.execute(
+        """
+        SELECT vocabulary_id
+        FROM VOCABULARY
+        WHERE vocabulary_id=?
+        """,
+        (vocab_id,)
+    )
+
+    # Fetch the result
+    result = cursor.fetchone()
+
+    return result is not None

 def setup(db_path, vocab_id, vocab_version, vocab_name, vocab_reference):
     #Setup SQLite3 Database for OMOP
diff --git a/phen.py b/phen.py
index 5b37619..ddbd0bb 100644
--- a/phen.py
+++ b/phen.py
@@ -14,9 +14,10 @@ from urllib.parse import urlparse, urlunparse

 # acmc dependencies
 import trud
+import omop
 from base import log_invalid_code, bcolors, raise_
 from parse import Read2, Read3, Icd10, Snomed, Opcs4, Atc, code_types, vocab_types
-from omop import OMOP_DB_PATH, publish_concept_sets, setup
+from omop import publish_concept_sets, setup

 pd.set_option("mode.chained_assignment", None)

@@ -530,14 +531,6 @@ def map(phen_dir, target_code_type, translate=True, verify=True):
     out.to_csv(map_path, index=False)
     print(f"Saved mapped concepts to {str(map_path.resolve())}")

-    # save error File
-    error_path = phen_path / ERROR_FILE
-    if error_path.exists():
-        error_df = pd.read_csv(error_path)
-        error_df = error_df.drop_duplicates() # Remove Duplicates from Error file
-        error_df = error_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
-        error_df.to_csv(error_path, index=False)
-
     # save concept sets as separate files
     concept_set_path = phen_path / CONCEPT_SET_DIR / target_code_type

@@ -558,6 +551,18 @@ def map(phen_dir, target_code_type, translate=True, verify=True):
             filename = f"{name}.csv"
             concept_path = concept_set_path / filename
             concept.to_csv(concept_path, index=False )
+
+    # copy version files used for mapping to repo
+    shutil.copy(trud.VERSION_PATH, phen_path / trud.VERSION_FILE)
+    shutil.copy(omop.VERSION_PATH, phen_path / omop.VERSION_FILE)
+
+    # write errors to a file
+    error_path = phen_path / ERROR_FILE
+    if error_path.exists():
+        error_df = pd.read_csv(error_path)
+        error_df = error_df.drop_duplicates() # Remove Duplicates from Error file
+        error_df = error_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
+        error_df.to_csv(error_path, index=False)

     print(f"Saved concept_sets to {str(concept_set_path.resolve())}")

@@ -662,7 +667,8 @@ def copy(phen_dir, target_dir, version=None):
     print(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")

 def diff(phen_dir, phen_old_dir):
-
+    """Compare the differences between two versions of a phenotype"""
+    # validate phenotype directories
     validate(phen_old_dir)
     validate(phen_dir)

diff --git a/tests/test_commands.py b/tests/test_commands.py
index 8d559c2..375241f 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -67,3 +67,9 @@ def test_phen_diff():
         args = argparse.Namespace(phen_dir="/path/to/phen", phen_dir_old="/path/to/old_phen")
         phen_diff(args)
         mock_diff.assert_called_once_with("/path/to/phen", "/path/to/old_phen")
+
+def test_phen_local():
+    with patch("phen.diff") as mock_diff:
+        args = argparse.Namespace(phen_dir="/path/to/phen", phen_dir_old="/path/to/old_phen")
+        phen_diff(args)
+        mock_diff.assert_called_once_with("/path/to/phen", "/path/to/old_phen")
diff --git a/trud.py b/trud.py
index 952bd1d..78b01ba 100644
--- a/trud.py
+++ b/trud.py
@@ -16,7 +16,8 @@ import simpledbf

 # Constants
 FQDN = "isd.digital.nhs.uk"
 TRUD_PATH = Path('./build/trud')
-TRUD_VERSION_PATH = TRUD_PATH / 'trud_version.json'
+VERSION_FILE = 'trud_version.json'
+VERSION_PATH = TRUD_PATH / VERSION_FILE
 TRUD_DOWNLOADS_DIR = TRUD_PATH / 'downloads'
 TRUD_PROCESSED_DIR = TRUD_PATH / 'processed'
@@ -309,7 +310,7 @@ def install():
     ]

     # save TRUD versions to file to main record of what was downloaded
-    with open(TRUD_VERSION_PATH, "w", encoding="utf-8") as f:
+    with open(VERSION_PATH, "w", encoding="utf-8") as f:
         # remove function from items
         data = [{k: v for k, v in d.items() if k != "extract"} for d in items]
         json.dump(data, f, indent=4)
-- 
GitLab
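
A minimal usage sketch of the `omop install` command as changed by this patch. The `-d/--omop-dir` and `-v/--version` flags and the output paths are taken from the diff above; the `python acmc.py` entry point and the download directory are illustrative assumptions, not part of the patch.

```
# Load the extracted OHDSI Athena CSV files and record the vocabularies release
# version (the download directory shown here is an assumption).
python acmc.py omop install -d ./omop_downloads -v v20240830

# Per the diff, install() loads each CSV into ./build/omop/omop_54.sqlite and
# write_version_file() records the loaded tables and version in
# ./build/omop/omop_version.json, which phen map later copies into the phenotype repo.
```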