Commit 829bb6c2 authored by mjbonifa

test: run mypy to check types and add annotations. run black and reformat all python files. run tests and they still pass
parent 9e8fbed6
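The commit message mentions adding type annotations alongside the mypy run, although the visible hunks below are mostly black reformatting. As a hedged illustration only (not lifted from this diff), annotations on the logging helpers might look like:

import logging

# Hypothetical annotated signatures, shown for illustration; the real change
# set may annotate different functions.
def setup_logger(log_level: int = logging.INFO) -> logging.Logger:
    logger = logging.getLogger("acmc_logger")
    logger.setLevel(log_level)
    return logger

def set_log_level(log_level: int) -> None:
    logging.getLogger("acmc_logger").setLevel(log_level)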
try:
from importlib.metadata import version
except ImportError: # Python <3.8
from pkg_resources import get_distribution as version
__version__ = version("acmc")
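Note that the fallback above aliases pkg_resources.get_distribution to version, but get_distribution returns a Distribution object rather than a version string, so the usual pattern reads its .version attribute. A minimal sketch of that pattern, offered as an assumption rather than something this commit changes:

try:
    from importlib.metadata import version

    __version__ = version("acmc")
except ImportError:  # Python <3.8
    # get_distribution returns a Distribution, so take its .version attribute
    from pkg_resources import get_distribution

    __version__ = get_distribution("acmc").version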
@@ -3,22 +3,24 @@ import logging
DEFAULT_LOG_FILE = "acmc.log"
# TODO: Determine if bcolours is still needed considering use of logging not print
class bcolors: # for printing coloured text
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
def setup_logger(log_level=logging.INFO):
"""Sets up the logger as a singleton, outputting to a file and to stdout/stderr"""
# Create a logger
logger = logging.getLogger('acmc_logger')
logger = logging.getLogger("acmc_logger")
logger.setLevel(log_level)
if not logger.hasHandlers():
@@ -33,9 +35,11 @@ def setup_logger(log_level=logging.INFO):
# Create a formatter for how the log messages should look
# Add the formatter to both handlers
file_formatter = logging.Formatter('%(asctime)s - - %(levelname)s - %(message)s')
file_formatter = logging.Formatter(
"%(asctime)s - - %(levelname)s - %(message)s"
)
file_handler.setFormatter(file_formatter)
stream_formatter = logging.Formatter('[%(levelname)s] - %(message)s')
stream_formatter = logging.Formatter("[%(levelname)s] - %(message)s")
stream_handler.setFormatter(stream_formatter)
# Add the handlers to the logger
@@ -44,9 +48,10 @@ def setup_logger(log_level=logging.INFO):
return logger
def set_log_level(log_level):
"""Sets the log level for the acmc logger"""
logger = logging.getLogger('acmc_logger')
logger = logging.getLogger("acmc_logger")
logger.setLevel(log_level) # Set logger level
# Also update handlers to match the new level
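A short usage sketch for the helpers above, assuming they are imported the same way as elsewhere in the package (acmc.logging_config as lc):

import logging

from acmc import logging_config as lc

# setup_logger behaves like a singleton: repeated calls return the same
# "acmc_logger" without adding duplicate handlers.
logger = lc.setup_logger()
logger.info("written to acmc.log and to the console")

# raise verbosity later, e.g. when --debug is passed on the command line
lc.set_log_level(logging.DEBUG)
logger.debug("now visible")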
@@ -8,206 +8,268 @@ from acmc import trud, omop, phen, logging_config as lc
# setup logging
logger = lc.setup_logger()
DEFAULT_WORKING_PATH = Path('./workspace')
DEFAULT_WORKING_PATH = Path("./workspace")
def trud_install(args):
"""Handle the `trud install` command."""
trud.install()
def omop_install(args):
"""Handle the `omop install` command."""
omop.install(args.omop_zip_file, args.version)
def omop_clear(args):
"""Handle the `omop clear` command."""
omop.clear(omop.DB_PATH)
def omop_delete(args):
"""Handle the `omop delete` command."""
omop.delete(omop.DB_PATH)
def phen_init(args):
"""Handle the `phen init` command."""
phen.init(args.phen_dir, args.remote_url)
def phen_validate(args):
"""Handle the `phen validate` command."""
phen.validate(args.phen_dir)
def phen_map(args):
"""Handle the `phen map` command."""
phen.map(args.phen_dir,
args.target_coding)
phen.map(args.phen_dir, args.target_coding)
def phen_export(args):
"""Handle the `phen export` command."""
phen.export(args.phen_dir,
args.version)
phen.export(args.phen_dir, args.version)
def phen_publish(args):
"""Handle the `phen publish` command."""
phen.publish(args.phen_dir)
def phen_copy(args):
"""Handle the `phen copy` command."""
phen.copy(args.phen_dir,
args.target_dir,
args.version)
phen.copy(args.phen_dir, args.target_dir, args.version)
def phen_diff(args):
"""Handle the `phen diff` command."""
phen.diff(args.phen_dir,
args.phen_dir_old)
phen.diff(args.phen_dir, args.phen_dir_old)
def main():
parser = argparse.ArgumentParser(description="ACMC command-line tool")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument("--version", action="version", version=f"acmc {acmc.__version__}")
parser.add_argument(
"--version", action="version", version=f"acmc {acmc.__version__}"
)
# Top-level commands
subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")
subparsers = parser.add_subparsers(
dest="command", required=True, help="Available commands"
)
### TRUD Command ###
trud_parser = subparsers.add_parser("trud", help="TRUD commands")
trud_subparsers = trud_parser.add_subparsers(dest="subcommand", required=True, help="TRUD subcommands")
trud_subparsers = trud_parser.add_subparsers(
dest="subcommand", required=True, help="TRUD subcommands"
)
# trud install
trud_install_parser = trud_subparsers.add_parser("install", help="Install TRUD components")
trud_install_parser = trud_subparsers.add_parser(
"install", help="Install TRUD components"
)
trud_install_parser.set_defaults(func=trud_install)
### OMOP Command ###
omop_parser = subparsers.add_parser("omop", help="OMOP commands")
omop_subparsers = omop_parser.add_subparsers(dest="subcommand", required=True, help="OMOP subcommands")
omop_subparsers = omop_parser.add_subparsers(
dest="subcommand", required=True, help="OMOP subcommands"
)
# omop install
omop_install_parser = omop_subparsers.add_parser("install", help="Install OMOP codes within database")
omop_install_parser.add_argument("-f",
"--omop-zip-file",
required=True,
help="Path to downloaded OMOP zip file")
omop_install_parser.add_argument("-v",
"--version",
required=True,
help="OMOP vocabularies release version")
omop_install_parser = omop_subparsers.add_parser(
"install", help="Install OMOP codes within database"
)
omop_install_parser.add_argument(
"-f", "--omop-zip-file", required=True, help="Path to downloaded OMOP zip file"
)
omop_install_parser.add_argument(
"-v", "--version", required=True, help="OMOP vocabularies release version"
)
omop_install_parser.set_defaults(func=omop_install)
# omop clear
omop_clear_parser = omop_subparsers.add_parser("clear", help="Clear OMOP data from database")
omop_clear_parser = omop_subparsers.add_parser(
"clear", help="Clear OMOP data from database"
)
omop_clear_parser.set_defaults(func=omop_clear)
# omop delete
omop_delete_parser = omop_subparsers.add_parser("delete", help="Delete OMOP database")
omop_delete_parser = omop_subparsers.add_parser(
"delete", help="Delete OMOP database"
)
omop_delete_parser.set_defaults(func=omop_delete)
### PHEN Command ###
phen_parser = subparsers.add_parser("phen", help="Phen commands")
phen_subparsers = phen_parser.add_subparsers(dest="subcommand", required=True, help="Phen subcommands")
phen_subparsers = phen_parser.add_subparsers(
dest="subcommand", required=True, help="Phen subcommands"
)
# phen init
phen_init_parser = phen_subparsers.add_parser("init", help="Initialise phenotype directory")
phen_init_parser.add_argument("-d",
phen_init_parser = phen_subparsers.add_parser(
"init", help="Initialise phenotype directory"
)
phen_init_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Phenotype workspace directory")
phen_init_parser.add_argument("-r",
"--remote_url",
help="URL to remote git repository")
help="Phenotype workspace directory",
)
phen_init_parser.add_argument(
"-r", "--remote_url", help="URL to remote git repository"
)
phen_init_parser.set_defaults(func=phen_init)
# phen validate
phen_validate_parser = phen_subparsers.add_parser("validate", help="Validate phenotype configuration")
phen_validate_parser.add_argument("-d",
phen_validate_parser = phen_subparsers.add_parser(
"validate", help="Validate phenotype configuration"
)
phen_validate_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Phenotype workspace directory")
help="Phenotype workspace directory",
)
phen_validate_parser.set_defaults(func=phen_validate)
# phen map
phen_map_parser = phen_subparsers.add_parser("map", help="Process phen mapping")
phen_map_parser.add_argument("-d",
phen_map_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Phenotype workspace directory")
phen_map_parser.add_argument("-t",
help="Phenotype workspace directory",
)
phen_map_parser.add_argument(
"-t",
"--target-coding",
required=True,
choices=['read2', 'read3', 'icd10', 'snomed', 'opcs4'],
help="Specify the target coding (read2, read3, icd10, snomed, opcs4)")
phen_map_parser.add_argument("-o",
choices=["read2", "read3", "icd10", "snomed", "opcs4"],
help="Specify the target coding (read2, read3, icd10, snomed, opcs4)",
)
phen_map_parser.add_argument(
"-o",
"--output",
choices=["csv", "omop"],
nargs="+", # allows one or more values
default=["csv"], # default to CSV if not specified
help="Specify output format(s): 'csv', 'omop', or both (default: csv)")
help="Specify output format(s): 'csv', 'omop', or both (default: csv)",
)
phen_map_parser.set_defaults(func=phen_map)
# phen export
phen_export_parser = phen_subparsers.add_parser("export", help="Export phen to OMOP database")
phen_export_parser.add_argument("-d",
phen_export_parser = phen_subparsers.add_parser(
"export", help="Export phen to OMOP database"
)
phen_export_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Phenotype workspace directory")
phen_export_parser.add_argument("-v",
help="Phenotype workspace directory",
)
phen_export_parser.add_argument(
"-v",
"--version",
type=str,
default='latest',
help="Phenotype version to export, defaults to the latest version")
default="latest",
help="Phenotype version to export, defaults to the latest version",
)
phen_export_parser.set_defaults(func=phen_export)
# phen publish
phen_publish_parser = phen_subparsers.add_parser("publish", help="Publish phenotype configuration")
phen_publish_parser.add_argument("-d",
phen_publish_parser = phen_subparsers.add_parser(
"publish", help="Publish phenotype configuration"
)
phen_publish_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Phenotype workspace directory")
help="Phenotype workspace directory",
)
phen_publish_parser.set_defaults(func=phen_publish)
# phen copy
phen_copy_parser = phen_subparsers.add_parser("copy", help="Copy phenotype configuration")
phen_copy_parser.add_argument("-d",
phen_copy_parser = phen_subparsers.add_parser(
"copy", help="Copy phenotype configuration"
)
phen_copy_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Phenotype workspace directory")
phen_copy_parser.add_argument("-td",
help="Phenotype workspace directory",
)
phen_copy_parser.add_argument(
"-td",
"--target-dir",
type=str,
default=str(DEFAULT_WORKING_PATH.resolve()),
help="Target directory for the copy")
phen_copy_parser.add_argument("-v",
help="Target directory for the copy",
)
phen_copy_parser.add_argument(
"-v",
"--version",
type=str,
default='latest',
help="Phenotype version to copy, defaults to the latest version")
default="latest",
help="Phenotype version to copy, defaults to the latest version",
)
phen_copy_parser.set_defaults(func=phen_copy)
# phen diff
phen_diff_parser = phen_subparsers.add_parser("diff", help="Compare phenotype configurations")
phen_diff_parser.add_argument("-d",
phen_diff_parser = phen_subparsers.add_parser(
"diff", help="Compare phenotype configurations"
)
phen_diff_parser.add_argument(
"-d",
"--phen-dir",
type=str,
default=str(phen.DEFAULT_PHEN_PATH.resolve()),
help="Directory for the new phenotype version")
phen_diff_parser.add_argument("-old",
help="Directory for the new phenotype version",
)
phen_diff_parser.add_argument(
"-old",
"--phen-dir-old",
required=True,
help="Directory of the old phenotype version that is compared to the new one")
help="Directory of the old phenotype version that is compared to the new one",
)
phen_diff_parser.set_defaults(func=phen_diff)
# Parse arguments
args = parser.parse_args()
# setup logging
if(args.debug):
if args.debug:
lc.set_log_level(logging.DEBUG)
# Call the function associated with the command
args.func(args)
if __name__ == "__main__":
main()
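The wiring above follows the standard argparse sub-command pattern: each subparser registers its handler with set_defaults(func=...), and main() dispatches via args.func(args). A stripped-down sketch of the same pattern with hypothetical arguments, for illustration:

import argparse

def phen_init(args: argparse.Namespace) -> None:
    # the handler receives the parsed namespace, mirroring set_defaults(func=phen_init)
    print(f"init called with phen_dir={args.phen_dir}")

parser = argparse.ArgumentParser(description="ACMC command-line tool")
subparsers = parser.add_subparsers(dest="command", required=True)
phen_parser = subparsers.add_parser("phen")
phen_subparsers = phen_parser.add_subparsers(dest="subcommand", required=True)
init_parser = phen_subparsers.add_parser("init")
init_parser.add_argument("-d", "--phen-dir", default="./workspace/phen")
init_parser.set_defaults(func=phen_init)

# equivalent of running: acmc phen init -d ./my_phen
args = parser.parse_args(["phen", "init", "-d", "./my_phen"])
args.func(args)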
@@ -9,18 +9,17 @@ import json
import yaml
from pathlib import Path
from acmc import logging_config
# setup logging
logger = logging_config.setup_logger()
# constants
VOCAB_PATH = Path('./vocab/omop')
DB_PATH = VOCAB_PATH / 'omop_54.sqlite'
VERSION_FILE = 'omop_version.yaml'
VOCAB_PATH = Path("./vocab/omop")
DB_PATH = VOCAB_PATH / "omop_54.sqlite"
VERSION_FILE = "omop_version.yaml"
VERSION_PATH = VOCAB_PATH / VERSION_FILE
EXPORT_FILE = 'omop_export.db'
EXPORT_FILE = "omop_export.db"
vocabularies = {
"source": "OHDSI Athena",
@@ -37,9 +36,9 @@ vocabularies = {
{"id": 75, "name": "dm+d"},
{"id": 144, "name": "UK Biobank"},
{"id": 154, "name": "NHS Ethnic Category"},
{ "id": 155, "name": "NHS Place of Service"}
{"id": 155, "name": "NHS Place of Service"},
],
"tables": []
"tables": [],
}
omop_vocab_types = {
@@ -53,6 +52,7 @@ omop_vocab_types = {
"cprd": None,
}
# Populate SQLite3 Database with default OMOP CONCEPTS
def install(omop_zip_file, version):
"""Installs the OMOP release csv files in a file-based sql database"""
@@ -81,7 +81,7 @@ def install (omop_zip_file, version):
logger.debug(f"Deleted OMOP csv file: {file}")
# Extract ZIP contents
with zipfile.ZipFile(omop_zip_path, 'r') as zip_ref:
with zipfile.ZipFile(omop_zip_path, "r") as zip_ref:
zip_ref.extractall(VOCAB_PATH)
logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/")
@@ -93,12 +93,14 @@ def install (omop_zip_file, version):
table_count = 1
for filename in csv_files:
try:
logger.info(f"Processing {table_count} of {total_tables_count} tables: {filename}")
logger.info(
f"Processing {table_count} of {total_tables_count} tables: {filename}"
)
# read the CSV file with the specified delimiter
df = pd.read_csv(filename, delimiter="\t", low_memory=False)
# export Table to sqlite db
df.to_sql(filename.stem, conn, if_exists='replace', index=False)
df.to_sql(filename.stem, conn, if_exists="replace", index=False)
# add to the metadata
vocabularies["tables"].append(filename.stem)
@@ -113,12 +115,14 @@ def install (omop_zip_file, version):
logger.info(f"OMOP installation completed")
def write_version_file(version):
"""Writes the OMOP vocabularies and version to a file"""
vocabularies['version'] = version
vocabularies["version"] = version
with open(VERSION_PATH, "w") as file:
yaml.dump(vocabularies, file, default_flow_style=False, sort_keys=False)
def clear(db_path):
"""Clears the OMOP sql database"""
logger.info(f"Clearing OMOP data from database")
@@ -139,6 +143,7 @@ def clear(db_path):
conn.close()
logger.info(f"OMOP database cleared")
def delete(db_path):
"""Deletes the OMOP sql database"""
logger.info(f"Deleting OMOP database")
@@ -149,6 +154,7 @@ def delete(db_path):
omop_db_path.unlink()
logger.info(f"OMOP database deleted")
def table_exists(cursor, table_name):
# Query to check if the table exists
cursor.execute(
@@ -157,7 +163,7 @@ def table_exists(cursor, table_name):
FROM sqlite_master
WHERE type='table' AND name=?
""",
(table_name,)
(table_name,),
)
# Fetch the result
@@ -165,6 +171,7 @@ def table_exists(cursor, table_name):
return result is not None
def vocab_exists(cursor, vocab_id):
# Query to check if the table exists
cursor.execute(
@@ -173,7 +180,7 @@ def vocab_exists(cursor, vocab_id):
FROM VOCABULARY
WHERE vocabulary_id=?
""",
(vocab_id,)
(vocab_id,),
)
# Fetch the result
@@ -181,6 +188,7 @@ def vocab_exists(cursor, vocab_id):
return result is not None
def concept_set_exist(cursor, concept_set_name):
query = f"SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)"
@@ -189,6 +197,7 @@ def concept_set_exist(cursor, concept_set_name):
# 1 if exists, 0 otherwise
return cursor.fetchone()[0] == 1
def export(map_path, export_path, version, omop_metadata):
logger.debug(f"exporting with metadata {omop_metadata} at version {version}")
@@ -201,17 +210,22 @@ def export(map_path, export_path, version, omop_metadata):
cur = conn.cursor()
# Create VOCABULARY
df_test = pd.DataFrame([{
"vocabulary_id": omop_metadata['vocabulary_id'],
"vocabulary_name": omop_metadata['vocabulary_name'],
"vocabulary_reference": omop_metadata['vocabulary_reference'],
df_test = pd.DataFrame(
[
{
"vocabulary_id": omop_metadata["vocabulary_id"],
"vocabulary_name": omop_metadata["vocabulary_name"],
"vocabulary_reference": omop_metadata["vocabulary_reference"],
"vocabulary_version": version,
# "vocabulary_concept_id": 0,
}])
df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False)
}
]
)
df_test.to_sql("VOCABULARY", conn, if_exists="append", index=False)
# Create CONCEPT_SET
cur.execute("""
cur.execute(
"""
CREATE TABLE CONCEPT_SET (
concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set
atlas_id INTEGER, -- Unique identifier generated by ATLAS
@@ -219,18 +233,20 @@ def export(map_path, export_path, version, omop_metadata):
concept_set_description TEXT, -- Optional description for the concept set
vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table
FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id)
);""")
);"""
)
# Create CONCEPT_SET_ITEM
cur.execute("""
cur.execute(
"""
CREATE TABLE CONCEPT_SET_ITEM (
concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping
concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table
concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table
FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id),
FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id)
);""")
);"""
)
# read map files
map_files = list(map_path.glob("*.csv"))
@@ -244,7 +260,9 @@ def export(map_path, export_path, version, omop_metadata):
# create Concept_Set
if not concept_set_exist(cur, concept_set_name):
cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');")
cur.execute(
f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');"
)
else:
logger.debug(f"Concept_set {concept_set_name} already exists")
# TODO: ask to remove old concept_set?
@@ -252,7 +270,13 @@ def export(map_path, export_path, version, omop_metadata):
# get Concept_set_Id
query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;"
target_code_type = map_file.stem
cur.execute(query, (concept_set_name, omop_metadata['vocabulary_id'], ))
cur.execute(
query,
(
concept_set_name,
omop_metadata["vocabulary_id"],
),
)
# FAILS HERE WITH NONE RETURN
logger.debug(f"target code type {target_code_type}")
logger.debug(f"omop code type {omop_vocab_types[target_code_type]}")
@@ -266,11 +290,13 @@ def export(map_path, export_path, version, omop_metadata):
df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"])
if not len(grp) == len(df_out):
logger.error(f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database")
logger.error(
f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database"
)
# Create Concept_set_item
df_out["concept_set_id"] = concept_set_id
df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False)
df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False)
# Output all tables to CSV
# Get the list of all tables
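One detail worth flagging in the export code above: the CONCEPT_SET insert is built with an f-string that interpolates concept_set_name and the vocabulary id straight into the SQL. A hedged sketch of the equivalent parameterized form, as an alternative rather than something this commit introduces:

import sqlite3

def insert_concept_set(conn: sqlite3.Connection, concept_set_name: str, vocabulary_id: str) -> None:
    # placeholders let sqlite3 handle quoting instead of string interpolation
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES (?, ?);",
        (concept_set_name, vocabulary_id),
    )
    conn.commit()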
This diff is collapsed.
This diff is collapsed.
@@ -12,20 +12,23 @@ from pathlib import Path
# setup logging
import acmc.logging_config as lc
logger = lc.setup_logger()
# Constants
FQDN = "isd.digital.nhs.uk"
VOCAB_PATH = Path('./vocab/trud')
VERSION_FILE = 'trud_version.yaml'
VOCAB_PATH = Path("./vocab/trud")
VERSION_FILE = "trud_version.yaml"
VERSION_PATH = VOCAB_PATH / VERSION_FILE
DOWNLOADS_PATH = VOCAB_PATH / 'downloads'
PROCESSED_PATH = VOCAB_PATH / 'processed'
DOWNLOADS_PATH = VOCAB_PATH / "downloads"
PROCESSED_PATH = VOCAB_PATH / "processed"
def error_exit(message):
logger.error(message)
sys.exit(1)
def get_releases(item_id, API_KEY, latest=False):
"""Retrieve release information for an item from the TRUD API."""
url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
@@ -34,7 +37,9 @@ def get_releases(item_id, API_KEY, latest=False):
response = requests.get(url)
if response.status_code != 200:
error_exit(f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval")
error_exit(
f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval"
)
data = response.json()
if data.get("message") != "OK":
@@ -42,12 +47,17 @@ def get_releases(item_id, API_KEY, latest=False):
return data.get("releases", [])
def download_release_file(item_id, release_ordinal, release, file_json_prefix, file_type=None):
def download_release_file(
item_id, release_ordinal, release, file_json_prefix, file_type=None
):
"""Download specified file type for a given release of an item."""
# check folder is a directory
if not DOWNLOADS_PATH.is_dir():
raise NotADirectoryError(f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory")
raise NotADirectoryError(
f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
)
file_type = file_type or file_json_prefix
file_url = release.get(f"{file_json_prefix}FileUrl")
@@ -55,9 +65,13 @@ def download_release_file(item_id, release_ordinal, release, file_json_prefix, f
file_destination = DOWNLOADS_PATH / file_name
if not file_url or not file_name:
error_exit(f"Missing {file_type} file information for release {release_ordinal} of item {item_id}.")
error_exit(
f"Missing {file_type} file information for release {release_ordinal} of item {item_id}."
)
logger.info(f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}")
logger.info(
f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}"
)
response = requests.get(file_url, stream=True)
if response.status_code == 200:
@@ -65,7 +79,10 @@ def download_release_file(item_id, release_ordinal, release, file_json_prefix, f
f.write(response.content)
return file_destination
else:
error_exit(f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}")
error_exit(
f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}"
)
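download_release_file requests the file with stream=True but then writes response.content in one go, which loads the whole archive into memory. A minimal sketch of the chunked alternative, assuming the same requests dependency (a suggestion, not part of this commit):

from pathlib import Path

import requests

def stream_to_file(url: str, destination: Path, chunk_size: int = 1 << 20) -> Path:
    # write the response in chunks rather than holding it all in memory
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(destination, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)
    return destination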
def validate_download_hash(file_destination: str, item_hash: str):
with open(file_destination, "rb") as f:
@@ -74,163 +91,206 @@ def validate_download_hash(file_destination:str, item_hash:str):
if hash.upper() == item_hash.upper():
logger.debug(f"Verified hash of {file_destination} {hash}")
else:
error_exit(f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead")
error_exit(
f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead"
)
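Most of validate_download_hash is collapsed above; from the visible comparison it hashes the downloaded file with SHA-256 and compares it case-insensitively against the hash published by TRUD. A hedged reconstruction of that check:

import hashlib

def sha256_matches(file_destination: str, item_hash: str) -> bool:
    # digest the file in blocks and compare hex digests case-insensitively
    sha256 = hashlib.sha256()
    with open(file_destination, "rb") as f:
        for block in iter(lambda: f.read(1 << 20), b""):
            sha256.update(block)
    return sha256.hexdigest().upper() == item_hash.upper()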
def unzip_download(file_destination: str):
# check folder is a directory
if not DOWNLOADS_PATH.is_dir():
raise NotADirectoryError(f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory")
raise NotADirectoryError(
f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
)
with zipfile.ZipFile(file_destination, 'r') as zip_ref:
with zipfile.ZipFile(file_destination, "r") as zip_ref:
zip_ref.extractall(DOWNLOADS_PATH)
def extract_icd10():
# ICD10_edition5
file_path = DOWNLOADS_PATH / 'ICD10_Edition5_XML_20160401' / 'Content' / 'ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml'
file_path = (
DOWNLOADS_PATH
/ "ICD10_Edition5_XML_20160401"
/ "Content"
/ "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml"
)
df = pd.read_xml(file_path)
df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
df = df.rename(columns={"CODE":"icd10",
"ALT_CODE":"icd10_alt",
"DESCRIPTION":"description"
})
output_path = PROCESSED_PATH / 'icd10.parquet'
df = df.rename(
columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"}
)
output_path = PROCESSED_PATH / "icd10.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
def extract_opsc4():
file_path = DOWNLOADS_PATH / 'OPCS410 Data files txt' / 'OPCS410 CodesAndTitles Nov 2022 V1.0.txt'
file_path = (
DOWNLOADS_PATH
/ "OPCS410 Data files txt"
/ "OPCS410 CodesAndTitles Nov 2022 V1.0.txt"
)
df = pd.read_csv(file_path, sep='\t', dtype=str, header=None)
df = pd.read_csv(file_path, sep="\t", dtype=str, header=None)
df = df.rename(columns={0: "opcs4", 1: "description"})
output_path = PROCESSED_PATH / 'opcs4.parquet'
output_path = PROCESSED_PATH / "opcs4.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
def extract_nhs_data_migrations():
# NHS Data Migrations
# snomed only
file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'sctcremap_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t')
file_path = (
DOWNLOADS_PATH
/ "Mapping Tables"
/ "Updated"
/ "Clinically Assured"
/ "sctcremap_uk_20200401000001.txt"
)
df = pd.read_csv(file_path, sep="\t")
df = df[["SCT_CONCEPTID"]]
df = df.rename(columns={"SCT_CONCEPTID": "snomed"})
df = df.drop_duplicates()
df = df.astype(str)
output_path = PROCESSED_PATH / 'snomed.parquet'
output_path = PROCESSED_PATH / "snomed.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r2 -> r3
file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rctctv3map_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t')
file_path = (
DOWNLOADS_PATH
/ "Mapping Tables"
/ "Updated"
/ "Clinically Assured"
/ "rctctv3map_uk_20200401000001.txt"
)
df = pd.read_csv(file_path, sep="\t")
df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
df = df.rename(columns={"V2_CONCEPTID":"read2",
"CTV3_CONCEPTID":"read3"})
df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"})
output_path = PROCESSED_PATH / 'read2_to_read3.parquet'
output_path = PROCESSED_PATH / "read2_to_read3.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r3->r2
file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3rctmap_uk_20200401000002.txt'
df = pd.read_csv(file_path, sep='\t')
file_path = (
DOWNLOADS_PATH
/ "Mapping Tables"
/ "Updated"
/ "Clinically Assured"
/ "ctv3rctmap_uk_20200401000002.txt"
)
df = pd.read_csv(file_path, sep="\t")
df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
df = df.rename(columns={"CTV3_CONCEPTID":"read3",
"V2_CONCEPTID":"read2"})
df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"})
df = df.drop_duplicates()
df = df[~df["read2"].str.match("^.*_.*$")] # remove r2 codes with '_'
output_path = PROCESSED_PATH / 'read3_to_read2.parquet'
output_path = PROCESSED_PATH / "read3_to_read2.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r2 -> snomed
file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rcsctmap2_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str)
file_path = (
DOWNLOADS_PATH
/ "Mapping Tables"
/ "Updated"
/ "Clinically Assured"
/ "rcsctmap2_uk_20200401000001.txt"
)
df = pd.read_csv(file_path, sep="\t", dtype=str)
df = df[["ReadCode", "ConceptId"]]
df = df.rename(columns={"ReadCode":"read2",
"ConceptId":"snomed"})
df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"})
output_path = PROCESSED_PATH / 'read2_to_snomed.parquet'
output_path = PROCESSED_PATH / "read2_to_snomed.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r3->snomed
file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3sctmap2_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str)
file_path = (
DOWNLOADS_PATH
/ "Mapping Tables"
/ "Updated"
/ "Clinically Assured"
/ "ctv3sctmap2_uk_20200401000001.txt"
)
df = pd.read_csv(file_path, sep="\t", dtype=str)
df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
df = df.rename(columns={"CTV3_TERMID":"read3",
"SCT_CONCEPTID":"snomed"})
df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"})
df["snomed"] = df["snomed"].astype(str)
df = df[~df["snomed"].str.match("^.*_.*$")] # remove snomed codes with '_'
output_path = PROCESSED_PATH / 'read3_to_snomed.parquet'
output_path = PROCESSED_PATH / "read3_to_snomed.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
def extract_nhs_read_browser():
# r2 only
input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'ANCESTOR.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = pd.concat([df['READCODE'], df['DESCENDANT']])
df = pd.concat([df["READCODE"], df["DESCENDANT"]])
df = pd.DataFrame(df.drop_duplicates())
df = df.rename(columns={0: "read2"})
output_path = PROCESSED_PATH / 'read2.parquet'
output_path = PROCESSED_PATH / "read2.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r2 -> atc
input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'ATC.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = df[["READCODE", "ATC"]]
df = df.rename(columns={"READCODE": "read2", "ATC": "atc"})
output_path = PROCESSED_PATH / 'read2_to_atc.parquet'
output_path = PROCESSED_PATH / "read2_to_atc.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r2 -> icd10
input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'ICD10.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"})
df = df[~df["icd10"].str.match("^.*-.*$")] # remove codes with '-'
df = df[~df["read2"].str.match("^.*-.*$")] # remove codes with '-'
output_path = PROCESSED_PATH / 'read2_to_icd10.parquet'
output_path = PROCESSED_PATH / "read2_to_icd10.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r2 -> opcs4
input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'OPCS4V3.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"})
df = df[~df["opcs4"].str.match("^.*-.*$")] # remove codes with '-'
df = df[~df["read2"].str.match("^.*-.*$")] # remove codes with '-'
output_path = PROCESSED_PATH / 'read2_to_opcs4.parquet'
output_path = PROCESSED_PATH / "read2_to_opcs4.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r3 only
input_path = DOWNLOADS_PATH / 'Standard' / 'V3' / 'ANCESTOR.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = pd.concat([df['READCODE'], df['DESCENDANT']])
df = pd.concat([df["READCODE"], df["DESCENDANT"]])
df = pd.DataFrame(df.drop_duplicates())
df = df.rename(columns={0: "read3"})
output_path = PROCESSED_PATH / 'read3.parquet'
output_path = PROCESSED_PATH / "read3.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
# r3 -> icd10
input_path = DOWNLOADS_PATH / 'Standard' / 'V3' / 'ICD10.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"})
df = df[~df["icd10"].str.match("^.*-.*$")] # remove codes with '-'
df = df[~df["read3"].str.match("^.*-.*$")] # remove codes with '-'
output_path = PROCESSED_PATH / 'read3_to_icd10.parquet'
output_path = PROCESSED_PATH / "read3_to_icd10.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
@@ -238,23 +298,30 @@ def extract_nhs_read_browser():
# dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')
# r3 -> opcs4
input_path = DOWNLOADS_PATH / 'Standard' / 'V3' / 'OPCS4V3.DBF'
input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF"
df = simpledbf.Dbf5(input_path).to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"})
df = df[~df["opcs4"].str.match("^.*-.*$")] # remove codes with '-'
df = df[~df["read3"].str.match("^.*-.*$")] # remove codes with '-'
output_path = PROCESSED_PATH / 'read3_to_opcs4.parquet'
output_path = PROCESSED_PATH / "read3_to_opcs4.parquet"
df.to_parquet(output_path, index=False)
logger.info(f"Extracted: {output_path}")
def create_map_directories():
"""Create map directories."""
# Check if build directory exists
create_map_dirs = False
if VOCAB_PATH.exists():
user_input = input(f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? (y/n): ").strip().lower()
user_input = (
input(
f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? (y/n): "
)
.strip()
.lower()
)
if user_input == "y":
# delete all build files
shutil.rmtree(VOCAB_PATH)
@@ -271,13 +338,16 @@ def create_map_directories():
DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)
PROCESSED_PATH.mkdir(parents=True, exist_ok=True)
def install():
logger.info(f"Installing TRUD")
# get TRUD api key from environment variable
api_key = os.getenv("ACMC_TRUD_API_KEY")
if not api_key:
raise ValueError("TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable.")
raise ValueError(
"TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable."
)
create_map_directories()
@@ -306,7 +376,7 @@ def install():
"name": "NHS Read Browser",
"hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
"extract": extract_nhs_read_browser,
}
},
# TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
]
@@ -328,7 +398,9 @@ def install():
# Process each release in reverse order
for release_ordinal, release in enumerate(releases[::-1], 1):
# Download archive file
file_destination = download_release_file(item_id, release_ordinal, release, "archive")
file_destination = download_release_file(
item_id, release_ordinal, release, "archive"
)
# Optional files
# if items.checksum:
@@ -28,16 +28,19 @@ dependencies = [
"lxml",
"numpy",
"openpyxl",
"pandas-stubs",
"pluggy",
"pyarrow",
"pyomop",
"tables",
"pytest",
"pyyaml",
"requests",
"simpledbf",
"smmap",
"sqlalchemy",
"pyyaml"
"types-PyYAML",
"types-requests"
]
[project.scripts]
@@ -59,6 +62,7 @@ dependencies = [
[tool.hatch.envs.dev]
dependencies = [
"pydocstyle",
"pytest",
"black",
"mypy"
@@ -71,3 +75,6 @@ include = ["acmc/**"] # Ensure only the acmc package is included
include = [
"acmc/**",
]
[tool.mypy]
ignore_missing_imports = true
\ No newline at end of file
@@ -10,6 +10,7 @@ from acmc import trud, omop, main, logging_config as lc
# setup logging
logger = lc.setup_logger()
@pytest.fixture
def tmp_dir():
# Setup tmp directory
@@ -22,27 +23,34 @@ def tmp_dir():
# Remove the directory after the test finishes
shutil.rmtree(temp_dir)
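The tmp_dir fixture body is largely collapsed above; from the visible setup and teardown comments it creates a temporary directory, yields it to the test, and removes it afterwards. A hedged sketch of what such a fixture typically looks like:

import shutil
import tempfile
from pathlib import Path

import pytest

@pytest.fixture
def tmp_dir():
    # Setup tmp directory
    temp_dir = Path(tempfile.mkdtemp())
    yield temp_dir
    # Remove the directory after the test finishes
    shutil.rmtree(temp_dir)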
@pytest.fixture
def logger():
logger = logging.getLogger('acmc_logger')
logger = logging.getLogger("acmc_logger")
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
logger.addHandler(stream_handler)
def test_phen_init_local_specified(tmp_dir, monkeypatch, caplog):
with caplog.at_level(logging.DEBUG):
phen_path = tmp_dir / "phen"
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())])
monkeypatch.setattr(
sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())]
)
# Mock input() to return "yes" to the question about reinitialising the directory
monkeypatch.setattr("builtins.input", lambda _: "y")
main.main()
assert "Phenotype initialised successfully" in caplog.text
def test_phen_workflow(tmp_dir, monkeypatch, caplog):
with caplog.at_level(logging.DEBUG):
phen_path = tmp_dir / "phen"
phen_path = phen_path.resolve()
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())])
monkeypatch.setattr(
sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())]
)
# Mock input() to return "yes" to the question about reinitialising the directory
monkeypatch.setattr("builtins.input", lambda _: "y")
main.main()
@@ -51,8 +59,8 @@ def test_phen_workflow(tmp_dir, monkeypatch, caplog):
with caplog.at_level(logging.DEBUG):
# validate phenotype
# copy examples across
shutil.rmtree(phen_path / 'codes')
ex_path = Path('./examples').resolve()
shutil.rmtree(phen_path / "codes")
ex_path = Path("./examples").resolve()
for item in ex_path.iterdir():
source = ex_path / item.name
destination = phen_path / item.name
@@ -61,32 +69,74 @@ def test_phen_workflow(tmp_dir, monkeypatch, caplog):
else:
shutil.copy(source, destination)
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "validate", "-d", str(phen_path.resolve())])
monkeypatch.setattr(
sys, "argv", ["main.py", "phen", "validate", "-d", str(phen_path.resolve())]
)
main.main()
assert "Phenotype validated successfully" in caplog.text
# map phenotype
for code_type in ["read2", "read3", "snomed"]:
with caplog.at_level(logging.DEBUG):
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "map", "-d", str(phen_path.resolve()), "-t", code_type])
monkeypatch.setattr(
sys,
"argv",
[
"main.py",
"phen",
"map",
"-d",
str(phen_path.resolve()),
"-t",
code_type,
],
)
main.main()
assert "Phenotype processed successfully" in caplog.text
# publish phenotype
with caplog.at_level(logging.DEBUG):
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "publish", "-d", str(phen_path.resolve())])
monkeypatch.setattr(
sys, "argv", ["main.py", "phen", "publish", "-d", str(phen_path.resolve())]
)
main.main()
assert "Phenotype published successfully" in caplog.text
# copy phenotype
with caplog.at_level(logging.DEBUG):
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "copy", "-d", str(phen_path.resolve()), "-td", str(tmp_dir.resolve()), "-v", "v1.0.3"])
monkeypatch.setattr(
sys,
"argv",
[
"main.py",
"phen",
"copy",
"-d",
str(phen_path.resolve()),
"-td",
str(tmp_dir.resolve()),
"-v",
"v1.0.3",
],
)
main.main()
assert "Phenotype copied successfully" in caplog.text
# diff phenotype
with caplog.at_level(logging.DEBUG):
old_path = tmp_dir / "v1.0.3"
monkeypatch.setattr(sys, "argv", ["main.py", "phen", "diff", "-d", str(phen_path.resolve()), "-old", str(old_path.resolve())])
monkeypatch.setattr(
sys,
"argv",
[
"main.py",
"phen",
"diff",
"-d",
str(phen_path.resolve()),
"-old",
str(old_path.resolve()),
],
)
main.main()
assert "Phenotypes diff'd successfully" in caplog.text