Commit 4cbcdfe5 authored by mjbonifa

implemented first part of export to create the OMOP database; refactored the setup and publish functions. Still need to test the database and the export of all tables.
parent f10ee23a
.gitignore
@@ -12,6 +12,9 @@ __pycache__
 *.save*
 ~$*
+
+# Build
+.tox/
 # ACMC phenotype build files
 vocab/*

README.md
@@ -49,7 +49,7 @@ Once installed, you'll be ready to use the `acmc` tool along with the associated
 - [ICD10 Edition 5 XML](https://isd.digital.nhs.uk/trud/users/guest/filters/0/categories/28/items/259/releases)
 - [OPCS-4.10 Data Files](https://isd.digital.nhs.uk/trud/users/guest/filters/0/categories/10/items/119/releases)
-After subscribing, you'll receive an API key once your request is approved (usually within 24 hours).
+After subscribing, you'll receive an API key once your request is approved (usually within a few hours).
 4. **Get TRUD API KEY**
@@ -98,7 +98,7 @@ Once installed, you'll be ready to use the `acmc` tool along with the associated
 * 154) NHS Ethnic Category
 * 155) NHS Place of Service
-You will be notified by email with a vocabularies version number and link to download a zip file of OMOP database tables in CSV format. The subject will be `OHDSI Standardized Vocabularies. Your download link` from `pallas@ohdsi.org`
+You will be notified by email (usually within an hour) with a vocabularies version number and a link to download a zip file of OMOP database tables in CSV format. The subject will be `OHDSI Standardized Vocabularies. Your download link` from `pallas@ohdsi.org`
 ```
 Content of your package
@@ -127,7 +127,7 @@ Please execute the following process:
 Load the unpacked files into the tables.
 ```
 Download the OMOP file onto your computer and note the path to the file
 4. **Install OMOP vocabularies**
@@ -155,7 +155,7 @@ Expected output:
 ## **Example Usage**
-Follow these steps to initialize and manage a phenotype using `acmc`. In this example, we use a source concept code list for the Concept Set `Abdominal Pain` created from [ClinicalCodes.org](ClinicalCodes.org). The source concept codes are is read2. We genereate versioned phenotypes for read2 and then translate to snomed with a another version.
+Follow these steps to initialize and manage a phenotype using `acmc`. In this example, we use a source concept list for the Concept Set `Abdominal Pain` created from [ClinicalCodes.org](ClinicalCodes.org). The source concept codes are read2. We generate versioned phenotypes for read2 and then translate to snomed in normalised, standard formats.
 1. **Initialize a phenotype in the workspace**
@@ -181,7 +181,7 @@ Expected Output:
 cp -r ./examples/codes/* ./workspace/phen/codes
 ```
-- [Download `res176-abdominal-pain.csv`](.//examples/codes/clinical-codes-org/Symptom%20code%20lists/Abdominal%20pain/res176-abdominal-pain.csv)
+- You can view the source code list here: [`res176-abdominal-pain.csv`](.//examples/codes/clinical-codes-org/Symptom%20code%20lists/Abdominal%20pain/res176-abdominal-pain.csv)
 - Alternatively, place your code lists in `./workspace/phen/codes`.
 3. **Copy the example phenotype configuration file to the phenotype directory**
@@ -192,7 +192,7 @@ cp -r ./examples/codes/* ./workspace/phen/codes
 cp -r ./examples/config.json ./workspace/phen
 ```
-- [Download `config.json`](./examples/config.json)
+- You can view the configuration file here: [`config.json`](./examples/config.json)
 - Alternatively, place your own `config.json` file in `./workspace/phen`.
 4. **Validate the phenotype configuration**
@@ -330,7 +330,7 @@ Expected Output:
 ## Support
-For issues, open an [issue in the repository](https://git.soton.ac.uk/meldb/concepts-processing/-/issues)
+If you need help, please open an [issue in the repository](https://git.soton.ac.uk/meldb/concepts-processing/-/issues)
 ## Contributing

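The mapping step this README describes corresponds to `phen.map` in the CLI module below; a hedged sketch of the equivalent programmatic call (the workspace path and the `acmc.phen` import path are assumptions based on the code in this commit):

```python
# Hypothetical programmatic equivalent of the README's map step:
# translate the read2 source codes to snomed (signature from main.py below).
from acmc import phen

phen.map("./workspace/phen", "snomed")
```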
main.py
@@ -39,6 +39,11 @@ def phen_map(args):
     phen.map(args.phen_dir,
              args.target_coding)
+def phen_export(args):
+    """Handle the `phen export` command."""
+    phen.export(args.phen_dir,
+                args.version)
+
 def phen_publish(args):
     """Handle the `phen publish` command."""
     phen.publish(args.phen_dir)
@@ -49,12 +54,6 @@ def phen_copy(args):
               args.target_dir,
               args.version)
-def phen_copy(args):
-    """Handle the `phen copy` command."""
-    phen.copy(args.phen_dir,
-              args.target_dir,
-              args.version)
-
 def phen_diff(args):
     """Handle the `phen diff` command."""
     phen.diff(args.phen_dir,
@@ -145,6 +144,20 @@ def main():
                                 help="Specify output format(s): 'csv', 'omop', or both (default: csv)")
     phen_map_parser.set_defaults(func=phen_map)
+    # phen export
+    phen_export_parser = phen_subparsers.add_parser("export", help="Export phen to OMOP database")
+    phen_export_parser.add_argument("-d",
+                                    "--phen-dir",
+                                    type=str,
+                                    default=str(phen.DEFAULT_PHEN_PATH.resolve()),
+                                    help="Phenotype workspace directory")
+    phen_export_parser.add_argument("-v",
+                                    "--version",
+                                    type=str,
+                                    default='latest',
+                                    help="Phenotype version to export, defaults to the latest version")
+    phen_export_parser.set_defaults(func=phen_export)
     # phen publish
     phen_publish_parser = phen_subparsers.add_parser("publish", help="Publish phenotype configuration")
     phen_publish_parser.add_argument("-d",

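For orientation, here is a self-contained sketch of the subparser plus `set_defaults(func=...)` dispatch pattern the CLI uses above (names here are illustrative, not acmc's):

```python
# Minimal argparse dispatch sketch: each subcommand registers a handler
# via set_defaults(func=...), and main() calls args.func(args).
import argparse

parser = argparse.ArgumentParser(prog="acmc")
sub = parser.add_subparsers(dest="command", required=True)

export_p = sub.add_parser("export", help="Export to OMOP database")
export_p.add_argument("-v", "--version", default="latest")
export_p.set_defaults(func=lambda args: print(f"exporting {args.version}"))

args = parser.parse_args(["export", "-v", "v1.0"])
args.func(args)  # prints: exporting v1.0
```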
omop.py
@@ -5,6 +5,7 @@ import pandas as pd
 import json
 import logging
 import zipfile
+import shutil
 from pathlib import Path
@@ -18,6 +19,7 @@ VOCAB_PATH = Path('./vocab/omop')
 DB_PATH = VOCAB_PATH / 'omop_54.sqlite'
 VERSION_FILE = 'omop_version.json'
 VERSION_PATH = VOCAB_PATH / VERSION_FILE
+EXPORT_FILE = 'omop_export.db'
 vocabularies = {
     "source": "OHDSI Athena",
@@ -39,6 +41,17 @@ vocabularies = {
     "tables": []
 }
+omop_vocab_types = {
+    "read2": "Read",
+    "read3": None,
+    "icd10": "ICD10CM",
+    "snomed": "SNOMED",
+    "opcs4": "OPCS4",
+    "atc": "ATC",
+    "med": None,
+    "cprd": None,
+}
+
 #Populate SQLite3 Database with default OMOP CONCEPTS
 def install (omop_zip_file, version):
     """Installs the OMOP release csv files in a file-based sql database"""
@@ -167,80 +180,101 @@ def vocab_exists(cursor, vocab_id):
     return result is not None
-def setup(db_path, vocab_id, vocab_version, vocab_name, vocab_reference):
-    #Setup SQLite3 Database for OMOP
-    conn = sqlite3.connect(db_path)
-    cur = conn.cursor()
-
-    #Check if DB populated with necessary VOCABULARY
-    if not table_exists(cur, "VOCABULARY"):
-        raise Exception(f"Error {db_path} is not yet populated with OMOP VOCABULARY. Please download from https://athena.ohdsi.org/.")
-
-    #Check if Vocabulary already exists
-    elif not omop_vocab_exists(cur, vocab_id):
-        #Create VOCABULARY
-        df_test = pd.DataFrame([{
-            "vocabulary_id": vocab_id,
-            "vocabulary_name": vocab_name,
-            "vocabulary_reference": vocab_reference,
-            "vocabulary_version": vocab_version,
-            # "vocabulary_concept_id": 0,
-        }])
-        df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
-
-    #Check if CONCEPT_SET table exists
-    if not table_exists(cur, "CONCEPT_SET"):
-        cur.execute("""
-        CREATE TABLE CONCEPT_SET (
-            concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
-            atlas_id INTEGER,             -- Unique identifier generated by ATLAS
-            concept_set_name TEXT,        -- Optional name for the concept set
-            concept_set_description TEXT, -- Optional description for the concept set
-            vocabulary_id TEXT NOT NULL,  -- Foreign key to VOCABULARY table
-            FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
-        );""")
-
-    #Check if CONCEPT_SET_ITEM table exists
-    if not table_exists(cur, "CONCEPT_SET_ITEM"):
-        cur.execute("""
-        CREATE TABLE CONCEPT_SET_ITEM (
-            concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
-            concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
-            concept_id INTEGER NOT NULL,     -- Foreign key to CONCEPT table
-            FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
-            FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
-        );""")
-
-    conn.close()
-
-def publish_concept_sets(out, db_path, vocab_output, vocab_type, output_version):
-    conn = sqlite3.connect(db_path)
-    cur = conn.cursor()
-
-    for concept_set_name, grp in out.groupby("CONCEPT_SET"):
-        #Create Concept_Set
-        if not sql_row_exist(conn, "CONCEPT_SET", "concept_set_name", concept_set_name):
-            cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', 'MELDB');")
-        else:
-            logger.debug("concept_set", concept_set_name, "already exists")
-            #TODO: ask to remove old concept_set?
-
-        #Get Concept_set_Id
-        query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
-        cur.execute(query, (concept_set_name, vocab_output, ))
-        concept_set_id = cur.fetchone()[0]
-
-        #Get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
-        concept_codes = "'"+"', '".join(list(grp["CONCEPT"].astype(str)))+"'"
-        query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
-        cur.execute(query, (vocab_type, ))
-        df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
-
-        if not len(grp) == len(df_out):
-            logger.error("ERROR: Some", vocab_type, "Codes do not exist in OMOP Database")
-
-        #Create Concept_set_item
-        df_out["concept_set_id"] = concept_set_id
-        df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
-
-    conn.close()
+def concept_set_exist(cursor, concept_set_name):
+    query = "SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
+    cursor.execute(query, (concept_set_name,))
+    # 1 if exists, 0 otherwise
+    return cursor.fetchone()[0] == 1
+
+def export(map_path, export_path, version, omop_metadata):
+    logger.debug(f"exporting with metadata {omop_metadata} at version {version}")
+
+    # copy the baseline omop database
+    export_db_path = export_path / EXPORT_FILE
+    shutil.copy(DB_PATH, export_db_path)
+
+    # connect to db
+    conn = sqlite3.connect(export_db_path)
+    cur = conn.cursor()
+
+    # create VOCABULARY
+    df_test = pd.DataFrame([{
+        "vocabulary_id": omop_metadata['vocabulary_id'],
+        "vocabulary_name": omop_metadata['vocabulary_name'],
+        "vocabulary_reference": omop_metadata['vocabulary_reference'],
+        "vocabulary_version": version,
+        # "vocabulary_concept_id": 0,
+    }])
+    df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
+
+    # create CONCEPT_SET
+    cur.execute("""
+    CREATE TABLE CONCEPT_SET (
+        concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
+        atlas_id INTEGER,             -- Unique identifier generated by ATLAS
+        concept_set_name TEXT,        -- Optional name for the concept set
+        concept_set_description TEXT, -- Optional description for the concept set
+        vocabulary_id TEXT NOT NULL,  -- Foreign key to VOCABULARY table
+        FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
+    );""")
+
+    # create CONCEPT_SET_ITEM
+    cur.execute("""
+    CREATE TABLE CONCEPT_SET_ITEM (
+        concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
+        concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
+        concept_id INTEGER NOT NULL,     -- Foreign key to CONCEPT table
+        FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
+        FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
+    );""")
+
+    # read map files
+    map_files = list(map_path.glob("*.csv"))
+    total = len(map_files)
+    logger.info(f"Exporting {total} map files")
+    for index, map_file in enumerate(map_files):
+        logger.info(f"Processing {index+1} of {total}: {map_file}")
+        df = pd.read_csv(map_file)
+
+        for concept_set_name, grp in df.groupby("CONCEPT_SET"):
+
+            # create Concept_Set
+            if not concept_set_exist(cur, concept_set_name):
+                cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');")
+            else:
+                logger.debug(f"Concept_set {concept_set_name} already exists")
+                #TODO: ask to remove old concept_set?
+
+            # get Concept_set_Id
+            query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
+            target_code_type = map_file.stem
+            cur.execute(query, (concept_set_name, omop_metadata['vocabulary_id'], ))
+            # FAILS HERE WITH NONE REUR
+            logger.debug(f"target code type {target_code_type}")
+            logger.debug(f"omop code type {omop_vocab_types[target_code_type]}")
+            concept_set_id = cur.fetchone()[0]
+            logger.debug(f"concept set id {concept_set_id}")
+
+            # get corresponding Concept_id (OMOP) for each Concept_code (e.g. SNOMED)
+            concept_codes = "'"+"', '".join(list(grp["CONCEPT"].astype(str)))+"'"
+            query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? AND concept_code IN ({concept_codes});"
+            cur.execute(query, (omop_vocab_types[target_code_type], ))
+            df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
+
+            if not len(grp) == len(df_out):
+                logger.error(f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database")
+
+            #Create Concept_set_item
+            df_out["concept_set_id"] = concept_set_id
+            df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
+
+    conn.close()
+    logger.debug("Created export db successfully")
+    return export_db_path

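The new `concept_set_exist` helper replaces the old string-built existence check with a parameterised `EXISTS` query. A self-contained sketch of the same pattern, runnable against a throwaway in-memory SQLite database (the table shape is taken from the `CREATE TABLE` above):

```python
# Stand-alone illustration of the parameterised EXISTS check used by
# concept_set_exist(); runs entirely in memory.
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE CONCEPT_SET (concept_set_name TEXT, vocabulary_id TEXT)")
cur.execute("INSERT INTO CONCEPT_SET VALUES (?, ?)", ("Abdominal Pain", "ACMC"))

query = "SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
cur.execute(query, ("Abdominal Pain",))
assert cur.fetchone()[0] == 1  # 1 if the concept set exists, 0 otherwise
conn.close()
```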
parse.py
@@ -411,14 +411,3 @@ class CodeTypeParser():
     "med": Med(),
     "cprd": Cprd(),
 }
-vocab_types = {
-    "read2": "Read",
-    "read3": None,
-    "icd10": "ICD10CM",
-    "snomed": "SNOMED",
-    "opcs4": "OPCS4",
-    "atc": "ATC",
-    "med": None,
-    "cprd": None,
-}
\ No newline at end of file

phen.py
@@ -27,7 +27,8 @@ DEFAULT_PHEN_PATH = Path('./workspace') / PHEN_DIR
 CODES_DIR = 'codes'
 MAP_DIR = 'map'
 CONCEPT_SET_DIR = 'concept-set'
-DEFAULT_PHEN_DIR_LIST = [CODES_DIR, MAP_DIR, CONCEPT_SET_DIR]
+OMOP_DIR = 'omop'
+DEFAULT_PHEN_DIR_LIST = [CODES_DIR, MAP_DIR, CONCEPT_SET_DIR, OMOP_DIR]
 CONFIG_FILE = 'config.json'
 DEFAULT_GIT_BRANCH = 'main'
@@ -626,6 +627,39 @@ def publish(phen_dir):
     logger.info(f"Phenotype published successfully")
+def export(phen_dir, version):
+    """Exports a phen repo at a specific tagged version into a target directory"""
+    logger.info(f"Exporting phenotype {phen_dir} at version {version}")
+
+    # validate configuration
+    validate(phen_dir)
+    phen_path = Path(phen_dir)
+
+    # load configuration
+    config_path = phen_path / CONFIG_FILE
+    config = json.load(open(config_path, "rb"))
+
+    map_path = phen_path / MAP_DIR
+    if not map_path.exists():
+        logger.warning(f"Map path does not exist '{map_path}'")
+
+    export_path = phen_path / OMOP_DIR
+    # check export directory exists and if not create it
+    if not export_path.exists():
+        export_path.mkdir(parents=True)
+        logger.debug(f"OMOP export directory '{export_path}' created.")
+
+    # omop export db
+    export_db_path = omop.export(map_path,
+                                 export_path,
+                                 config['concept_sets']['version'],
+                                 config['concept_sets']['omop'])
+
+    # write to tables
+    # export as csv
+    logger.info(f"Phenotype exported successfully")
+
 def copy(phen_dir, target_dir, version):
     """Copys a phen repo at a specific tagged version into a target directory"""
@@ -650,6 +684,7 @@ def copy(phen_dir, target_dir, version):
         # If copy directory exists, open the repo
         logger.debug(f"Copy of repository already exists in {copy_path}. Opening the repo...")
         repo = git.Repo(copy_path)
+
     # Check out the latest commit or specified version
     if version:
         # Checkout a specific version (e.g., branch, tag, or commit hash)

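A hedged usage sketch of the new export step (the workspace path is an assumption; `config.json` must define `concept_sets.version` and `concept_sets.omop`, as read in the function body above):

```python
# Hypothetical driver for the new export step; assumes a valid phenotype
# workspace at ./workspace/phen with map CSVs and a config.json as above.
from acmc import phen

phen.export("./workspace/phen", "latest")
# result: ./workspace/phen/omop/omop_export.db built by omop.export()
```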
pyproject.toml
@@ -63,3 +63,7 @@ dependencies = [
     "hatch",
     "pytest"
 ]
+
+[tool.hatch.envs.default.scripts]
+dev = "python -m acmc"
+test = "pytest tests"
\ No newline at end of file
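With these script entries in place, `hatch run dev` starts the CLI module and `hatch run test` runs the test suite from Hatch's default environment, which is what the new tox configuration below calls into.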
tests (test_phen_workflow)
@@ -60,7 +60,6 @@ def test_phen_workflow(tmp_dir, monkeypatch, caplog):
             shutil.copytree(source, destination)
         else:
             shutil.copy(source, destination)
-    shutil.copy( phen_path / 'config1.json', phen_path / 'config.json')
     monkeypatch.setattr(sys, "argv", ["main.py", "phen", "validate", "-d", str(phen_path.resolve())])
     main.main()

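The test drives the CLI by patching `sys.argv` before calling the entry point. A minimal, self-contained version of that pattern, with a hypothetical `cli` stand-in rather than acmc's `main()`:

```python
# Illustration of the argv-injection pattern used in test_phen_workflow,
# using pytest's built-in monkeypatch fixture.
import sys

def cli():
    return sys.argv[1:]

def test_cli_args(monkeypatch):
    monkeypatch.setattr(sys, "argv", ["main.py", "phen", "validate"])
    assert cli() == ["phen", "validate"]
```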
tox.ini (new file, mode 100644)
+[tox]
+envlist = py39, py310, py311
+isolated_build = true
+
+[testenv]
+description = Run pytest with hatch
+deps =
+    pytest
+    hatch
+commands =
+    hatch run test
+
+[testenv:build]
+description = Build package using Hatch
+skip_install = true
+deps = hatch
+commands =
+    hatch build
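Running plain `tox` executes the py39, py310 and py311 environments in turn (each installs pytest and hatch, then delegates to `hatch run test`); the packaging environment is selected explicitly with `tox -e build`.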