diff --git a/acmc/__init__.py b/acmc/__init__.py index ab83ebf60d564f37540e6bf8ae6e294e926494a9..80d81b35b54f7847c0651756af8af3db710a6d8e 100644 --- a/acmc/__init__.py +++ b/acmc/__init__.py @@ -1,5 +1,3 @@ -try: - from importlib.metadata import version -except ImportError: # Python <3.8 - from pkg_resources import get_distribution as version +from importlib.metadata import version + __version__ = version("acmc") diff --git a/acmc/__main__.py b/acmc/__main__.py index 6dc525cf7b30589b5905da70238331078ae90005..20251db632eb3387c5b819e7e07c54cf203700c5 100644 --- a/acmc/__main__.py +++ b/acmc/__main__.py @@ -1,4 +1,4 @@ from acmc.main import main if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/acmc/logging_config.py b/acmc/logging_config.py index b9bb42ecfe5b9c25f7ccec540e3edba36fa51a10..735365a38a5ac3d653f2ea04d48c93a94d7815ba 100644 --- a/acmc/logging_config.py +++ b/acmc/logging_config.py @@ -3,52 +3,57 @@ import logging DEFAULT_LOG_FILE = "acmc.log" -# TODO: Determine if bcolours is still needed considering use of logging not print -class bcolors: #for printing coloured text - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKCYAN = '\033[96m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - BOLD = '\033[1m' - UNDERLINE = '\033[4m' + +# TODO: Determine if bcolours is still needed considering use of logging not print +class bcolors: # for printing coloured text + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + def setup_logger(log_level=logging.INFO): - """Sets up logger as a singleton outputing to file and sysout syserr""" - # Create a logger - logger = logging.getLogger('acmc_logger') - logger.setLevel(logging.INFO) - - if not logger.hasHandlers(): - #Create a file handler that logs to a file - file_handler = logging.FileHandler(DEFAULT_LOG_FILE) - file_handler.setLevel(logging.INFO) - - # Create a stream handler that prints to the console - stream_handler = logging.StreamHandler() - stream_handler.setLevel(logging.INFO) - - # Create a formatter for how the log messages should look - - # Add the formatter to both handlers - file_formatter = logging.Formatter('%(asctime)s - - %(levelname)s - %(message)s') - file_handler.setFormatter(file_formatter) - stream_formatter = logging.Formatter('[%(levelname)s] - %(message)s') - stream_handler.setFormatter(stream_formatter) - - # Add the handlers to the logger - logger.addHandler(file_handler) - logger.addHandler(stream_handler) - - return logger + """Sets up logger as a singleton outputing to file and sysout syserr""" + # Create a logger + logger = logging.getLogger("acmc_logger") + logger.setLevel(logging.INFO) + + if not logger.hasHandlers(): + # Create a file handler that logs to a file + file_handler = logging.FileHandler(DEFAULT_LOG_FILE) + file_handler.setLevel(logging.INFO) + + # Create a stream handler that prints to the console + stream_handler = logging.StreamHandler() + stream_handler.setLevel(logging.INFO) + + # Create a formatter for how the log messages should look + + # Add the formatter to both handlers + file_formatter = logging.Formatter( + "%(asctime)s - - %(levelname)s - %(message)s" + ) + file_handler.setFormatter(file_formatter) + stream_formatter = logging.Formatter("[%(levelname)s] - %(message)s") + stream_handler.setFormatter(stream_formatter) + + # Add the handlers to the logger + 
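The logging_config.py changes here keep setup_logger() as the single place where the "acmc_logger" handlers are created. A minimal usage sketch, assuming the acmc package is importable, showing how other modules pick up the shared logger and how the `--debug` flag lowers its level via set_log_level():

```python
# Illustrative usage only (not part of the diff); assumes acmc is installed.
import logging

from acmc import logging_config as lc

logger = lc.setup_logger()                 # returns the shared "acmc_logger"
logger.info("visible at the default INFO level")
logger.debug("suppressed until the level is lowered")

lc.set_log_level(logging.DEBUG)            # what `acmc --debug` does in main.py
logger.debug("now written to both the console and acmc.log")
```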
logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + return logger + def set_log_level(log_level): - """Sets the log level for the acmc logger""" - logger = logging.getLogger('acmc_logger') - logger.setLevel(log_level) # Set logger level - - # Also update handlers to match the new level - for handler in logger.handlers: - handler.setLevel(log_level) \ No newline at end of file + """Sets the log level for the acmc logger""" + logger = logging.getLogger("acmc_logger") + logger.setLevel(log_level) # Set logger level + + # Also update handlers to match the new level + for handler in logger.handlers: + handler.setLevel(log_level) diff --git a/acmc/main.py b/acmc/main.py index ed2b7aff3d6a14a79cb5881c1b67b7fee82c33b3..89df65dda2d1da11eef38260f2d760a6bbf9846c 100644 --- a/acmc/main.py +++ b/acmc/main.py @@ -8,206 +8,268 @@ from acmc import trud, omop, phen, logging_config as lc # setup logging logger = lc.setup_logger() -DEFAULT_WORKING_PATH = Path('./workspace') +DEFAULT_WORKING_PATH = Path("./workspace") + def trud_install(args): """Handle the `trud install` command.""" trud.install() + def omop_install(args): """Handle the `omop install` command.""" omop.install(args.omop_zip_file, args.version) + def omop_clear(args): """Handle the `omop clear` command.""" - omop.clear(omop.DB_PATH) + omop.clear(omop.DB_PATH) + def omop_delete(args): """Handle the `omop delete` command.""" omop.delete(omop.DB_PATH) + def phen_init(args): - """Handle the `phen init` command.""" - phen.init(args.phen_dir, args.remote_url) + """Handle the `phen init` command.""" + phen.init(args.phen_dir, args.remote_url) + def phen_validate(args): - """Handle the `phen validate` command.""" - phen.validate(args.phen_dir) + """Handle the `phen validate` command.""" + phen.validate(args.phen_dir) + def phen_map(args): - """Handle the `phen map` command.""" - phen.map(args.phen_dir, - args.target_coding) + """Handle the `phen map` command.""" + phen.map(args.phen_dir, args.target_coding) + def phen_export(args): - """Handle the `phen copy` command.""" - phen.export(args.phen_dir, - args.version) + """Handle the `phen copy` command.""" + phen.export(args.phen_dir, args.version) + def phen_publish(args): - """Handle the `phen publish` command.""" - phen.publish(args.phen_dir) + """Handle the `phen publish` command.""" + phen.publish(args.phen_dir) + def phen_copy(args): - """Handle the `phen copy` command.""" - phen.copy(args.phen_dir, - args.target_dir, - args.version) + """Handle the `phen copy` command.""" + phen.copy(args.phen_dir, args.target_dir, args.version) + def phen_diff(args): - """Handle the `phen diff` command.""" - phen.diff(args.phen_dir, - args.phen_dir_old) + """Handle the `phen diff` command.""" + phen.diff(args.phen_dir, args.phen_dir_old) + def main(): - parser = argparse.ArgumentParser(description="ACMC command-line tool") - parser.add_argument("--debug", action="store_true", help="Enable debug mode") - parser.add_argument("--version", action="version", version=f"acmc {acmc.__version__}") - + parser = argparse.ArgumentParser(description="ACMC command-line tool") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") + parser.add_argument( + "--version", action="version", version=f"acmc {acmc.__version__}" + ) + # Top-level commands - subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") - - ### TRUD Command ### - trud_parser = subparsers.add_parser("trud", help="TRUD commands") - trud_subparsers = 
trud_parser.add_subparsers(dest="subcommand", required=True, help="TRUD subcommands") - - # trud install - trud_install_parser = trud_subparsers.add_parser("install", help="Install TRUD components") - trud_install_parser.set_defaults(func=trud_install) - - ### OMOP Command ### - omop_parser = subparsers.add_parser("omop", help="OMOP commands") - omop_subparsers = omop_parser.add_subparsers(dest="subcommand", required=True, help="OMOP subcommands") - - # omop install - omop_install_parser = omop_subparsers.add_parser("install", help="Install OMOP codes within database") - omop_install_parser.add_argument("-f", - "--omop-zip-file", - required=True, - help="Path to downloaded OMOP zip file") - omop_install_parser.add_argument("-v", - "--version", - required=True, - help="OMOP vocabularies release version") - omop_install_parser.set_defaults(func=omop_install) - - # omop clear - omop_clear_parser = omop_subparsers.add_parser("clear", help="Clear OMOP data from database") - omop_clear_parser.set_defaults(func=omop_clear) - - # omop delete - omop_delete_parser = omop_subparsers.add_parser("delete", help="Delete OMOP database") - omop_delete_parser.set_defaults(func=omop_delete) - - ### PHEN Command ### - phen_parser = subparsers.add_parser("phen", help="Phen commands") - phen_subparsers = phen_parser.add_subparsers(dest="subcommand", required=True, help="Phen subcommands") - - # phen init - phen_init_parser = phen_subparsers.add_parser("init", help="Initiatise phenotype directory") - phen_init_parser.add_argument("-d", - "--phen-dir", - type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Phenotype workspace directory") - phen_init_parser.add_argument("-r", - "--remote_url", - help="URL to remote git repository") - phen_init_parser.set_defaults(func=phen_init) - - # phen validate - phen_validate_parser = phen_subparsers.add_parser("validate", help="Validate phenotype configuration") - phen_validate_parser.add_argument("-d", - "--phen-dir", - type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Phenotype workspace directory") - phen_validate_parser.set_defaults(func=phen_validate) - - # phen map - phen_map_parser = phen_subparsers.add_parser("map", help="Process phen mapping") - phen_map_parser.add_argument("-d", - "--phen-dir", - type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Phenotype workspace directory") - phen_map_parser.add_argument("-t", - "--target-coding", - required=True, - choices=['read2', 'read3', 'icd10', 'snomed', 'opcs4'], - help="Specify the target coding (read2, read3, icd10, snomed, opcs4)") - phen_map_parser.add_argument("-o", - "--output", - choices=["csv", "omop"], - nargs="+", # allows one or more values - default=["csv"], # default to CSV if not specified - help="Specify output format(s): 'csv', 'omop', or both (default: csv)") - phen_map_parser.set_defaults(func=phen_map) - - # phen export - phen_export_parser = phen_subparsers.add_parser("export", help="Export phen to OMOP database") - phen_export_parser.add_argument("-d", - "--phen-dir", - type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Phenotype workspace directory") - phen_export_parser.add_argument("-v", - "--version", - type=str, - default='latest', - help="Phenotype version to export, defaults to the latest version") - phen_export_parser.set_defaults(func=phen_export) - - # phen publish - phen_publish_parser = phen_subparsers.add_parser("publish", help="Publish phenotype configuration") - phen_publish_parser.add_argument("-d", - "--phen-dir", - 
type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Phenotype workspace directory") - phen_publish_parser.set_defaults(func=phen_publish) - - # phen copy - phen_copy_parser = phen_subparsers.add_parser("copy", help="Publish phenotype configuration") - phen_copy_parser.add_argument("-d", - "--phen-dir", - type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Phenotype workspace directory") - phen_copy_parser.add_argument("-td", - "--target-dir", - type=str, - default=str(DEFAULT_WORKING_PATH.resolve()), - help="Target directory for the copy") - phen_copy_parser.add_argument("-v", - "--version", - type=str, - default='latest', - help="Phenotype version to copy, defaults to the latest version") - phen_copy_parser.set_defaults(func=phen_copy) - - # phen diff - phen_diff_parser = phen_subparsers.add_parser("diff", help="Publish phenotype configuration") - phen_diff_parser.add_argument("-d", - "--phen-dir", - type=str, - default=str(phen.DEFAULT_PHEN_PATH.resolve()), - help="Directory for the new phenotype version") - phen_diff_parser.add_argument("-old", - "--phen-dir-old", - required=True, - help="Directory of the old phenotype version that is compared to the new one") - phen_diff_parser.set_defaults(func=phen_diff) - - # Parse arguments - args = parser.parse_args() - - # setup logging - if(args.debug): - lc.set_log_level(logging.DEBUG) - - # Call the function associated with the command - args.func(args) + subparsers = parser.add_subparsers( + dest="command", required=True, help="Available commands" + ) + + ### TRUD Command ### + trud_parser = subparsers.add_parser("trud", help="TRUD commands") + trud_subparsers = trud_parser.add_subparsers( + dest="subcommand", required=True, help="TRUD subcommands" + ) + + # trud install + trud_install_parser = trud_subparsers.add_parser( + "install", help="Install TRUD components" + ) + trud_install_parser.set_defaults(func=trud_install) + + ### OMOP Command ### + omop_parser = subparsers.add_parser("omop", help="OMOP commands") + omop_subparsers = omop_parser.add_subparsers( + dest="subcommand", required=True, help="OMOP subcommands" + ) + + # omop install + omop_install_parser = omop_subparsers.add_parser( + "install", help="Install OMOP codes within database" + ) + omop_install_parser.add_argument( + "-f", "--omop-zip-file", required=True, help="Path to downloaded OMOP zip file" + ) + omop_install_parser.add_argument( + "-v", "--version", required=True, help="OMOP vocabularies release version" + ) + omop_install_parser.set_defaults(func=omop_install) + + # omop clear + omop_clear_parser = omop_subparsers.add_parser( + "clear", help="Clear OMOP data from database" + ) + omop_clear_parser.set_defaults(func=omop_clear) + + # omop delete + omop_delete_parser = omop_subparsers.add_parser( + "delete", help="Delete OMOP database" + ) + omop_delete_parser.set_defaults(func=omop_delete) + + ### PHEN Command ### + phen_parser = subparsers.add_parser("phen", help="Phen commands") + phen_subparsers = phen_parser.add_subparsers( + dest="subcommand", required=True, help="Phen subcommands" + ) + + # phen init + phen_init_parser = phen_subparsers.add_parser( + "init", help="Initiatise phenotype directory" + ) + phen_init_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Phenotype workspace directory", + ) + phen_init_parser.add_argument( + "-r", "--remote_url", help="URL to remote git repository" + ) + phen_init_parser.set_defaults(func=phen_init) + + # phen validate + 
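The reformatted main() keeps one structure throughout: every sub-command registers a handler with set_defaults(func=...) and the parsed arguments are dispatched with args.func(args). A self-contained toy of that pattern, with placeholder command names rather than the real acmc ones:

```python
# Toy illustration of the dispatch pattern used in acmc/main.py; the
# "demo"/"greet" names are placeholders, not real acmc commands.
import argparse


def greet(args):
    print(f"hello {args.name}")


def main():
    parser = argparse.ArgumentParser(description="demo command-line tool")
    subparsers = parser.add_subparsers(dest="command", required=True)

    greet_parser = subparsers.add_parser("greet", help="Print a greeting")
    greet_parser.add_argument("-n", "--name", default="world")
    greet_parser.set_defaults(func=greet)

    args = parser.parse_args()
    args.func(args)  # call the handler registered by set_defaults


if __name__ == "__main__":
    main()
```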
phen_validate_parser = phen_subparsers.add_parser( + "validate", help="Validate phenotype configuration" + ) + phen_validate_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Phenotype workspace directory", + ) + phen_validate_parser.set_defaults(func=phen_validate) + + # phen map + phen_map_parser = phen_subparsers.add_parser("map", help="Process phen mapping") + phen_map_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Phenotype workspace directory", + ) + phen_map_parser.add_argument( + "-t", + "--target-coding", + required=True, + choices=["read2", "read3", "icd10", "snomed", "opcs4"], + help="Specify the target coding (read2, read3, icd10, snomed, opcs4)", + ) + phen_map_parser.add_argument( + "-o", + "--output", + choices=["csv", "omop"], + nargs="+", # allows one or more values + default=["csv"], # default to CSV if not specified + help="Specify output format(s): 'csv', 'omop', or both (default: csv)", + ) + phen_map_parser.set_defaults(func=phen_map) + + # phen export + phen_export_parser = phen_subparsers.add_parser( + "export", help="Export phen to OMOP database" + ) + phen_export_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Phenotype workspace directory", + ) + phen_export_parser.add_argument( + "-v", + "--version", + type=str, + default="latest", + help="Phenotype version to export, defaults to the latest version", + ) + phen_export_parser.set_defaults(func=phen_export) + + # phen publish + phen_publish_parser = phen_subparsers.add_parser( + "publish", help="Publish phenotype configuration" + ) + phen_publish_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Phenotype workspace directory", + ) + phen_publish_parser.set_defaults(func=phen_publish) + + # phen copy + phen_copy_parser = phen_subparsers.add_parser( + "copy", help="Publish phenotype configuration" + ) + phen_copy_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Phenotype workspace directory", + ) + phen_copy_parser.add_argument( + "-td", + "--target-dir", + type=str, + default=str(DEFAULT_WORKING_PATH.resolve()), + help="Target directory for the copy", + ) + phen_copy_parser.add_argument( + "-v", + "--version", + type=str, + default="latest", + help="Phenotype version to copy, defaults to the latest version", + ) + phen_copy_parser.set_defaults(func=phen_copy) + + # phen diff + phen_diff_parser = phen_subparsers.add_parser( + "diff", help="Publish phenotype configuration" + ) + phen_diff_parser.add_argument( + "-d", + "--phen-dir", + type=str, + default=str(phen.DEFAULT_PHEN_PATH.resolve()), + help="Directory for the new phenotype version", + ) + phen_diff_parser.add_argument( + "-old", + "--phen-dir-old", + required=True, + help="Directory of the old phenotype version that is compared to the new one", + ) + phen_diff_parser.set_defaults(func=phen_diff) + + # Parse arguments + args = parser.parse_args() + + # setup logging + if args.debug: + lc.set_log_level(logging.DEBUG) + + # Call the function associated with the command + args.func(args) + if __name__ == "__main__": main() diff --git a/acmc/omop.py b/acmc/omop.py index 0d03f4e56c28732ed26d63cf0d70ae2aff9dd3ee..90b621b1d028d6f51a327286c0a1574363e129bf 100644 --- a/acmc/omop.py +++ b/acmc/omop.py @@ -9,209 +9,223 @@ import json import yaml from 
pathlib import Path - from acmc import logging_config # setup logging logger = logging_config.setup_logger() # constants -VOCAB_PATH = Path('./vocab/omop') -DB_PATH = VOCAB_PATH / 'omop_54.sqlite' -VERSION_FILE = 'omop_version.yaml' +VOCAB_PATH = Path("./vocab/omop") +DB_PATH = VOCAB_PATH / "omop_54.sqlite" +VERSION_FILE = "omop_version.yaml" VERSION_PATH = VOCAB_PATH / VERSION_FILE -EXPORT_FILE = 'omop_export.db' +EXPORT_FILE = "omop_export.db" vocabularies = { - "source": "OHDSI Athena", - "url": "https://athena.ohdsi.org/vocabulary/list", - "version": "", - "vocabularies": [ - { "id": 1, "name": "SNOMED"}, - { "id": 2, "name": "ICD9CM"}, - { "id": 17, "name": "Readv2"}, - { "id": 21, "name": "ATC"}, - { "id": 55, "name": "OPCS4"}, - { "id": 57, "name": "HES Specialty"}, - { "id": 70, "name": "ICD10CM"}, - { "id": 75, "name": "dm+d"}, - { "id": 144, "name": "UK Biobank"}, - { "id": 154, "name": "NHS Ethnic Category"}, - { "id": 155, "name": "NHS Place of Service"} - ], - "tables": [] + "source": "OHDSI Athena", + "url": "https://athena.ohdsi.org/vocabulary/list", + "version": "", + "vocabularies": [ + {"id": 1, "name": "SNOMED"}, + {"id": 2, "name": "ICD9CM"}, + {"id": 17, "name": "Readv2"}, + {"id": 21, "name": "ATC"}, + {"id": 55, "name": "OPCS4"}, + {"id": 57, "name": "HES Specialty"}, + {"id": 70, "name": "ICD10CM"}, + {"id": 75, "name": "dm+d"}, + {"id": 144, "name": "UK Biobank"}, + {"id": 154, "name": "NHS Ethnic Category"}, + {"id": 155, "name": "NHS Place of Service"}, + ], + "tables": [], } omop_vocab_types = { - "read2": "Read", - "read3": None, - "icd10": "ICD10CM", - "snomed": "SNOMED", - "opcs4": "OPCS4", - "atc": "ATC", - "med": None, - "cprd": None, + "read2": "Read", + "read3": None, + "icd10": "ICD10CM", + "snomed": "SNOMED", + "opcs4": "OPCS4", + "atc": "ATC", + "med": None, + "cprd": None, } -#Populate SQLite3 Database with default OMOP CONCEPTS -def install (omop_zip_file, version): - """Installs the OMOP release csv files in a file-based sql database""" - logger.info(f"Installing OMOP from zip file: {omop_zip_file}") - omop_zip_path = Path(omop_zip_file) - - # Check if the file exists and is a ZIP file - if not omop_zip_path.exists(): - msg = f"{omop_zip_path} does not exist." - logger.error(msg) - raise ValueError(msg) - if not zipfile.is_zipfile(omop_zip_path): - msg = f"Error: {omop_zip_path} is not a valid ZIP file." 
- logger.error(msg) - raise ValueError(msg) - - # check codes directory exists and if not create it - if not VOCAB_PATH.exists(): - VOCAB_PATH.mkdir(parents=True) - logger.debug(f"OMOP directory '{VOCAB_PATH}' created.") - else: - # removing existing OMOP files - csv_files = list(VOCAB_PATH.glob("*.csv")) - for file in csv_files: - file.unlink() - logger.debug(f"Deleted OMOP csv file: {file}") - - # Extract ZIP contents - with zipfile.ZipFile(omop_zip_path, 'r') as zip_ref: - zip_ref.extractall(VOCAB_PATH) - logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/") - - # connect to database, if it does not exist it will be created - conn = sqlite3.connect(DB_PATH) - # Iterate through files in the folder - csv_files = list(VOCAB_PATH.glob("*.csv")) - total_tables_count = len(csv_files) - table_count = 1 - for filename in csv_files: - try: - logger.info(f"Processing {table_count} of {total_tables_count} tables: {filename}") - # read the CSV file with the specified delimiter - df = pd.read_csv(filename, delimiter="\t", low_memory=False) - - # export Table to sqlite db - df.to_sql(filename.stem, conn, if_exists='replace', index=False) - - # add to the metadata - vocabularies["tables"].append(filename.stem) - table_count = table_count + 1 - except Exception as e: - raise Exception(f"Error reading file {filename}: {e}") - - conn.close() - - # write version file - write_version_file(version) - - logger.info(f"OMOP installation completed") + +# Populate SQLite3 Database with default OMOP CONCEPTS +def install(omop_zip_file, version): + """Installs the OMOP release csv files in a file-based sql database""" + logger.info(f"Installing OMOP from zip file: {omop_zip_file}") + omop_zip_path = Path(omop_zip_file) + + # Check if the file exists and is a ZIP file + if not omop_zip_path.exists(): + msg = f"{omop_zip_path} does not exist." + logger.error(msg) + raise ValueError(msg) + if not zipfile.is_zipfile(omop_zip_path): + msg = f"Error: {omop_zip_path} is not a valid ZIP file." 
+ logger.error(msg) + raise ValueError(msg) + + # check codes directory exists and if not create it + if not VOCAB_PATH.exists(): + VOCAB_PATH.mkdir(parents=True) + logger.debug(f"OMOP directory '{VOCAB_PATH}' created.") + else: + # removing existing OMOP files + csv_files = list(VOCAB_PATH.glob("*.csv")) + for file in csv_files: + file.unlink() + logger.debug(f"Deleted OMOP csv file: {file}") + + # Extract ZIP contents + with zipfile.ZipFile(omop_zip_path, "r") as zip_ref: + zip_ref.extractall(VOCAB_PATH) + logger.info(f"Extracted OMOP zip file {omop_zip_path} to {VOCAB_PATH}/") + + # connect to database, if it does not exist it will be created + conn = sqlite3.connect(DB_PATH) + # Iterate through files in the folder + csv_files = list(VOCAB_PATH.glob("*.csv")) + total_tables_count = len(csv_files) + table_count = 1 + for filename in csv_files: + try: + logger.info( + f"Processing {table_count} of {total_tables_count} tables: {filename}" + ) + # read the CSV file with the specified delimiter + df = pd.read_csv(filename, delimiter="\t", low_memory=False) + + # export Table to sqlite db + df.to_sql(filename.stem, conn, if_exists="replace", index=False) + + # add to the metadata + vocabularies["tables"].append(filename.stem) + table_count = table_count + 1 + except Exception as e: + raise Exception(f"Error reading file {filename}: {e}") + + conn.close() + + # write version file + write_version_file(version) + + logger.info(f"OMOP installation completed") + def write_version_file(version): - """Writes the OMOP vocaburaries and version to a file""" - vocabularies['version'] = version - with open(VERSION_PATH, "w") as file: - yaml.dump(vocabularies, file, default_flow_style=False, sort_keys=False) - + """Writes the OMOP vocaburaries and version to a file""" + vocabularies["version"] = version + with open(VERSION_PATH, "w") as file: + yaml.dump(vocabularies, file, default_flow_style=False, sort_keys=False) + + def clear(db_path): - """Clears the OMOP sql database""" - logger.info(f"Clearing OMOP data from database") - omop_db_path = Path(db_path) - if not omop_db_path.is_file(): - raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.") - conn = sqlite3.connect(db_path) - cur = conn.cursor() - cur.execute("SELECT name FROM sqlite_master WHERE type='table';") - - # Fetch and print table names - tables = cur.fetchall() - logger.debug("Tables in database:", [table[0] for table in tables]) - - #cur.execute("DROP TABLE CONCEPT_SET;") - #cur.execute("DROP TABLE CONCEPT_SET_ITEM;") - - conn.close() - logger.info(f"OMOP database cleared") - + """Clears the OMOP sql database""" + logger.info(f"Clearing OMOP data from database") + omop_db_path = Path(db_path) + if not omop_db_path.is_file(): + raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.") + conn = sqlite3.connect(db_path) + cur = conn.cursor() + cur.execute("SELECT name FROM sqlite_master WHERE type='table';") + + # Fetch and print table names + tables = cur.fetchall() + logger.debug("Tables in database:", [table[0] for table in tables]) + + # cur.execute("DROP TABLE CONCEPT_SET;") + # cur.execute("DROP TABLE CONCEPT_SET_ITEM;") + + conn.close() + logger.info(f"OMOP database cleared") + + def delete(db_path): - """Deletes the OMOP sql database""" - logger.info(f"Deleting OMOP database") - omop_db_path = Path(db_path) - if not omop_db_path.is_file(): - raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.") - - omop_db_path.unlink() - logger.info(f"OMOP database deleted") - 
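For each extracted Athena file, install() reads the tab-delimited CSV with pandas and writes it to a SQLite table named after the file stem. A reduced, self-contained sketch of that load step (the one-row CONCEPT.csv written here is toy data, and the database name is a placeholder):

```python
# Toy version of the per-file load performed by omop.install(); the CSV
# contents and database name below are placeholders, not real OMOP data.
import sqlite3
from pathlib import Path

import pandas as pd

csv_path = Path("CONCEPT.csv")
csv_path.write_text("concept_id\tconcept_code\tvocabulary_id\n1\tA01\tSNOMED\n")

with sqlite3.connect("omop_demo.sqlite") as conn:
    df = pd.read_csv(csv_path, delimiter="\t", low_memory=False)
    df.to_sql(csv_path.stem, conn, if_exists="replace", index=False)
    print(pd.read_sql_query("SELECT * FROM CONCEPT", conn))
```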
+ """Deletes the OMOP sql database""" + logger.info(f"Deleting OMOP database") + omop_db_path = Path(db_path) + if not omop_db_path.is_file(): + raise FileNotFoundError(f"Error: OMOP DB file '{omop_db_path}' does not exist.") + + omop_db_path.unlink() + logger.info(f"OMOP database deleted") + + def table_exists(cursor, table_name): - # Query to check if the table exists - cursor.execute( - """ + # Query to check if the table exists + cursor.execute( + """ SELECT name FROM sqlite_master WHERE type='table' AND name=? """, - (table_name,) - ) + (table_name,), + ) + + # Fetch the result + result = cursor.fetchone() + + return result is not None - # Fetch the result - result = cursor.fetchone() - - return result is not None def vocab_exists(cursor, vocab_id): - # Query to check if the table exists - cursor.execute( - """ + # Query to check if the table exists + cursor.execute( + """ SELECT vocabulary_id FROM VOCABULARY WHERE vocabulary_id=? """, - (vocab_id,) - ) - - # Fetch the result - result = cursor.fetchone() - - return result is not None + (vocab_id,), + ) + + # Fetch the result + result = cursor.fetchone() + + return result is not None + def concept_set_exist(cursor, concept_set_name): - - query = f"SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)" - cursor.execute(query, (concept_set_name,)) - - # 1 if exists, 0 otherwise - return cursor.fetchone()[0] == 1 + + query = f"SELECT EXISTS (SELECT 1 FROM CONCEPT_SET WHERE concept_set_name = ?)" + cursor.execute(query, (concept_set_name,)) + + # 1 if exists, 0 otherwise + return cursor.fetchone()[0] == 1 + def export(map_path, export_path, version, omop_metadata): - logger.debug(f"exporting with metadata {omop_metadata} at version {version}") - - # copy the baseline omop database - export_db_path = export_path / EXPORT_FILE - shutil.copy(DB_PATH, export_db_path) - - # connect to db - conn = sqlite3.connect(export_db_path) - cur = conn.cursor() - - #Create VOCABULARY - df_test = pd.DataFrame([{ - "vocabulary_id": omop_metadata['vocabulary_id'], - "vocabulary_name": omop_metadata['vocabulary_name'], - "vocabulary_reference": omop_metadata['vocabulary_reference'], - "vocabulary_version": version, - # "vocabulary_concept_id": 0, - }]) - df_test.to_sql("VOCABULARY", conn, if_exists='append', index=False) - - # Create CONCEPT_SET - cur.execute(""" + logger.debug(f"exporting with metadata {omop_metadata} at version {version}") + + # copy the baseline omop database + export_db_path = export_path / EXPORT_FILE + shutil.copy(DB_PATH, export_db_path) + + # connect to db + conn = sqlite3.connect(export_db_path) + cur = conn.cursor() + + # Create VOCABULARY + df_test = pd.DataFrame( + [ + { + "vocabulary_id": omop_metadata["vocabulary_id"], + "vocabulary_name": omop_metadata["vocabulary_name"], + "vocabulary_reference": omop_metadata["vocabulary_reference"], + "vocabulary_version": version, + # "vocabulary_concept_id": 0, + } + ] + ) + df_test.to_sql("VOCABULARY", conn, if_exists="append", index=False) + + # Create CONCEPT_SET + cur.execute( + """ CREATE TABLE CONCEPT_SET ( concept_set_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each concept set atlas_id INTEGER, -- Unique identifier generated by ATLAS @@ -219,77 +233,89 @@ def export(map_path, export_path, version, omop_metadata): concept_set_description TEXT, -- Optional description for the concept set vocabulary_id TEXT NOT NULL, -- Foreign key to VOCABULARY table FOREIGN KEY (vocabulary_id) REFERENCES VOCABULARY(vocabulary_id) - );""") - - # Create 
CONCEPT_SET_ITEM - cur.execute(""" + );""" + ) + + # Create CONCEPT_SET_ITEM + cur.execute( + """ CREATE TABLE CONCEPT_SET_ITEM ( concept_set_item_id INTEGER PRIMARY KEY AUTOINCREMENT, -- Unique identifier for each mapping concept_set_id INTEGER NOT NULL, -- Foreign key to CONCEPT_SET table concept_id INTEGER NOT NULL, -- Foreign key to CONCEPT table FOREIGN KEY (concept_set_id) REFERENCES CONCEPT_SET(concept_set_id), FOREIGN KEY (concept_id) REFERENCES CONCEPT(concept_id) - );""") - - - # read map files - map_files = list(map_path.glob("*.csv")) - total = len(map_files) - logger.info(f"Exporting {total} map files") - for index, map_file in enumerate(map_files): - logger.info(f"Processing {index+1} of {total}: {map_file}") - df = pd.read_csv(map_file) - - for concept_set_name, grp in df.groupby("CONCEPT_SET"): - - # create Concept_Set - if not concept_set_exist(cur, concept_set_name): - cur.execute(f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');") - else: - logger.debug(f"Concept_set {concept_set_name} already exists") - #TODO: ask to remove old concept_set? - - # get Concept_set_Id - query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;" - target_code_type = map_file.stem - cur.execute(query, (concept_set_name, omop_metadata['vocabulary_id'], )) - # FAILS HERE WITH NONE REUR - logger.debug(f"target code type {target_code_type}") - logger.debug(f"omop code type {omop_vocab_types[target_code_type]}") - concept_set_id = cur.fetchone()[0] - logger.debug(f"concept set id {concept_set_id}") - - # get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED) - concept_codes = "'"+"', '".join(list(grp["CONCEPT"].astype(str)))+"'" - query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? 
AND concept_code IN ({concept_codes});" - cur.execute(query, (omop_vocab_types[target_code_type], )) - df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"]) - - if not len(grp) == len(df_out): - logger.error(f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database") - - #Create Concept_set_item - df_out["concept_set_id"] = concept_set_id - df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists='append', index=False) - - # Output all tables to CSV - # Get the list of all tables - cur.execute("SELECT name FROM sqlite_master WHERE type='table';") - tables = cur.fetchall() # List of tables - - # Export each table to a separate CSV file - for table in tables: - table_name = table[0] # Extract table name - df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn) - output_file = f"{table_name}.csv" - output_path = export_path / output_file - df.to_csv(output_path, index=False) # Save as CSV - logger.info(f"Exported {table_name} to {table_name}.csv") - - conn.close() - - logger.debug(f"Created export db successfully") - - return export_db_path - - return export_db_path + );""" + ) + + # read map files + map_files = list(map_path.glob("*.csv")) + total = len(map_files) + logger.info(f"Exporting {total} map files") + for index, map_file in enumerate(map_files): + logger.info(f"Processing {index+1} of {total}: {map_file}") + df = pd.read_csv(map_file) + + for concept_set_name, grp in df.groupby("CONCEPT_SET"): + + # create Concept_Set + if not concept_set_exist(cur, concept_set_name): + cur.execute( + f"INSERT INTO CONCEPT_SET (concept_set_name, vocabulary_id) VALUES ('{concept_set_name}', '{omop_metadata['vocabulary_id']}');" + ) + else: + logger.debug(f"Concept_set {concept_set_name} already exists") + # TODO: ask to remove old concept_set? + + # get Concept_set_Id + query = "SELECT concept_set_id FROM CONCEPT_SET WHERE concept_set_name = ? AND vocabulary_id = ?;" + target_code_type = map_file.stem + cur.execute( + query, + ( + concept_set_name, + omop_metadata["vocabulary_id"], + ), + ) + # FAILS HERE WITH NONE REUR + logger.debug(f"target code type {target_code_type}") + logger.debug(f"omop code type {omop_vocab_types[target_code_type]}") + concept_set_id = cur.fetchone()[0] + logger.debug(f"concept set id {concept_set_id}") + + # get corresponing Concept_id (OMOP) for each Concept_code (e.g. SNOMED) + concept_codes = "'" + "', '".join(list(grp["CONCEPT"].astype(str))) + "'" + query = f"SELECT concept_id FROM CONCEPT WHERE vocabulary_id = ? 
AND concept_code IN ({concept_codes});" + cur.execute(query, (omop_vocab_types[target_code_type],)) + df_out = pd.DataFrame(cur.fetchall(), columns=["concept_id"]) + + if not len(grp) == len(df_out): + logger.error( + f"ERROR: Some {omop_vocab_types[target_code_type]} Codes do not exist in OMOP Database" + ) + + # Create Concept_set_item + df_out["concept_set_id"] = concept_set_id + df_out.to_sql("CONCEPT_SET_ITEM", conn, if_exists="append", index=False) + + # Output all tables to CSV + # Get the list of all tables + cur.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = cur.fetchall() # List of tables + + # Export each table to a separate CSV file + for table in tables: + table_name = table[0] # Extract table name + df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn) + output_file = f"{table_name}.csv" + output_path = export_path / output_file + df.to_csv(output_path, index=False) # Save as CSV + logger.info(f"Exported {table_name} to {table_name}.csv") + + conn.close() + + logger.debug(f"Created export db successfully") + + return export_db_path + + return export_db_path diff --git a/acmc/parse.py b/acmc/parse.py index 7595ecbafff83c37631dbbe8794809cf8a4bbe5a..a2996eddcdd982fe2335b8ccca559c850226b69a 100644 --- a/acmc/parse.py +++ b/acmc/parse.py @@ -1,413 +1,553 @@ import pandas as pd import numpy as np import os +from typing import Callable from acmc import trud, logging_config as lc # setup logging logger = lc.setup_logger() -class CodesError(): - """A class used in InvalidCodesException to report an error if a code parser check fails""" - def __init__(self, message, codes=None, codes_file=None, mask=None, code_type=None): - # initialise class variables with provided parameters - for key, value in locals().items(): - if key != "self": - setattr(self, key, value) + +class CodesError: + """A class used in InvalidCodesException to report an error if a code parser check fails""" + + def __init__(self, message, codes=None, codes_file=None, mask=None, code_type=None): + # initialise class variables with provided parameters + for key, value in locals().items(): + if key != "self": + setattr(self, key, value) + class InvalidCodesException(Exception): - """Custom exception class raised when invalid codes are found that cannot be resolved by processing""" - def __init__(self, error): - super().__init__(error.message) - self.error = error - -class Proto(): - """ - Define checks as list of 3 tuple: (Message, Condition, Process) - - Message = The name of the condition (what is printed and logged) - - Condition = True if Passed, and False if Failed - - Process = Aims to resolve all issues that stop condition from passing (Do not change index!) - """ - checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), #Should be true if passed - lambda codes, codes_file : self.raise_exception(Exception(f"Code list is empty {codes_file}")) #correts code, or logs, or throws error - ) - ] - - def __init__(self, name, trud_codes_path=None): - if trud_codes_path is not None: - if trud_codes_path.is_file(): - self.trud_codes_path = trud_codes_path - self.db = pd.read_parquet(self.trud_codes_path) - else: - raise FileNotFoundError(f"Error: Read2 code file '{trud_codes_path}' does not exist. Please ensure you have installed TRUD correctly") - - self.name = name - - def raise_exception(self, ex): - """ Raises an exception inside a lambda function. Python does not allow using raise statement inside lambda because lambda can only contain expressions, not statements. 
Using raise_exception not raise_ as it's more explict""" - raise ex - - def in_database(self, codes, db, col): - return codes.isin(db[col]) - - def process(self, codes, codes_file): - """ identify issues that do not pass and fix them with define/d process """ - errors = [] - # Iter through each item in check. - for msg, cond, fix in self.checks: - # Check if any codes fail the check to False - if not cond(codes).all(): - # Log the number of codes that failed - logger.debug(f"Check: {msg} {(~cond(codes)).sum()} failed, trying to fix") - # try fix errors by running lamba "process" function - try: - codes = fix(codes, codes_file) - logger.debug(f"Check: Fixed") - except InvalidCodesException as ex: - errors.append(ex.error) - else: - logger.debug(f"Check: passed") - - return codes, errors + """Custom exception class raised when invalid codes are found that cannot be resolved by processing""" + + def __init__(self, error): + super().__init__(error.message) + self.error = error + + +class Proto: + """ + Define checks as list of 3 tuple: (Message, Condition, Process) + - Message = The name of the condition (what is printed and logged) + - Condition = True if Passed, and False if Failed + - Process = Aims to resolve all issues that stop condition from passing (Do not change index!) + """ + + checks: list[ + tuple[ + str, # The description, e.g., "Not Empty" + Callable[ + [list], pd.Series, + ], # The first lambda function: takes a list and returns a pd.Series of booleans + Callable[ + [list, str], None, + ], # The second lambda function: takes a list and a string, and returns nothing + ] + ] - def verify(self, codes, codes_file): - """ verify codes in codes file """ - conds = np.array([]) - # Iter through each item in check. - for msg, cond, process in self.checks: - # run conditional check - out = cond(codes) - conds = np.append(conds, out.all()) + def __init__(self, name, trud_codes_path=None): + if trud_codes_path is not None: + if trud_codes_path.is_file(): + self.trud_codes_path = trud_codes_path + self.db = pd.read_parquet(self.trud_codes_path) + else: + raise FileNotFoundError( + f"Error: Read2 code file '{trud_codes_path}' does not exist. Please ensure you have installed TRUD correctly" + ) + + self.name = name + + def raise_exception(self, ex): + """Raises an exception inside a lambda function. Python does not allow using raise statement inside lambda because lambda can only contain expressions, not statements. Using raise_exception not raise_ as it's more explict""" + raise ex + + def in_database(self, codes, db, col): + return codes.isin(db[col]) + + def process(self, codes, codes_file): + """identify issues that do not pass and fix them with define/d process""" + errors = [] + # Iter through each item in check. + for msg, cond, fix in self.checks: + # Check if any codes fail the check to False + if not cond(codes).all(): + # Log the number of codes that failed + logger.debug( + f"Check: {msg} {(~cond(codes)).sum()} failed, trying to fix" + ) + # try fix errors by running lamba "process" function + try: + codes = fix(codes, codes_file) + logger.debug(f"Check: Fixed") + except InvalidCodesException as ex: + errors.append(ex.error) + else: + logger.debug(f"Check: passed") + + return codes, errors + + def verify(self, codes, codes_file): + """verify codes in codes file""" + conds = np.array([]) + + # Iter through each item in check. 
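Proto.process walks the (message, condition, fix) tuples and only applies a fix when the condition fails for some codes. A standalone toy of that flow using the two length rules that Read2 defines further down (the sample codes themselves are made up):

```python
# Standalone sketch of the Proto.process loop; the checks mirror the Read2
# "Too Short"/"Too Long" rules below, but the example codes are invented.
import pandas as pd

checks = [
    (
        "Too Short",
        lambda codes: ~(codes.str.len() < 5),
        lambda codes, codes_file: codes.str.pad(width=5, side="right", fillchar="."),
    ),
    (
        "Too Long",
        lambda codes: ~(codes.str.len() > 5),
        lambda codes, codes_file: codes.str[:5],
    ),
]

codes = pd.Series(["C10", "H3312300", "G5730"])
for msg, cond, fix in checks:
    if not cond(codes).all():
        print(f"{msg}: {(~cond(codes)).sum()} failed, fixing")
        codes = fix(codes, "example.csv")

print(codes.tolist())  # ['C10..', 'H3312', 'G5730']
```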
+ for msg, cond, process in self.checks: + # run conditional check + out = cond(codes) + conds = np.append(conds, out.all()) + + return conds - return conds class Read2(Proto): - """ This Read2 class extends Proto, adding custom validation checks for a dataset of "Read2" codes. It ensures that the dataset is loaded, validates the codes based on several rules, and applies corrections or logs errors when necessary.""" - def __init__(self): - super().__init__('read2', trud.PROCESSED_PATH / 'read2.parquet') - - # validate checks - self.checks = [ - ( - # check codes are not empty, if empty throw an exception - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ), - ( - # check codes <5 characters, if too short pads it with . (dots) to reach 5 characters - "Too Short", - lambda codes : ~(codes.str.len() < 5), - lambda codes, codes_file : codes.str.pad(width=5, side='right', fillchar='.') - ), - ( - # check codes > 5 characters, If too long, truncates them to 5 characters - "Too Long", - lambda codes : ~(codes.str.len() > 5), - lambda codes, codes_file : codes.str[:5] - ), - ( - # checks codes contain numbers, or dots (.), if not logs invalid code error - "Alphanumeric Dot", - lambda codes : codes.str.match(r"^[a-zA-Z0-9.]+$"), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Illegal code format, not alphanumeric dot", - codes=codes, - codes_file=codes_file, - mask=codes.str.match(r"^[a-zA-Z0-9.]+$"), - code_type=self.name))) - ), - ( - # checks code exists in self.db (the Read2 dataset). If missing log invalid codes. - "In Database", - lambda codes : self.in_database(codes, self.db, self.name), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Codes do not exist in database", - codes=codes, - codes_file=codes_file, - mask=self.in_database(codes, self.db, self.name), - code_type=self.name))) - ) - ] - + """This Read2 class extends Proto, adding custom validation checks for a dataset of "Read2" codes. It ensures that the dataset is loaded, validates the codes based on several rules, and applies corrections or logs errors when necessary.""" + + def __init__(self): + super().__init__("read2", trud.PROCESSED_PATH / "read2.parquet") + + # validate checks + self.checks = [ + ( + # check codes are not empty, if empty throw an exception + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ), + ( + # check codes <5 characters, if too short pads it with . (dots) to reach 5 characters + "Too Short", + lambda codes: ~(codes.str.len() < 5), + lambda codes, codes_file: codes.str.pad( + width=5, side="right", fillchar="." 
+ ), + ), + ( + # check codes > 5 characters, If too long, truncates them to 5 characters + "Too Long", + lambda codes: ~(codes.str.len() > 5), + lambda codes, codes_file: codes.str[:5], + ), + ( + # checks codes contain numbers, or dots (.), if not logs invalid code error + "Alphanumeric Dot", + lambda codes: codes.str.match(r"^[a-zA-Z0-9.]+$"), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Illegal code format, not alphanumeric dot", + codes=codes, + codes_file=codes_file, + mask=codes.str.match(r"^[a-zA-Z0-9.]+$"), + code_type=self.name, + ) + ) + ), + ), + ( + # checks code exists in self.db (the Read2 dataset). If missing log invalid codes. + "In Database", + lambda codes: self.in_database(codes, self.db, self.name), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Codes do not exist in database", + codes=codes, + codes_file=codes_file, + mask=self.in_database(codes, self.db, self.name), + code_type=self.name, + ) + ) + ), + ), + ] + + class Read3(Proto): - def __init__(self): - super().__init__('Read3', trud.PROCESSED_PATH / 'read3.parquet') - - self.checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ), - ( - "Too Short", - lambda codes : ~(codes.str.len() < 5), - lambda codes, codes_file : codes.str.pad(width=5, side='right', fillchar='.') - ), - ( - "Too Long", - lambda codes : ~(codes.str.len() > 5), - lambda codes, codes_file : codes.str[:5] - ), - ( - "Alphanumeric Dot", - lambda codes : codes.str.match(r"^[a-zA-Z0-9.]+$"), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Alphanumeric Dot", - codes=codes, - codes_file=codes_file, - check_regex=codes.str.match(r"^[a-zA-Z0-9.]+$"), - code_type=self.name))) - ), - ( - "In Database", - lambda codes : self.in_database(codes, self.db, self.name), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA In Database", - codes=codes, - codes_file=codes_file, - check_regex=self.in_database(codes, self.db, self.name), - code_type=self.name))) - ), - ] - + def __init__(self): + super().__init__("Read3", trud.PROCESSED_PATH / "read3.parquet") + + self.checks = [ + ( + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ), + ( + "Too Short", + lambda codes: ~(codes.str.len() < 5), + lambda codes, codes_file: codes.str.pad( + width=5, side="right", fillchar="." 
+ ), + ), + ( + "Too Long", + lambda codes: ~(codes.str.len() > 5), + lambda codes, codes_file: codes.str[:5], + ), + ( + "Alphanumeric Dot", + lambda codes: codes.str.match(r"^[a-zA-Z0-9.]+$"), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA Alphanumeric Dot", + codes=codes, + codes_file=codes_file, + check_regex=codes.str.match(r"^[a-zA-Z0-9.]+$"), + code_type=self.name, + ) + ) + ), + ), + ( + "In Database", + lambda codes: self.in_database(codes, self.db, self.name), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA In Database", + codes=codes, + codes_file=codes_file, + check_regex=self.in_database(codes, self.db, self.name), + code_type=self.name, + ) + ) + ), + ), + ] + + class Icd10(Proto): - def __init__(self): - super().__init__('icd10', trud.PROCESSED_PATH / 'icd10.parquet') - - self.checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty {codes_file}", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ), - ( - "Too Short", - lambda codes : ~(codes.str.len() < 3), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Too Short", - codes=codes, - codes_file=codes_file, - mask=~(codes.str.len() < 3), - code_type=self.name))) - ), - ( - "Has Dot", - lambda codes : ~(codes.str.match(r".*\..*")), #check if contains dot - lambda codes, codes_file : codes.str.replace(".", "") #delete any dots in string - # lambda codes : codes.str.split('\.').apply(lambda ls: ls[0]) #only get part before dot - ), - ( - "Alphanumeric Capital", - lambda codes : codes.str.match(r"^[A-Z0-9]+$"), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Alphanumeric Capital", - codes=codes, - codes_file=codes_file, - mask=codes.str.match(r"^[A-Z0-9]+$"), - code_type=self.name))) - ), - ( - "In Database", - lambda codes : ~(~self.in_database(codes, self.db, self.name) & ~self.in_database(codes, self.db, self.name + "_alt")), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA In Database", - codes=codes, - codes_file=codes_file, - mask=~(~self.in_database(codes, self.db, self.name) & ~self.in_database(codes, self.db, self.name+"_alt")), - code_type=self.name))) - ) -# ( -# "ICD10 Regex", -# lambda codes : codes.str.match("[a-zA-Z][0-9][0-9]\.?[a-zA-Z0-9]*$"), #Alpha, Num, Num , Dot?, 4xAlphNum* -# lambda codes : lc.log_invalid_code(codes, -# codes.str.match("[a-zA-Z][0-9][0-9]\.?[a-zA-Z0-9]*$"), #Log non-matching rows -# code_type="icd10", -# -# ) - ] - - def trim_icd10(codes): - codes = codes.str[:4] - return codes - - + def __init__(self): + super().__init__("icd10", trud.PROCESSED_PATH / "icd10.parquet") + + self.checks = [ + ( + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty {codes_file}", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ), + ( + "Too Short", + lambda codes: ~(codes.str.len() < 3), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA Too Short", + codes=codes, + codes_file=codes_file, + mask=~(codes.str.len() < 3), + code_type=self.name, + ) + ) + ), + ), + ( + "Has Dot", + lambda codes: ~(codes.str.match(r".*\..*")), # check if 
contains dot + lambda codes, codes_file: codes.str.replace( + ".", "" + ), # delete any dots in string + # lambda codes : codes.str.split('\.').apply(lambda ls: ls[0]) #only get part before dot + ), + ( + "Alphanumeric Capital", + lambda codes: codes.str.match(r"^[A-Z0-9]+$"), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA Alphanumeric Capital", + codes=codes, + codes_file=codes_file, + mask=codes.str.match(r"^[A-Z0-9]+$"), + code_type=self.name, + ) + ) + ), + ), + ( + "In Database", + lambda codes: ~( + ~self.in_database(codes, self.db, self.name) + & ~self.in_database(codes, self.db, self.name + "_alt") + ), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA In Database", + codes=codes, + codes_file=codes_file, + mask=~( + ~self.in_database(codes, self.db, self.name) + & ~self.in_database(codes, self.db, self.name + "_alt") + ), + code_type=self.name, + ) + ) + ), + ), + # ( + # "ICD10 Regex", + # lambda codes : codes.str.match("[a-zA-Z][0-9][0-9]\.?[a-zA-Z0-9]*$"), #Alpha, Num, Num , Dot?, 4xAlphNum* + # lambda codes : lc.log_invalid_code(codes, + # codes.str.match("[a-zA-Z][0-9][0-9]\.?[a-zA-Z0-9]*$"), #Log non-matching rows + # code_type="icd10", + # + # ) + ] + + def trim_icd10(codes): + codes = codes.str[:4] + return codes + + class Snomed(Proto): - def __init__(self): - super().__init__('snomed', trud.PROCESSED_PATH / 'snomed.parquet') - - self.checks = [ - # ( - # "Not Empty", - # lambda codes : pd.Series([len(codes) > 0]), - # lambda codes : raise_exception(Exception("Code List is Empty")) - # ), - ( - "Too Short", - lambda codes : ~(codes.str.len() < 6), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Too Short", - codes=codes, - codes_file=codes_file, - mask=~(codes.str.len() < 6), - code_type=self.name))) - ), - ( - "Too Long", - lambda codes : ~(codes.str.len() > 18), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Too Long", - codes=codes, - codes_file=codes_file, - mask=~(codes.str.len() > 18), - code_type=self.name))) - ), - ( - "Numeric", - lambda codes : codes.str.match(r"[0-9]+$"), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Numeric", - codes=codes, - codes_file=codes_file, - mask=codes.str.match(r"[0-9]+$"), - code_type=self.name))) - ), - # ( - # "Is Integer", - # lambda codes : codes.dtype == int, - # lambda codes : codes.astype(int) #Convert to integer - # ), - ( - "In Database", - lambda codes : self.in_database(codes, self.db, self.name), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA In Database", - codes=codes, - codes_file=codes_file, - mask=self.in_database(codes, self.db, self.name), - code_type=self.name))) - ) - ] + def __init__(self): + super().__init__("snomed", trud.PROCESSED_PATH / "snomed.parquet") + + self.checks = [ + # ( + # "Not Empty", + # lambda codes : pd.Series([len(codes) > 0]), + # lambda codes : raise_exception(Exception("Code List is Empty")) + # ), + ( + "Too Short", + lambda codes: ~(codes.str.len() < 6), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA Too Short", + codes=codes, + codes_file=codes_file, + mask=~(codes.str.len() < 6), + code_type=self.name, + ) + ) + ), + ), + ( + "Too Long", + lambda codes: ~(codes.str.len() > 18), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + 
f"QA Too Long", + codes=codes, + codes_file=codes_file, + mask=~(codes.str.len() > 18), + code_type=self.name, + ) + ) + ), + ), + ( + "Numeric", + lambda codes: codes.str.match(r"[0-9]+$"), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA Numeric", + codes=codes, + codes_file=codes_file, + mask=codes.str.match(r"[0-9]+$"), + code_type=self.name, + ) + ) + ), + ), + # ( + # "Is Integer", + # lambda codes : codes.dtype == int, + # lambda codes : codes.astype(int) #Convert to integer + # ), + ( + "In Database", + lambda codes: self.in_database(codes, self.db, self.name), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA In Database", + codes=codes, + codes_file=codes_file, + mask=self.in_database(codes, self.db, self.name), + code_type=self.name, + ) + ) + ), + ), + ] + class Opcs4(Proto): - def __init__(self): - super().__init__('opcs4', trud.PROCESSED_PATH / 'opcs4.parquet') - - self.checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ), - ( - "In Database", - lambda codes : self.in_database(codes, self.db, self.name), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA In Database", - codes=codes, - codes_file=codes_file, - mask=self.in_database(codes, self.db, self.name), - code_type=self.name))) - ) - ] + def __init__(self): + super().__init__("opcs4", trud.PROCESSED_PATH / "opcs4.parquet") + + self.checks = [ + ( + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ), + ( + "In Database", + lambda codes: self.in_database(codes, self.db, self.name), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA In Database", + codes=codes, + codes_file=codes_file, + mask=self.in_database(codes, self.db, self.name), + code_type=self.name, + ) + ) + ), + ), + ] + class Atc(Proto): - def __init__(self): - super().__init__('atc', trud_codes_path=None) - self.checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ), - ( - "Alphanumeric Capital", - lambda codes : codes.str.match(r"^[A-Z0-9]+$"), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"QA Alphanumeric Capital", - codes=codes, - codes_file=codes_file, - mask=codes.str.match(r"^[A-Z0-9]+$"), - code_type=self.name))) - ), - ] - + def __init__(self): + super().__init__("atc", trud_codes_path=None) + self.checks = [ + ( + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ), + ( + "Alphanumeric Capital", + lambda codes: codes.str.match(r"^[A-Z0-9]+$"), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"QA Alphanumeric Capital", + codes=codes, + codes_file=codes_file, 
+ mask=codes.str.match(r"^[A-Z0-9]+$"), + code_type=self.name, + ) + ) + ), + ), + ] + + class Med(Proto): - def __init__(self): - super().__init__('med', trud_codes_path=None) - self.checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ) - ] - + def __init__(self): + super().__init__("med", trud_codes_path=None) + self.checks = [ + ( + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ) + ] + + class Cprd(Proto): - def __init__(self): - super().__init__('cprd', trud_codes_path=None) - self.checks = [ - ( - "Not Empty", - lambda codes : pd.Series([len(codes) > 0]), - lambda codes, codes_file : self.raise_exception( - InvalidCodesException(CodesError(f"Code list is empty", - codes=codes, - codes_file=codes_file, - mask=None, - code_type=self.name))) - ) - ] - -class CodeTypeParser(): - """A class used in InvalidCodesException to report an error if a code parser check fails""" - def __init__(self, trud_processed_dir=trud.PROCESSED_PATH): - - if not trud_processed_dir.exists() or not trud_processed_dir.is_dir(): - raise FileNotFoundError(f"Cannot initialise parsers as the TRUD processed directory {trud_processed_dir} does not exist, please check that TRUD has been installed: acmc trud install") - - self.code_types = { - "read2": Read2(), - "read3": Read3(), - "icd10": Icd10(), - "snomed": Snomed(), - "opcs4": Opcs4(), - "atc": Atc(), - "med": Med(), - "cprd": Cprd(), - } + def __init__(self): + super().__init__("cprd", trud_codes_path=None) + self.checks = [ + ( + "Not Empty", + lambda codes: pd.Series([len(codes) > 0]), + lambda codes, codes_file: self.raise_exception( + InvalidCodesException( + CodesError( + f"Code list is empty", + codes=codes, + codes_file=codes_file, + mask=None, + code_type=self.name, + ) + ) + ), + ) + ] + + +class CodeTypeParser: + """A class used in InvalidCodesException to report an error if a code parser check fails""" + + def __init__(self, trud_processed_dir=trud.PROCESSED_PATH): + + if not trud_processed_dir.exists() or not trud_processed_dir.is_dir(): + raise FileNotFoundError( + f"Cannot initialise parsers as the TRUD processed directory {trud_processed_dir} does not exist, please check that TRUD has been installed: acmc trud install" + ) + + self.code_types = { + "read2": Read2(), + "read3": Read3(), + "icd10": Icd10(), + "snomed": Snomed(), + "opcs4": Opcs4(), + "atc": Atc(), + "med": Med(), + "cprd": Cprd(), + } diff --git a/acmc/phen.py b/acmc/phen.py index 1f00c0ed99ed1978f3c95841cabdaffc62a538c1..fa94e75b3face24e5afb2a1e39843ce0b2cc86b6 100644 --- a/acmc/phen.py +++ b/acmc/phen.py @@ -19,289 +19,350 @@ from acmc import trud, omop, parse # setup logging import acmc.logging_config as lc + logger = lc.setup_logger() pd.set_option("mode.chained_assignment", None) -PHEN_DIR = 'phen' -DEFAULT_PHEN_PATH = Path('./workspace') / PHEN_DIR +PHEN_DIR = "phen" +DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR -CODES_DIR = 'codes' -MAP_DIR = 'map' -CONCEPT_SET_DIR = 'concept-set' -OMOP_DIR = 'omop' +CODES_DIR = "codes" +MAP_DIR = "map" +CONCEPT_SET_DIR = "concept-set" +OMOP_DIR = "omop" DEFAULT_PHEN_DIR_LIST = [CODES_DIR, MAP_DIR, CONCEPT_SET_DIR, 
OMOP_DIR] -CONFIG_FILE = 'config.json' -VOCAB_VERSION_FILE = 'vocab_version.yaml' +CONFIG_FILE = "config.json" +VOCAB_VERSION_FILE = "vocab_version.yaml" -DEFAULT_GIT_BRANCH = 'main' +DEFAULT_GIT_BRANCH = "main" SPLIT_COL_ACTION = "split_col" CODES_COL_ACTION = "codes_col" DIVIDE_COL_ACTION = "divide_col" COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] + class PhenValidationException(Exception): - """Custom exception class raised when validation errors in phenotype configuration file""" - def __init__(self, message, validation_errors=None): - super().__init__(message) - self.validation_errors = validation_errors + """Custom exception class raised when validation errors in phenotype configuration file""" + + def __init__(self, message, validation_errors=None): + super().__init__(message) + self.validation_errors = validation_errors + def construct_git_url(remote_url): - """Constructs a git url for github or gitlab including a PAT token environment variable""" - # check the url - parsed_url = urlparse(remote_url) - - # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd - # need to update this function if the URL scheme is different. - if "github.com" in parsed_url.netloc: - # get GitHub PAT from environment variable - auth = os.getenv("ACMC_GITHUB_PAT") - if not auth: - raise ValueError("GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable.") - else: - # get GitLab PAT from environment variable - auth = os.getenv("ACMC_GITLAB_PAT") - if not auth: - raise ValueError("GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable.") - auth = f"oauth2:{auth}" - - # Construct the new URL with credentials - new_netloc = f"{auth}@{parsed_url.netloc}" - return urlunparse((parsed_url.scheme, new_netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment)) + """Constructs a git url for github or gitlab including a PAT token environment variable""" + # check the url + parsed_url = urlparse(remote_url) + + # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd + # need to update this function if the URL scheme is different. + if "github.com" in parsed_url.netloc: + # get GitHub PAT from environment variable + auth = os.getenv("ACMC_GITHUB_PAT") + if not auth: + raise ValueError( + "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." + ) + else: + # get GitLab PAT from environment variable + auth = os.getenv("ACMC_GITLAB_PAT") + if not auth: + raise ValueError( + "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 
+ ) + auth = f"oauth2:{auth}" + + # Construct the new URL with credentials + new_netloc = f"{auth}@{parsed_url.netloc}" + return urlunparse( + ( + parsed_url.scheme, + new_netloc, + parsed_url.path, + parsed_url.params, + parsed_url.query, + parsed_url.fragment, + ) + ) + def create_empty_git_dir(path): - """Creates a directory with a .gitkeep file so that it's tracked in git""" - path.mkdir(exist_ok=True) - keep_path = path / '.gitkeep' - keep_path.touch(exist_ok=True) + """Creates a directory with a .gitkeep file so that it's tracked in git""" + path.mkdir(exist_ok=True) + keep_path = path / ".gitkeep" + keep_path.touch(exist_ok=True) + def init(phen_dir, remote_url): - """Initial phenotype directory as git repo with standard structure""" - logger.info(f"Initialising Phenotype in directory: {phen_dir}") - phen_path = Path(phen_dir) - - # check if directory already exists and ask user if they want to recreate it - configure = False - if phen_path.exists() and phen_path.is_dir(): # Check if it exists and is a directory - user_input = input(f"The phen directory already exists. Do you want to reinitialise? (yes/no): ").strip().lower() - if user_input in ['yes', 'y']: - shutil.rmtree(phen_path) - configure = True; - else: - logger.info("Phen directory was not recreated.") - else: - configure=True - - if not configure: - logger.info(f"Exiting, phenotype not initiatised") - return - - # Initialise repo from local or remote - repo = None - # if remote then clone the repo otherwise init a local repo - if remote_url != None: - # add PAT token to the URL - git_url = construct_git_url(remote_url) - - # clone the repo - repo = git.cmd.Git() - repo.clone(git_url, phen_path) - # open repo - repo = git.Repo(phen_path) - - # check if there are any commits (new repo has no commits) - if len(repo.branches) == 0 or repo.head.is_detached: # Handle detached HEAD (e.g., after init) - logger.debug("The phen repository has no commits yet.") - commit_count = 0 - else: - # Get the total number of commits in the default branch - commit_count = sum(1 for _ in repo.iter_commits()) - logger.debug(f"Repo has previous commits: {commit_count}") - else: - # local repo, create the directories and init - phen_path.mkdir(parents=True, exist_ok=True) - logger.debug(f"Phen directory '{phen_path}' has been created.") - repo = git.Repo.init(phen_path) - commit_count = 0 - - # initialise empty repos - if commit_count == 0: - # create initial commit - initial_file_path = phen_path / "README.md" - with open(initial_file_path, "w") as file: - file.write("# Initial commit\nThis is the first commit in the phen repository.\n") - repo.index.add([initial_file_path]) - repo.index.commit("Initial commit") - commit_count = 1 - - # Checkout the phens default branch, creating it if it does not exist - if DEFAULT_GIT_BRANCH in repo.branches: - main_branch = repo.heads[DEFAULT_GIT_BRANCH] - main_branch.checkout() - else: - main_branch = repo.create_head(DEFAULT_GIT_BRANCH) - main_branch.checkout() - - # if the phen path does not contain the config file then initialise the phen type - config_path = phen_path / CONFIG_FILE - if config_path.exists(): - logger.debug(f"Phenotype configuration files already exist") - return - - logger.info("Creating phen directory structure and config files") - for d in DEFAULT_PHEN_DIR_LIST: - create_empty_git_dir(phen_path / d) - - # set initial version based on the number of commits in the repo, depending on how the repo was created - # e.g., with a README.md, then there will be some initial commits before the 
phen config is added - next_commit_count = commit_count + 1 - initial_version = f"v1.0.{next_commit_count}" - - # create empty phen config file - config = { - "concept_sets": { - "version": initial_version, - "omop": { - "vocabulary_id": "", - "vocabulary_name": "", - "vocabulary_reference": "" - }, - "concept_set": [ - ] - }, - "codes": [ - ] - } - config_path = phen_path / CONFIG_FILE - with open(config_path, "w", encoding="utf-8") as f: - json.dump(config, f, indent=4) - - # TODO: add gitignore - - # Ignore all SQLite database files -#*.db -#*.sqlite -#*.sqlite3 - - # add to git repo and commit - for d in DEFAULT_PHEN_DIR_LIST: - repo.git.add(phen_path / d) - repo.git.add(all=True) - repo.index.commit("initialised the phen git repo.") - - logger.info(f"Phenotype initialised successfully") - + """Initial phenotype directory as git repo with standard structure""" + logger.info(f"Initialising Phenotype in directory: {phen_dir}") + phen_path = Path(phen_dir) + + # check if directory already exists and ask user if they want to recreate it + configure = False + if ( + phen_path.exists() and phen_path.is_dir() + ): # Check if it exists and is a directory + user_input = ( + input( + f"The phen directory already exists. Do you want to reinitialise? (yes/no): " + ) + .strip() + .lower() + ) + if user_input in ["yes", "y"]: + shutil.rmtree(phen_path) + configure = True + else: + logger.info("Phen directory was not recreated.") + else: + configure = True + + if not configure: + logger.info(f"Exiting, phenotype not initiatised") + return + + # Initialise repo from local or remote + repo = None + # if remote then clone the repo otherwise init a local repo + if remote_url != None: + # add PAT token to the URL + git_url = construct_git_url(remote_url) + + # clone the repo + repo = git.cmd.Git() + repo.clone(git_url, phen_path) + # open repo + repo = git.Repo(phen_path) + + # check if there are any commits (new repo has no commits) + if ( + len(repo.branches) == 0 or repo.head.is_detached + ): # Handle detached HEAD (e.g., after init) + logger.debug("The phen repository has no commits yet.") + commit_count = 0 + else: + # Get the total number of commits in the default branch + commit_count = sum(1 for _ in repo.iter_commits()) + logger.debug(f"Repo has previous commits: {commit_count}") + else: + # local repo, create the directories and init + phen_path.mkdir(parents=True, exist_ok=True) + logger.debug(f"Phen directory '{phen_path}' has been created.") + repo = git.Repo.init(phen_path) + commit_count = 0 + + # initialise empty repos + if commit_count == 0: + # create initial commit + initial_file_path = phen_path / "README.md" + with open(initial_file_path, "w") as file: + file.write( + "# Initial commit\nThis is the first commit in the phen repository.\n" + ) + repo.index.add([initial_file_path]) + repo.index.commit("Initial commit") + commit_count = 1 + + # Checkout the phens default branch, creating it if it does not exist + if DEFAULT_GIT_BRANCH in repo.branches: + main_branch = repo.heads[DEFAULT_GIT_BRANCH] + main_branch.checkout() + else: + main_branch = repo.create_head(DEFAULT_GIT_BRANCH) + main_branch.checkout() + + # if the phen path does not contain the config file then initialise the phen type + config_path = phen_path / CONFIG_FILE + if config_path.exists(): + logger.debug(f"Phenotype configuration files already exist") + return + + logger.info("Creating phen directory structure and config files") + for d in DEFAULT_PHEN_DIR_LIST: + create_empty_git_dir(phen_path / d) + + # set initial 
version based on the number of commits in the repo, depending on how the repo was created + # e.g., with a README.md, then there will be some initial commits before the phen config is added + next_commit_count = commit_count + 1 + initial_version = f"v1.0.{next_commit_count}" + + # create empty phen config file + config = { + "concept_sets": { + "version": initial_version, + "omop": { + "vocabulary_id": "", + "vocabulary_name": "", + "vocabulary_reference": "", + }, + "concept_set": [], + }, + "codes": [], + } + config_path = phen_path / CONFIG_FILE + with open(config_path, "w", encoding="utf-8") as f: + json.dump(config, f, indent=4) + + # TODO: add gitignore + + # Ignore all SQLite database files + # *.db + # *.sqlite + # *.sqlite3 + + # add to git repo and commit + for d in DEFAULT_PHEN_DIR_LIST: + repo.git.add(phen_path / d) + repo.git.add(all=True) + repo.index.commit("initialised the phen git repo.") + + logger.info(f"Phenotype initialised successfully") + + def validate(phen_dir): - """Validates the phenotype directory is a git repo with standard structure""" - logger.info(f"Validating phenotype: {phen_dir}") - phen_path = Path(phen_dir) - if not phen_path.is_dir(): - raise NotADirectoryError(f"Error: '{phen_path}' is not a directory") - - config_path = phen_path / CONFIG_FILE - if not config_path.is_file(): - raise FileNotFoundError(f"Error: phen configuration file '{config_path}' does not exist.") - - codes_path = phen_path / CODES_DIR - if not codes_path.is_dir(): - raise FileNotFoundError(f"Error: source codes directory {source_codes_dir} does not exist.") - - # Calidate the directory is a git repo - try: - git.Repo(phen_path) - except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): - raise Exception(f"Phen directory {phen_path} is not a git repo") - - # Load configuration File - if config_path.suffix == ".json": - mapping = json.load(open(config_path, "rb")) - else: - raise Exception(f"Unsupported configuration filetype: {str(config_path.resolve())}") - - # initiatise - validation_errors = [] - concept_sets = mapping["concept_sets"] - concept_codes = mapping["codes"] - code_types = parse.CodeTypeParser().code_types - + """Validates the phenotype directory is a git repo with standard structure""" + logger.info(f"Validating phenotype: {phen_dir}") + phen_path = Path(phen_dir) + if not phen_path.is_dir(): + raise NotADirectoryError(f"Error: '{phen_path}' is not a directory") + + config_path = phen_path / CONFIG_FILE + if not config_path.is_file(): + raise FileNotFoundError( + f"Error: phen configuration file '{config_path}' does not exist." + ) + + codes_path = phen_path / CODES_DIR + if not codes_path.is_dir(): + raise FileNotFoundError( + f"Error: source codes directory {source_codes_dir} does not exist." 
+ ) + + # Calidate the directory is a git repo + try: + git.Repo(phen_path) + except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): + raise Exception(f"Phen directory {phen_path} is not a git repo") + + # Load configuration File + if config_path.suffix == ".json": + mapping = json.load(open(config_path, "rb")) + else: + raise Exception( + f"Unsupported configuration filetype: {str(config_path.resolve())}" + ) + + # initiatise + validation_errors = [] + concept_sets = mapping["concept_sets"] + concept_codes = mapping["codes"] + code_types = parse.CodeTypeParser().code_types + # check the version number is of the format vn.n.n - match = re.match(r"v(\d+\.\d+\.\d+)", concept_sets['version']) - if not match: - validation_errors.append(f"Invalid version format in configuration file: {concept_sets['version']}") - - # create a list of all the concept set names defined in the concept set configuration - concept_set_names = [] - for item in concept_sets['concept_set']: - if item['concept_set_name'] in concept_set_names: - validation_errors.append(f"Duplicate concept set defined in concept sets {item['concept_set_name'] }") - else: - concept_set_names.append(item['concept_set_name']) - - # check codes definition - concept_set_mapping_names = [] - for item in concept_codes: - - required_keys = {"folder", "files"} - if required_keys.issubset(item.keys()): - # check concept codes path is a directory - concept_code_dir_path = codes_path / item['folder'] - if not concept_code_dir_path.is_dir(): - validation_errors.append(f"Folder directory {str(concept_code_dir_path.resolve())} is not a directory") - - for file in item["files"]: - # check concepte code file exists - concept_code_file_path = concept_code_dir_path / file['file'] - if not concept_code_file_path.exists(): - validation_errors.append(f"Coding file {str(concept_code_file_path.resolve())} does not exist") - - # check concepte code file is not empty - concept_code_file_path = concept_code_dir_path / file['file'] - if concept_code_file_path.stat().st_size == 0: - validation_errors.append(f"Coding file {str(concept_code_file_path.resolve())} is an empty file") - - # check columns section exists - if "columns" not in file: - validation_errors.append(f"Columns not defined for {concept_code_file_path}") - - # check columns specified are a supported medical coding type - for column in file['columns']: - if column not in code_types and column != 'metadata': - validation_errors.append(f"Column type {column} for file {concept_code_file_path} is not supported") - - # check the actions are supported - if 'actions' in file: - for action in file['actions']: - if action not in COL_ACTIONS: - validation_errors.append(f"Action {action} is not supported") - - # check concept_set defined for the mapping - for concept_set_mapping in file['concept_set']: - # store the concept set names found for later set operations - if concept_set_mapping not in concept_set_mapping_names: - concept_set_mapping_names.append(concept_set_mapping) - else: - validation_errors.append(f"Missing required elements {required_keys} in codes {item}") - # create sets to perform set operations on the lists of concept set names - concept_set_names_set = set(concept_set_names) - concept_set_mapping_names_set = set(concept_set_mapping_names) - - # check all concept sets in the summary section have at least one code mapping - concept_set_no_codes = list(concept_set_names_set - concept_set_mapping_names_set) - if len(concept_set_no_codes) > 0: - validation_errors.append(f"Concept sets 
do not exist in codes {concept_set_no_codes}") - - # check all concept sets included in the code mapping are defined in the summary concept_set section - codes_no_concept_set = list(concept_set_mapping_names_set - concept_set_names_set) - if len(codes_no_concept_set) > 0: - validation_errors.append(f"Concept sets mapped in codes do not exist in the concept sets: {codes_no_concept_set}") - - if len(validation_errors) > 0: - logger.error(validation_errors) - raise PhenValidationException(f"Configuration file {str(config_path.resolve())} failed validation", - validation_errors) - - logger.info(f"Phenotype validated successfully") + match = re.match(r"v(\d+\.\d+\.\d+)", concept_sets["version"]) + if not match: + validation_errors.append( + f"Invalid version format in configuration file: {concept_sets['version']}" + ) + + # create a list of all the concept set names defined in the concept set configuration + concept_set_names = [] + for item in concept_sets["concept_set"]: + if item["concept_set_name"] in concept_set_names: + validation_errors.append( + f"Duplicate concept set defined in concept sets {item['concept_set_name'] }" + ) + else: + concept_set_names.append(item["concept_set_name"]) + + # check codes definition + concept_set_mapping_names = [] + for item in concept_codes: + + required_keys = {"folder", "files"} + if required_keys.issubset(item.keys()): + # check concept codes path is a directory + concept_code_dir_path = codes_path / item["folder"] + if not concept_code_dir_path.is_dir(): + validation_errors.append( + f"Folder directory {str(concept_code_dir_path.resolve())} is not a directory" + ) + + for file in item["files"]: + # check concepte code file exists + concept_code_file_path = concept_code_dir_path / file["file"] + if not concept_code_file_path.exists(): + validation_errors.append( + f"Coding file {str(concept_code_file_path.resolve())} does not exist" + ) + + # check concepte code file is not empty + concept_code_file_path = concept_code_dir_path / file["file"] + if concept_code_file_path.stat().st_size == 0: + validation_errors.append( + f"Coding file {str(concept_code_file_path.resolve())} is an empty file" + ) + + # check columns section exists + if "columns" not in file: + validation_errors.append( + f"Columns not defined for {concept_code_file_path}" + ) + + # check columns specified are a supported medical coding type + for column in file["columns"]: + if column not in code_types and column != "metadata": + validation_errors.append( + f"Column type {column} for file {concept_code_file_path} is not supported" + ) + + # check the actions are supported + if "actions" in file: + for action in file["actions"]: + if action not in COL_ACTIONS: + validation_errors.append( + f"Action {action} is not supported" + ) + + # check concept_set defined for the mapping + for concept_set_mapping in file["concept_set"]: + # store the concept set names found for later set operations + if concept_set_mapping not in concept_set_mapping_names: + concept_set_mapping_names.append(concept_set_mapping) + else: + validation_errors.append( + f"Missing required elements {required_keys} in codes {item}" + ) + # create sets to perform set operations on the lists of concept set names + concept_set_names_set = set(concept_set_names) + concept_set_mapping_names_set = set(concept_set_mapping_names) + + # check all concept sets in the summary section have at least one code mapping + concept_set_no_codes = list(concept_set_names_set - concept_set_mapping_names_set) + if len(concept_set_no_codes) > 0: 
+ validation_errors.append( + f"Concept sets do not exist in codes {concept_set_no_codes}" + ) + + # check all concept sets included in the code mapping are defined in the summary concept_set section + codes_no_concept_set = list(concept_set_mapping_names_set - concept_set_names_set) + if len(codes_no_concept_set) > 0: + validation_errors.append( + f"Concept sets mapped in codes do not exist in the concept sets: {codes_no_concept_set}" + ) + + if len(validation_errors) > 0: + logger.error(validation_errors) + raise PhenValidationException( + f"Configuration file {str(config_path.resolve())} failed validation", + validation_errors, + ) + + logger.info(f"Phenotype validated successfully") + def read_table_file(path, excel_sheet=None): """ @@ -318,108 +379,129 @@ def read_table_file(path, excel_sheet=None): df = pd.read_stata(path, dtype=str) else: raise Exception(f"Unsupported filetype provided for source file {path.suffix}") - + return df + def process_actions(df, file): - # Perform Structural Changes to file before preprocessing - logger.debug("Processing file structural actions") - if ("actions" in file and "split_col" in file["actions"] and "codes_col" in file["actions"]): - split_col = file["actions"]["split_col"] - codes_col = file["actions"]["codes_col"] - logger.debug("Action: Splitting", split_col, "column into:", df[split_col].unique(),) - codes = df[codes_col] - oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode - oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes - oh[oh == False] = np.nan # replace 0s with None - df = pd.concat([df, oh], axis=1) # merge in new columns - - return df + # Perform Structural Changes to file before preprocessing + logger.debug("Processing file structural actions") + if ( + "actions" in file + and "split_col" in file["actions"] + and "codes_col" in file["actions"] + ): + split_col = file["actions"]["split_col"] + codes_col = file["actions"]["codes_col"] + logger.debug( + "Action: Splitting", + split_col, + "column into:", + df[split_col].unique(), + ) + codes = df[codes_col] + oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode + oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes + oh[oh == False] = np.nan # replace 0s with None + df = pd.concat([df, oh], axis=1) # merge in new columns + + return df + # Perform QA Checks on columns individually and append to df def preprocess_codes(df, file, target_code_type=None, codes_file=None): - """ Parses each column individually - Order and length will not be preserved! 
""" - out = pd.DataFrame([]) # create output df to append to - code_errors = [] # list of errors from processing - - meta_columns = [] # meta columns to keep with codes - if "actions" in file and "divide_col" in file["actions"]: - meta_columns += [file["actions"]["divide_col"]] - # TODO: enable metacolumns to be outputted - problem with map_file appending - if "metadata" in file["columns"]: - meta_columns += file["columns"]["metadata"] - - metadata_df = df[meta_columns] - - # Preprocess codes - code_types = parse.CodeTypeParser().code_types - for code_type_name, code_type_parser in code_types.items(): - if code_type_name in file['columns']: - logger.info(f"Processing {code_type_name} codes...") - - # get code types - codes = df[file['columns'][code_type_name]].dropna() - codes = codes.astype(str) # convert to string - codes = codes.str.strip() # remove excess spaces - - # process codes, validating them using parser and returning the errors - codes, errors = code_type_parser.process(codes, codes_file) - if len(errors) > 0: - code_errors.extend(errors) - logger.warning(f"Codes validation failed with {len(errors)} errors") - - # add metadata columns - out = pd.concat([out, pd.DataFrame({code_type_name: codes}).join(metadata_df)], ignore_index=True) - - return out, meta_columns, code_errors + """Parses each column individually - Order and length will not be preserved!""" + out = pd.DataFrame([]) # create output df to append to + code_errors = [] # list of errors from processing + + meta_columns = [] # meta columns to keep with codes + if "actions" in file and "divide_col" in file["actions"]: + meta_columns += [file["actions"]["divide_col"]] + # TODO: enable metacolumns to be outputted - problem with map_file appending + if "metadata" in file["columns"]: + meta_columns += file["columns"]["metadata"] + + metadata_df = df[meta_columns] + + # Preprocess codes + code_types = parse.CodeTypeParser().code_types + for code_type_name, code_type_parser in code_types.items(): + if code_type_name in file["columns"]: + logger.info(f"Processing {code_type_name} codes...") + + # get code types + codes = df[file["columns"][code_type_name]].dropna() + codes = codes.astype(str) # convert to string + codes = codes.str.strip() # remove excess spaces + + # process codes, validating them using parser and returning the errors + codes, errors = code_type_parser.process(codes, codes_file) + if len(errors) > 0: + code_errors.extend(errors) + logger.warning(f"Codes validation failed with {len(errors)} errors") + + # add metadata columns + out = pd.concat( + [out, pd.DataFrame({code_type_name: codes}).join(metadata_df)], + ignore_index=True, + ) + + return out, meta_columns, code_errors + # Translate Df with multiple codes into single code type Series def translate_codes(df, target_code_type): - codes = pd.Series([], dtype=str) - - # Convert codes to target type - logger.info(f"Converting to target code type {target_code_type}") - for col_name in df.columns: - # if target code type is the same as thet source code type, no translation, just appending source as target - if col_name == target_code_type: - logger.debug(f"Target code type {target_code_type} has source code types {len(df)}, copying rather than translating") - codes = pd.concat([codes, df[target_code_type]]) - else: - filename = f"{col_name}_to_{target_code_type}.parquet" - map_path = trud.PROCESSED_PATH / filename - if map_path.exists(): - col = df[col_name] - df_map = pd.read_parquet(map_path) - # merge on corresponding codes and take target column - 
translated = pd.merge(col, df_map, how="left")[target_code_type] - # TODO: BUG mask does not match column - codes = pd.concat([codes, translated]) # merge to output - else: - logger.warning(f"No mapping from {col_name} to {target_code_type}, file {str(map_path.resolve())} does not exist") - - return codes + codes = pd.Series([], dtype=str) + + # Convert codes to target type + logger.info(f"Converting to target code type {target_code_type}") + for col_name in df.columns: + # if target code type is the same as thet source code type, no translation, just appending source as target + if col_name == target_code_type: + logger.debug( + f"Target code type {target_code_type} has source code types {len(df)}, copying rather than translating" + ) + codes = pd.concat([codes, df[target_code_type]]) + else: + filename = f"{col_name}_to_{target_code_type}.parquet" + map_path = trud.PROCESSED_PATH / filename + if map_path.exists(): + col = df[col_name] + df_map = pd.read_parquet(map_path) + # merge on corresponding codes and take target column + translated = pd.merge(col, df_map, how="left")[target_code_type] + # TODO: BUG mask does not match column + codes = pd.concat([codes, translated]) # merge to output + else: + logger.warning( + f"No mapping from {col_name} to {target_code_type}, file {str(map_path.resolve())} does not exist" + ) + + return codes + # Append file's codes to output Df with concept def map_file(df, target_code_type, out, concepts, meta_columns=[]): - # seperate out meta_columns - metadata_df = df[meta_columns] - df = df.drop(columns=meta_columns) - - # translate codes - codes = translate_codes(df, target_code_type) - codes = codes.dropna() # delete NaNs - - # Append to output if translated - if len(codes) > 0: - codes = pd.DataFrame({"CONCEPT": codes}) - codes = codes.join(metadata_df) - for concept in concepts: - codes["CONCEPT_SET"] = np.repeat(concept.strip(), len(codes)) - out = pd.concat([out, codes]) - else: - logger.debug(f"No codes converted with target code type {target_code_type}") - - return out + # seperate out meta_columns + metadata_df = df[meta_columns] + df = df.drop(columns=meta_columns) + + # translate codes + codes = translate_codes(df, target_code_type) + codes = codes.dropna() # delete NaNs + + # Append to output if translated + if len(codes) > 0: + codes = pd.DataFrame({"CONCEPT": codes}) + codes = codes.join(metadata_df) + for concept in concepts: + codes["CONCEPT_SET"] = np.repeat(concept.strip(), len(codes)) + out = pd.concat([out, codes]) + else: + logger.debug(f"No codes converted with target code type {target_code_type}") + + return out + def sql_row_exist(conn, table, column, value): # Execute and check if a result exists @@ -430,394 +512,466 @@ def sql_row_exist(conn, table, column, value): return exists + def write_code_errors(code_errors, code_errors_path): - err_df = pd.DataFrame([ - {"CONCEPT": ", ".join(err.codes[~err.mask].tolist()), - "VOCABULARY": err.code_type, - "SOURCE": err.codes_file, - "CAUSE": err.message} for err in code_errors]) + err_df = pd.DataFrame( + [ + { + "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), + "VOCABULARY": err.code_type, + "SOURCE": err.codes_file, + "CAUSE": err.message, + } + for err in code_errors + ] + ) + + err_df = err_df.drop_duplicates() # Remove Duplicates from Error file + err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) + err_df.to_csv(code_errors_path, index=False, mode="w") - err_df = err_df.drop_duplicates() # Remove Duplicates from Error file - err_df = 
err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) - err_df.to_csv(code_errors_path, index=False, mode="w") def write_vocab_version(phen_path): - # write the vocab version files - - if not trud.VERSION_PATH.exists(): - raise FileNotFoundError(f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed") - - if not omop.VERSION_PATH.exists(): - raise FileNotFoundError(f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed") - - with trud.VERSION_PATH.open("r") as file: - trud_version = yaml.safe_load(file) - - with omop.VERSION_PATH.open("r") as file: - omop_version = yaml.safe_load(file) - - # Create the combined YAML structure - version_data = { - "versions": { - "acmc": acmc.__version__, - "trud": trud_version, - "omop": omop_version, - } - } - - with open(phen_path / VOCAB_VERSION_FILE, "w") as file: - yaml.dump(version_data, file, default_flow_style=False, sort_keys=False) + # write the vocab version files + + if not trud.VERSION_PATH.exists(): + raise FileNotFoundError( + f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" + ) + + if not omop.VERSION_PATH.exists(): + raise FileNotFoundError( + f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" + ) + + with trud.VERSION_PATH.open("r") as file: + trud_version = yaml.safe_load(file) + + with omop.VERSION_PATH.open("r") as file: + omop_version = yaml.safe_load(file) + + # Create the combined YAML structure + version_data = { + "versions": { + "acmc": acmc.__version__, + "trud": trud_version, + "omop": omop_version, + } + } + + with open(phen_path / VOCAB_VERSION_FILE, "w") as file: + yaml.dump(version_data, file, default_flow_style=False, sort_keys=False) + def map(phen_dir, target_code_type): - logger.info(f"Processing phenotype: {phen_dir}") - logger.debug(f"Target coding format: {target_code_type}") - - # Validate configuration - validate(phen_dir) - - # initialise paths - phen_path = Path(phen_dir) - config_path = phen_path / CONFIG_FILE - codes_path = phen_path / CODES_DIR - - # load configuration - config = json.load(open(config_path, "rb")) - concept_sets = config["concept_sets"] - codes = config["codes"] - - # Create output dataframe - out = pd.DataFrame([]) - code_errors = [] - - # Process each folder in codes section - for folder in codes: - for file in folder["files"]: - logger.debug(f"--- {file['file']} ---") - codes_file_path = codes_path / folder["folder"] / file["file"] - - # Load Code File - if "excel_sheet" in file: - df = read_table_file(path=codes_file_path, excel_sheet=file["excel_sheet"]) - else: - df = read_table_file(path=codes_file_path) - - # process structural actions - df = process_actions(df, file) - - # Preprocessing & Validation Checks - logger.debug("Processing and validating code formats") - df, meta_columns, errors = preprocess_codes( - df, - file, codes_file=str(codes_file_path.resolve()), - target_code_type=target_code_type) - - logger.debug(f" Length of errors from preprocess {len(errors)}") - if len(errors) > 0: - code_errors.extend(errors) - logger.debug(f" Length of code_errors {len(code_errors)}") - - # partition table by categorical column - if ("actions" in file and "divide_col" in file["actions"] and len(df) > 0): - divide_col = file["actions"]["divide_col"] - logger.debug("Action: Dividing Table by", divide_col, "column into: ", df[divide_col].unique(),) - df = df.groupby(divide_col) - - # Map to Concept/Phenotype - if len(df.index) != 0: - if ("concept_set" 
in file) and isinstance(df, pd.core.frame.DataFrame): - out = map_file( - df, - target_code_type, - out, - concepts=file["concept_set"], - meta_columns=meta_columns) - elif ("concept_set_categories" in file) and isinstance(df, pd.core.groupby.generic.DataFrameGroupBy): - meta_columns.remove(divide_col) # delete categorical column - for cat, grp in df: - if (cat in file["concept_set_categories"].keys()): # check if category is mapped - grp = grp.drop(columns=[divide_col]) # delete categorical column - logger.debug("Category:", cat) - out = map_file( - grp, - target_code_type, - out, - concepts=file["concept_set_categories"][cat], - meta_columns=meta_columns,) - else: - raise AttributeError(f"File {file} has either no concept_set or conceot_set_categories or the instance of dataframe objectives associated is incorrect, concept_set must be a DataFrame, conceot_set_categories must be pd.core.groupby.generic.DataFrameGroupBy") - else: - logger.warning(f"File {file} has no output after preprocessing in config {str(config_path.resolve())}") - - if(len(code_errors) > 0): - logger.error(f"The map processing has {len(code_errors)} errors") - error_filename = f"{target_code_type}-code-errors.csv" - write_code_errors(code_errors, phen_path / MAP_DIR / error_filename) - - # Check there is output from processing - if len(out.index) == 0: - logger.error(f"No output after map processing") - raise Exception(f"No output after map processing, check config {str(config_path.resolve())}") - - # Final processing - out = out.reset_index(drop=True) - out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) - out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) - - # Add concept set definition metadata - concept_sets_df = pd.DataFrame(concept_sets["concept_set"]) # transform to dataframe - if "metadata" in concept_sets_df.columns: - concept_sets_df = concept_sets_df.join(pd.json_normalize(concept_sets_df["metadata"])) # metadata to columns - concept_sets_df = concept_sets_df.drop(columns=["metadata"]) - concept_sets_df = concept_sets_df.rename(columns={"concept_set_name": "CONCEPT_SET"}) - concept_sets_df = concept_sets_df.drop_duplicates() # remove duplicates - out = out.merge(concept_sets_df, how="left", on="CONCEPT_SET") # merge with output - - # Save output to map directory - output_filename = target_code_type + '.csv' - map_path = phen_path / MAP_DIR / output_filename - out.to_csv(map_path, index=False) - logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") - - # save concept sets as separate files - concept_set_path = phen_path / CONCEPT_SET_DIR / target_code_type - - # empty the concept-set directory if it exists but keep the .git file - git_items = ['.git', '.gitkeep'] - if concept_set_path.exists(): - for item in concept_set_path.iterdir(): - if item not in git_items: - item.unlink() - else: - concept_set_path.mkdir(parents=True, exist_ok=True) - - # write each concept as a separate file - for name, concept in out.groupby("CONCEPT_SET"): - concept = concept.sort_values(by="CONCEPT") #sort rows - concept = concept.dropna(how='all', axis=1) #remove empty cols - concept = concept.reindex(sorted(concept.columns), axis=1) #sort cols alphabetically - filename = f"{name}.csv" - concept_path = concept_set_path / filename - concept.to_csv(concept_path, index=False ) - - write_vocab_version(phen_path) - - logger.info(f"Phenotype processed successfully") + logger.info(f"Processing phenotype: {phen_dir}") + logger.debug(f"Target coding format: {target_code_type}") + + # Validate configuration + 
validate(phen_dir) + + # initialise paths + phen_path = Path(phen_dir) + config_path = phen_path / CONFIG_FILE + codes_path = phen_path / CODES_DIR + + # load configuration + config = json.load(open(config_path, "rb")) + concept_sets = config["concept_sets"] + codes = config["codes"] + + # Create output dataframe + out = pd.DataFrame([]) + code_errors = [] + + # Process each folder in codes section + for folder in codes: + for file in folder["files"]: + logger.debug(f"--- {file['file']} ---") + codes_file_path = codes_path / folder["folder"] / file["file"] + + # Load Code File + if "excel_sheet" in file: + df = read_table_file( + path=codes_file_path, excel_sheet=file["excel_sheet"] + ) + else: + df = read_table_file(path=codes_file_path) + + # process structural actions + df = process_actions(df, file) + + # Preprocessing & Validation Checks + logger.debug("Processing and validating code formats") + df, meta_columns, errors = preprocess_codes( + df, + file, + codes_file=str(codes_file_path.resolve()), + target_code_type=target_code_type, + ) + + logger.debug(f" Length of errors from preprocess {len(errors)}") + if len(errors) > 0: + code_errors.extend(errors) + logger.debug(f" Length of code_errors {len(code_errors)}") + + # partition table by categorical column + if "actions" in file and "divide_col" in file["actions"] and len(df) > 0: + divide_col = file["actions"]["divide_col"] + logger.debug( + "Action: Dividing Table by", + divide_col, + "column into: ", + df[divide_col].unique(), + ) + df = df.groupby(divide_col) + + # Map to Concept/Phenotype + if len(df.index) != 0: + if ("concept_set" in file) and isinstance(df, pd.core.frame.DataFrame): + out = map_file( + df, + target_code_type, + out, + concepts=file["concept_set"], + meta_columns=meta_columns, + ) + elif ("concept_set_categories" in file) and isinstance( + df, pd.core.groupby.generic.DataFrameGroupBy + ): + meta_columns.remove(divide_col) # delete categorical column + for cat, grp in df: + if ( + cat in file["concept_set_categories"].keys() + ): # check if category is mapped + grp = grp.drop( + columns=[divide_col] + ) # delete categorical column + logger.debug("Category:", cat) + out = map_file( + grp, + target_code_type, + out, + concepts=file["concept_set_categories"][cat], + meta_columns=meta_columns, + ) + else: + raise AttributeError( + f"File {file} has either no concept_set or conceot_set_categories or the instance of dataframe objectives associated is incorrect, concept_set must be a DataFrame, conceot_set_categories must be pd.core.groupby.generic.DataFrameGroupBy" + ) + else: + logger.warning( + f"File {file} has no output after preprocessing in config {str(config_path.resolve())}" + ) + + if len(code_errors) > 0: + logger.error(f"The map processing has {len(code_errors)} errors") + error_filename = f"{target_code_type}-code-errors.csv" + write_code_errors(code_errors, phen_path / MAP_DIR / error_filename) + + # Check there is output from processing + if len(out.index) == 0: + logger.error(f"No output after map processing") + raise Exception( + f"No output after map processing, check config {str(config_path.resolve())}" + ) + + # Final processing + out = out.reset_index(drop=True) + out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) + out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) + + # Add concept set definition metadata + concept_sets_df = pd.DataFrame( + concept_sets["concept_set"] + ) # transform to dataframe + if "metadata" in concept_sets_df.columns: + concept_sets_df = concept_sets_df.join( + 
pd.json_normalize(concept_sets_df["metadata"]) + ) # metadata to columns + concept_sets_df = concept_sets_df.drop(columns=["metadata"]) + concept_sets_df = concept_sets_df.rename( + columns={"concept_set_name": "CONCEPT_SET"} + ) + concept_sets_df = concept_sets_df.drop_duplicates() # remove duplicates + out = out.merge(concept_sets_df, how="left", on="CONCEPT_SET") # merge with output + + # Save output to map directory + output_filename = target_code_type + ".csv" + map_path = phen_path / MAP_DIR / output_filename + out.to_csv(map_path, index=False) + logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") + + # save concept sets as separate files + concept_set_path = phen_path / CONCEPT_SET_DIR / target_code_type + + # empty the concept-set directory if it exists but keep the .git file + git_items = [".git", ".gitkeep"] + if concept_set_path.exists(): + for item in concept_set_path.iterdir(): + if item not in git_items: + item.unlink() + else: + concept_set_path.mkdir(parents=True, exist_ok=True) + + # write each concept as a separate file + for name, concept in out.groupby("CONCEPT_SET"): + concept = concept.sort_values(by="CONCEPT") # sort rows + concept = concept.dropna(how="all", axis=1) # remove empty cols + concept = concept.reindex( + sorted(concept.columns), axis=1 + ) # sort cols alphabetically + filename = f"{name}.csv" + concept_path = concept_set_path / filename + concept.to_csv(concept_path, index=False) + + write_vocab_version(phen_path) + + logger.info(f"Phenotype processed successfully") + def publish(phen_dir): - """Publishes updates to the phenotype by commiting all changes to the repo directory""" - - # Validate config - validate(phen_dir) - phen_path = Path(phen_dir) - - # load git repo and set the branch - repo = git.Repo(phen_path) - if DEFAULT_GIT_BRANCH in repo.branches: - main_branch = repo.heads[DEFAULT_GIT_BRANCH] - main_branch.checkout() - else: - raise AttributeError(f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}") - - # check if any changes to publish - if not repo.is_dirty() and not repo.untracked_files: - logger.info("Nothing to publish, no changes to the repo") - return - - # get major version from configuration file - config_path = phen_path / CONFIG_FILE - config = json.load(open(config_path, "rb")) - match = re.match(r"v(\d+\.\d+)", config['concept_sets']['version']) - major_version = match.group(1) - - # get latest minor version from git commit count - commit_count = len(list(repo.iter_commits("HEAD"))) - - # set version and write to config file so consistent with repo version - next_minor_version = commit_count + 1 - version = f"v{major_version}.{next_minor_version}" - logger.debug(f"New version: {version}") - config['concept_sets']['version'] = version - with open(config_path, "w", encoding="utf-8") as f: - json.dump(config, f, indent=4) - - # Add and commit changes to repo - commit_message = f"Committing updates to phenotype {phen_path}" - repo.git.add('--all') - repo.index.commit(commit_message) - - # Create and push the tag - if version in repo.tags: - raise Exception (f"Tag {version} already exists in repo {phen_path}") - repo.create_tag(version, message=f"Release {version}") - logger.info(f"New version: {version}") - - # push to origin if a remote repo - try: - origin = repo.remotes.origin - origin.push('main') - origin.push(tags=True) - logger.debug("Changes pushed to 'origin'.") - except AttributeError: - logger.debug("No remote named 'origin' found, local repo.") - - logger.info(f"Phenotype published 
successfully") + """Publishes updates to the phenotype by commiting all changes to the repo directory""" + + # Validate config + validate(phen_dir) + phen_path = Path(phen_dir) + + # load git repo and set the branch + repo = git.Repo(phen_path) + if DEFAULT_GIT_BRANCH in repo.branches: + main_branch = repo.heads[DEFAULT_GIT_BRANCH] + main_branch.checkout() + else: + raise AttributeError( + f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" + ) + + # check if any changes to publish + if not repo.is_dirty() and not repo.untracked_files: + logger.info("Nothing to publish, no changes to the repo") + return + + # get major version from configuration file + config_path = phen_path / CONFIG_FILE + config = json.load(open(config_path, "rb")) + match = re.match(r"v(\d+\.\d+)", config["concept_sets"]["version"]) + major_version = match.group(1) + + # get latest minor version from git commit count + commit_count = len(list(repo.iter_commits("HEAD"))) + + # set version and write to config file so consistent with repo version + next_minor_version = commit_count + 1 + version = f"v{major_version}.{next_minor_version}" + logger.debug(f"New version: {version}") + config["concept_sets"]["version"] = version + with open(config_path, "w", encoding="utf-8") as f: + json.dump(config, f, indent=4) + + # Add and commit changes to repo + commit_message = f"Committing updates to phenotype {phen_path}" + repo.git.add("--all") + repo.index.commit(commit_message) + + # Create and push the tag + if version in repo.tags: + raise Exception(f"Tag {version} already exists in repo {phen_path}") + repo.create_tag(version, message=f"Release {version}") + logger.info(f"New version: {version}") + + # push to origin if a remote repo + try: + origin = repo.remotes.origin + origin.push("main") + origin.push(tags=True) + logger.debug("Changes pushed to 'origin'.") + except AttributeError: + logger.debug("No remote named 'origin' found, local repo.") + + logger.info(f"Phenotype published successfully") + def export(phen_dir, version): - """Exports a phen repo at a specific tagged version into a target directory""" - logger.info(f"Exporting phenotype {phen_dir} at version {version}") - - # validate configuration - validate(phen_dir) - phen_path = Path(phen_dir) - - # load configuration - config_path = phen_path / CONFIG_FILE - config = json.load(open(config_path, "rb")) - - map_path = phen_path / MAP_DIR - if not map_path.exists(): - logger.warning(f"Map path does not exist '{map_path}'") - - export_path = phen_path / OMOP_DIR - # check export directory exists and if not create it - if not export_path.exists(): - export_path.mkdir(parents=True) - logger.debug(f"OMOP export directory '{export_path}' created.") - - # omop export db - export_db_path = omop.export(map_path, - export_path, - config['concept_sets']['version'], - config['concept_sets']['omop']) - - - # write to tables - # export as csv - logger.info(f"Phenotype exported successfully") - + """Exports a phen repo at a specific tagged version into a target directory""" + logger.info(f"Exporting phenotype {phen_dir} at version {version}") + + # validate configuration + validate(phen_dir) + phen_path = Path(phen_dir) + + # load configuration + config_path = phen_path / CONFIG_FILE + config = json.load(open(config_path, "rb")) + + map_path = phen_path / MAP_DIR + if not map_path.exists(): + logger.warning(f"Map path does not exist '{map_path}'") + + export_path = phen_path / OMOP_DIR + # check export directory exists and if not create it + if not 
export_path.exists(): + export_path.mkdir(parents=True) + logger.debug(f"OMOP export directory '{export_path}' created.") + + # omop export db + export_db_path = omop.export( + map_path, + export_path, + config["concept_sets"]["version"], + config["concept_sets"]["omop"], + ) + + # write to tables + # export as csv + logger.info(f"Phenotype exported successfully") + + def copy(phen_dir, target_dir, version): - """Copys a phen repo at a specific tagged version into a target directory""" - - # Validate - validate(phen_dir) - phen_path = Path(phen_dir) - - # Check target directory exists - target_path = Path(target_dir) - if not target_path.exists(): - raise FileNotFoundError(f"The target directory {target_path} does not exist") - - # Set copy directory - copy_path = target_path / version - logger.info(f"Copying repo {phen_path} to {copy_path}") - - if not copy_path.exists(): - # If copy directory doesn't exist, clone the repo - logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") - repo = git.Repo.clone_from(phen_path, copy_path) - else: - # If copy directory exists, open the repo - logger.debug(f"Copy of repository already exists in {copy_path}. Opening the repo...") - repo = git.Repo(copy_path) - - # Check out the latest commit or specified version - if version: - # Checkout a specific version (e.g., branch, tag, or commit hash) - logger.info(f"Checking out version {version}...") - repo.git.checkout(version) - else: - # Checkout the latest commit (HEAD) - logger.info(f"Checking out the latest commit...") - repo.git.checkout("HEAD") - - logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") - - logger.info(f"Phenotype copied successfully") + """Copys a phen repo at a specific tagged version into a target directory""" + + # Validate + validate(phen_dir) + phen_path = Path(phen_dir) + + # Check target directory exists + target_path = Path(target_dir) + if not target_path.exists(): + raise FileNotFoundError(f"The target directory {target_path} does not exist") + + # Set copy directory + copy_path = target_path / version + logger.info(f"Copying repo {phen_path} to {copy_path}") + + if not copy_path.exists(): + # If copy directory doesn't exist, clone the repo + logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") + repo = git.Repo.clone_from(phen_path, copy_path) + else: + # If copy directory exists, open the repo + logger.debug( + f"Copy of repository already exists in {copy_path}. Opening the repo..." 
+ ) + repo = git.Repo(copy_path) + + # Check out the latest commit or specified version + if version: + # Checkout a specific version (e.g., branch, tag, or commit hash) + logger.info(f"Checking out version {version}...") + repo.git.checkout(version) + else: + # Checkout the latest commit (HEAD) + logger.info(f"Checking out the latest commit...") + repo.git.checkout("HEAD") + + logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") + + logger.info(f"Phenotype copied successfully") + def diff(phen_dir, phen_old_dir): - """Compare the differences between two versions of a phenotype""" - - # validate phenotype directories - validate(phen_old_dir) - validate(phen_dir) - - old_phen_path = Path(phen_old_dir) - new_phen_path = Path(phen_dir) - - # Load report (FOR SOME REASON THIS WAS APPEND SO SET TO w for NOW) - report_file_name = old_phen_path.name + "_diff.md" - report_path = new_phen_path / report_file_name - report = open(report_path, 'w') - logger.debug(f"Writing to report file {str(report_path.resolve())}") - - # Get maps files from phenotype - old_map_path = old_phen_path / MAP_DIR - new_map_path = new_phen_path / MAP_DIR - - # List files from output directories - old_output_files = [file.name for file in old_map_path.iterdir() if file.is_file() and not file.name.startswith('.')] - new_output_files = [file.name for file in new_map_path.iterdir() if file.is_file() and not file.name.startswith('.')] - - # Convert the lists to sets for easy comparison - old_output_set = set(old_output_files) - new_output_set = set(new_output_files) - - # Outputs that are in old_output_set but not in new_output_set (removed files) - removed_outputs = old_output_set - new_output_set - # Outputs that are in new_output_set but not in old_output_set (added files) - added_outputs = new_output_set - old_output_set - # Outputs that are the intersection of old_output_set and new_output_set - common_outputs = old_output_set & new_output_set - - # Write outputs report - new_config_path = new_phen_path / CONFIG_FILE - new_config = json.load(open(new_config_path, "rb")) - report.write(f"\n\n# Report for version {new_config['concept_sets']['version']}\n\n") - report.write(f"- Removed outputs: {list(removed_outputs)}\n") - report.write(f"- Added outputs: {list(added_outputs)}\n") - report.write(f"- Common outputs: {list(common_outputs)}\n") - - report.write(f"\n\n## Compare concepts {str(old_phen_path.resolve())} to {str(new_phen_path.resolve())}\n\n") - # Compare common outputs between versions - for file in common_outputs: - old_output = old_map_path / file - new_output = new_map_path / file - - df1 = pd.read_csv(old_output) - df1 = df1[["CONCEPT","CONCEPT_SET"]].groupby("CONCEPT_SET").count() - df2 = pd.read_csv(new_output) - df2 = df2[["CONCEPT","CONCEPT_SET"]].groupby("CONCEPT_SET").count() - - # Check for added and removed concepts - report.write("- Removed concepts {}\n".format(list(set(df1.index) - set(df2.index)))) - report.write("- Added concepts {}\n".format(list(set(df2.index) - set(df1.index)))) - - # Check for changed concepts - diff = df2 - df1 #diff in counts - diff = diff[(~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()] #get non-zero counts - s = "\n" - if len(diff.index) > 0: - for concept, row in diff.iterrows(): - s += "\t - {} {}\n".format(concept, row["CONCEPT"]) - report.write(f"- Changed concepts {s}\n\n") - else: - report.write(f"- Changed concepts []\n\n") - - logger.info(f"Phenotypes diff'd successfully") + """Compare the differences between two versions of a 
phenotype""" + + # validate phenotype directories + validate(phen_old_dir) + validate(phen_dir) + + old_phen_path = Path(phen_old_dir) + new_phen_path = Path(phen_dir) + + # Load report (FOR SOME REASON THIS WAS APPEND SO SET TO w for NOW) + report_file_name = old_phen_path.name + "_diff.md" + report_path = new_phen_path / report_file_name + report = open(report_path, "w") + logger.debug(f"Writing to report file {str(report_path.resolve())}") + + # Get maps files from phenotype + old_map_path = old_phen_path / MAP_DIR + new_map_path = new_phen_path / MAP_DIR + + # List files from output directories + old_output_files = [ + file.name + for file in old_map_path.iterdir() + if file.is_file() and not file.name.startswith(".") + ] + new_output_files = [ + file.name + for file in new_map_path.iterdir() + if file.is_file() and not file.name.startswith(".") + ] + + # Convert the lists to sets for easy comparison + old_output_set = set(old_output_files) + new_output_set = set(new_output_files) + + # Outputs that are in old_output_set but not in new_output_set (removed files) + removed_outputs = old_output_set - new_output_set + # Outputs that are in new_output_set but not in old_output_set (added files) + added_outputs = new_output_set - old_output_set + # Outputs that are the intersection of old_output_set and new_output_set + common_outputs = old_output_set & new_output_set + + # Write outputs report + new_config_path = new_phen_path / CONFIG_FILE + new_config = json.load(open(new_config_path, "rb")) + report.write( + f"\n\n# Report for version {new_config['concept_sets']['version']}\n\n" + ) + report.write(f"- Removed outputs: {list(removed_outputs)}\n") + report.write(f"- Added outputs: {list(added_outputs)}\n") + report.write(f"- Common outputs: {list(common_outputs)}\n") + + report.write( + f"\n\n## Compare concepts {str(old_phen_path.resolve())} to {str(new_phen_path.resolve())}\n\n" + ) + # Compare common outputs between versions + for file in common_outputs: + old_output = old_map_path / file + new_output = new_map_path / file + + df1 = pd.read_csv(old_output) + df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() + df2 = pd.read_csv(new_output) + df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() + + # Check for added and removed concepts + report.write( + "- Removed concepts {}\n".format(list(set(df1.index) - set(df2.index))) + ) + report.write( + "- Added concepts {}\n".format(list(set(df2.index) - set(df1.index))) + ) + + # Check for changed concepts + diff = df2 - df1 # diff in counts + diff = diff[ + (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() + ] # get non-zero counts + s = "\n" + if len(diff.index) > 0: + for concept, row in diff.iterrows(): + s += "\t - {} {}\n".format(concept, row["CONCEPT"]) + report.write(f"- Changed concepts {s}\n\n") + else: + report.write(f"- Changed concepts []\n\n") + + logger.info(f"Phenotypes diff'd successfully") + # Here's the atlas code that needs to go into anotehr function -# if output_path == "atlas": -# vocab_id = summary_config["omop"]["vocabulary_id"] -# vocab_version = summary_config["version"] -# vocab_name = summary_config["omop"]["vocabulary_name"] -# vocab_reference = summary_config["omop"]["vocabulary_reference"] - - # Create New OMOP Vocabulary -# omop_setup(OMOP_DB_PATH, vocab_id, vocab_version, vocab_name, vo#cab_reference) - - # Export to DB -# omop_publish_concept_sets(out, -# OMOP_DB_PATH, -# vocab_id, -# omop_vocab_types[target_code_type], -# vocab_version,) \ No newline at end of file +# 
if output_path == "atlas": +# vocab_id = summary_config["omop"]["vocabulary_id"] +# vocab_version = summary_config["version"] +# vocab_name = summary_config["omop"]["vocabulary_name"] +# vocab_reference = summary_config["omop"]["vocabulary_reference"] + +# Create New OMOP Vocabulary +# omop_setup(OMOP_DB_PATH, vocab_id, vocab_version, vocab_name, vo#cab_reference) + +# Export to DB +# omop_publish_concept_sets(out, +# OMOP_DB_PATH, +# vocab_id, +# omop_vocab_types[target_code_type], +# vocab_version,) diff --git a/acmc/trud.py b/acmc/trud.py index d014f7e05b551f84dde0936e09cccfb65240d21c..57a0afe4f8d9f2393011a3a5bc832ea67ba4e7dc 100644 --- a/acmc/trud.py +++ b/acmc/trud.py @@ -12,20 +12,23 @@ from pathlib import Path # setup logging import acmc.logging_config as lc + logger = lc.setup_logger() # Constants FQDN = "isd.digital.nhs.uk" -VOCAB_PATH = Path('./vocab/trud') -VERSION_FILE = 'trud_version.yaml' +VOCAB_PATH = Path("./vocab/trud") +VERSION_FILE = "trud_version.yaml" VERSION_PATH = VOCAB_PATH / VERSION_FILE -DOWNLOADS_PATH = VOCAB_PATH / 'downloads' -PROCESSED_PATH = VOCAB_PATH / 'processed' +DOWNLOADS_PATH = VOCAB_PATH / "downloads" +PROCESSED_PATH = VOCAB_PATH / "processed" + def error_exit(message): logger.error(message, "error") sys.exit(1) + def get_releases(item_id, API_KEY, latest=False): """Retrieve release information for an item from the TRUD API.""" url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases" @@ -34,7 +37,9 @@ def get_releases(item_id, API_KEY, latest=False): response = requests.get(url) if response.status_code != 200: - error_exit(f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval") + error_exit( + f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval" + ) data = response.json() if data.get("message") != "OK": @@ -42,219 +47,281 @@ def get_releases(item_id, API_KEY, latest=False): return data.get("releases", []) -def download_release_file(item_id, release_ordinal, release, file_json_prefix, file_type=None): + +def download_release_file( + item_id, release_ordinal, release, file_json_prefix, file_type=None +): """Download specified file type for a given release of an item.""" # check folder is a directory if not DOWNLOADS_PATH.is_dir(): - raise NotADirectoryError(f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory") - + raise NotADirectoryError( + f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory" + ) + file_type = file_type or file_json_prefix file_url = release.get(f"{file_json_prefix}FileUrl") file_name = release.get(f"{file_json_prefix}FileName") - file_destination = DOWNLOADS_PATH / file_name + file_destination = DOWNLOADS_PATH / file_name if not file_url or not file_name: - error_exit(f"Missing {file_type} file information for release {release_ordinal} of item {item_id}.") + error_exit( + f"Missing {file_type} file information for release {release_ordinal} of item {item_id}." 
+ ) - logger.info(f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}") + logger.info( + f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}" + ) response = requests.get(file_url, stream=True) - + if response.status_code == 200: with open(file_destination, "wb") as f: f.write(response.content) return file_destination else: - error_exit(f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}") + error_exit( + f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}" + ) -def validate_download_hash(file_destination:str, item_hash:str): + +def validate_download_hash(file_destination: str, item_hash: str): with open(file_destination, "rb") as f: hash = hashlib.sha256(f.read()).hexdigest() logger.debug(hash) if hash.upper() == item_hash.upper(): logger.debug(f"Verified hash of {file_destination} {hash}") else: - error_exit(f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead") + error_exit( + f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead" + ) + -def unzip_download(file_destination:str): +def unzip_download(file_destination: str): # check folder is a directory if not DOWNLOADS_PATH.is_dir(): - raise NotADirectoryError(f"Error: '{DOWNLOADS_PATH}' for TRUD resoruces is not a directory") - - with zipfile.ZipFile(file_destination, 'r') as zip_ref: + raise NotADirectoryError( + f"Error: '{DOWNLOADS_PATH}' for TRUD resoruces is not a directory" + ) + + with zipfile.ZipFile(file_destination, "r") as zip_ref: zip_ref.extractall(DOWNLOADS_PATH) + def extract_icd10(): - #ICD10_edition5 - file_path = DOWNLOADS_PATH / 'ICD10_Edition5_XML_20160401' / 'Content' / 'ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml' + # ICD10_edition5 + file_path = ( + DOWNLOADS_PATH + / "ICD10_Edition5_XML_20160401" + / "Content" + / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml" + ) df = pd.read_xml(file_path) df = df[["CODE", "ALT_CODE", "DESCRIPTION"]] - df = df.rename(columns={"CODE":"icd10", - "ALT_CODE":"icd10_alt", - "DESCRIPTION":"description" - }) - output_path = PROCESSED_PATH / 'icd10.parquet' + df = df.rename( + columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"} + ) + output_path = PROCESSED_PATH / "icd10.parquet" df.to_parquet(output_path, index=False) logger.info(f"Extracted: {output_path}") + def extract_opsc4(): - file_path = DOWNLOADS_PATH / 'OPCS410 Data files txt' / 'OPCS410 CodesAndTitles Nov 2022 V1.0.txt' - - df = pd.read_csv(file_path, sep='\t', dtype=str, header=None) - df = df.rename(columns={0:"opcs4", 1:"description"}) - - output_path = PROCESSED_PATH / 'opcs4.parquet' + file_path = ( + DOWNLOADS_PATH + / "OPCS410 Data files txt" + / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt" + ) + + df = pd.read_csv(file_path, sep="\t", dtype=str, header=None) + df = df.rename(columns={0: "opcs4", 1: "description"}) + + output_path = PROCESSED_PATH / "opcs4.parquet" df.to_parquet(output_path, index=False) logger.info(f"Extracted: {output_path}") + def extract_nhs_data_migrations(): - #NHS Data Migrations - - #snomed only - file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'sctcremap_uk_20200401000001.txt' - df = pd.read_csv(file_path, sep='\t') + # NHS Data Migrations + + # snomed only + file_path = ( + DOWNLOADS_PATH + / "Mapping Tables" + / 
"Updated" + / "Clinically Assured" + / "sctcremap_uk_20200401000001.txt" + ) + df = pd.read_csv(file_path, sep="\t") df = df[["SCT_CONCEPTID"]] - df = df.rename(columns={"SCT_CONCEPTID":"snomed"}) + df = df.rename(columns={"SCT_CONCEPTID": "snomed"}) df = df.drop_duplicates() df = df.astype(str) - output_path = PROCESSED_PATH / 'snomed.parquet' + output_path = PROCESSED_PATH / "snomed.parquet" df.to_parquet(output_path, index=False) logger.info(f"Extracted: {output_path}") - - #r2 -> r3 - file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rctctv3map_uk_20200401000001.txt' - df = pd.read_csv(file_path, sep='\t') + + # r2 -> r3 + file_path = ( + DOWNLOADS_PATH + / "Mapping Tables" + / "Updated" + / "Clinically Assured" + / "rctctv3map_uk_20200401000001.txt" + ) + df = pd.read_csv(file_path, sep="\t") df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]] - df = df.rename(columns={"V2_CONCEPTID":"read2", - "CTV3_CONCEPTID":"read3"}) + df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"}) - output_path = PROCESSED_PATH / 'read2_to_read3.parquet' + output_path = PROCESSED_PATH / "read2_to_read3.parquet" df.to_parquet(output_path, index=False) logger.info(f"Extracted: {output_path}") - #r3->r2 - file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3rctmap_uk_20200401000002.txt' - df = pd.read_csv(file_path, sep='\t') + # r3->r2 + file_path = ( + DOWNLOADS_PATH + / "Mapping Tables" + / "Updated" + / "Clinically Assured" + / "ctv3rctmap_uk_20200401000002.txt" + ) + df = pd.read_csv(file_path, sep="\t") df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]] - df = df.rename(columns={"CTV3_CONCEPTID":"read3", - "V2_CONCEPTID":"read2"}) + df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"}) df = df.drop_duplicates() - df = df[~df["read2"].str.match("^.*_.*$")] #remove r2 codes with '_' + df = df[~df["read2"].str.match("^.*_.*$")] # remove r2 codes with '_' - output_path = PROCESSED_PATH / 'read3_to_read2.parquet' + output_path = PROCESSED_PATH / "read3_to_read2.parquet" df.to_parquet(output_path, index=False) logger.info(f"Extracted: {output_path}") - - #r2 -> snomed - file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rcsctmap2_uk_20200401000001.txt' - df = pd.read_csv(file_path, sep='\t', dtype=str) + + # r2 -> snomed + file_path = ( + DOWNLOADS_PATH + / "Mapping Tables" + / "Updated" + / "Clinically Assured" + / "rcsctmap2_uk_20200401000001.txt" + ) + df = pd.read_csv(file_path, sep="\t", dtype=str) df = df[["ReadCode", "ConceptId"]] - df = df.rename(columns={"ReadCode":"read2", - "ConceptId":"snomed"}) + df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"}) - output_path = PROCESSED_PATH / 'read2_to_snomed.parquet' + output_path = PROCESSED_PATH / "read2_to_snomed.parquet" df.to_parquet(output_path, index=False) logger.info(f"Extracted: {output_path}") - #r3->snomed - file_path = DOWNLOADS_PATH / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3sctmap2_uk_20200401000001.txt' - df = pd.read_csv(file_path, sep='\t', dtype=str) + # r3->snomed + file_path = ( + DOWNLOADS_PATH + / "Mapping Tables" + / "Updated" + / "Clinically Assured" + / "ctv3sctmap2_uk_20200401000001.txt" + ) + df = pd.read_csv(file_path, sep="\t", dtype=str) df = df[["CTV3_TERMID", "SCT_CONCEPTID"]] - df = df.rename(columns={"CTV3_TERMID":"read3", - "SCT_CONCEPTID":"snomed"}) + df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"}) df["snomed"] = 
df["snomed"].astype(str) - df = df[~df["snomed"].str.match("^.*_.*$")] #remove snomed codes with '_' + df = df[~df["snomed"].str.match("^.*_.*$")] # remove snomed codes with '_' - output_path = PROCESSED_PATH / 'read3_to_snomed.parquet' + output_path = PROCESSED_PATH / "read3_to_snomed.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") + def extract_nhs_read_browser(): - #r2 only - input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'ANCESTOR.DBF' + # r2 only + input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF" df = simpledbf.Dbf5(input_path).to_dataframe() - df = pd.concat([df['READCODE'], df['DESCENDANT']]) + df = pd.concat([df["READCODE"], df["DESCENDANT"]]) df = pd.DataFrame(df.drop_duplicates()) - df = df.rename(columns={0:"read2"}) - output_path = PROCESSED_PATH / 'read2.parquet' + df = df.rename(columns={0: "read2"}) + output_path = PROCESSED_PATH / "read2.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") - #r2 -> atc - input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'ATC.DBF' + # r2 -> atc + input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF" df = simpledbf.Dbf5(input_path).to_dataframe() df = df[["READCODE", "ATC"]] - df = df.rename(columns={"READCODE":"read2", "ATC":"atc"}) - output_path = PROCESSED_PATH / 'read2_to_atc.parquet' + df = df.rename(columns={"READCODE": "read2", "ATC": "atc"}) + output_path = PROCESSED_PATH / "read2_to_atc.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") - #r2 -> icd10 - input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'ICD10.DBF' - df = simpledbf.Dbf5(input_path).to_dataframe() + # r2 -> icd10 + input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF" + df = simpledbf.Dbf5(input_path).to_dataframe() df = df[["READ_CODE", "TARG_CODE"]] - df = df.rename(columns={"READ_CODE":"read2", "TARG_CODE":"icd10"}) - df = df[~df["icd10"].str.match("^.*-.*$")] #remove codes with '-' - df = df[~df["read2"].str.match("^.*-.*$")] #remove codes with '-' - output_path = PROCESSED_PATH / 'read2_to_icd10.parquet' + df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"}) + df = df[~df["icd10"].str.match("^.*-.*$")] # remove codes with '-' + df = df[~df["read2"].str.match("^.*-.*$")] # remove codes with '-' + output_path = PROCESSED_PATH / "read2_to_icd10.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") - #r2 -> opcs4 - input_path = DOWNLOADS_PATH / 'Standard' / 'V2' / 'OPCS4V3.DBF' - df = simpledbf.Dbf5(input_path).to_dataframe() + # r2 -> opcs4 + input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF" + df = simpledbf.Dbf5(input_path).to_dataframe() df = df[["READ_CODE", "TARG_CODE"]] - df = df.rename(columns={"READ_CODE":"read2", "TARG_CODE":"opcs4"}) - df = df[~df["opcs4"].str.match("^.*-.*$")] #remove codes with '-' - df = df[~df["read2"].str.match("^.*-.*$")] #remove codes with '-' - output_path = PROCESSED_PATH / 'read2_to_opcs4.parquet' + df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"}) + df = df[~df["opcs4"].str.match("^.*-.*$")] # remove codes with '-' + df = df[~df["read2"].str.match("^.*-.*$")] # remove codes with '-' + output_path = PROCESSED_PATH / "read2_to_opcs4.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + 
logger.info(f"Extracted: {output_path}") - #r3 only - input_path = DOWNLOADS_PATH / 'Standard' / 'V3' / 'ANCESTOR.DBF' - df = simpledbf.Dbf5(input_path).to_dataframe() - df = pd.concat([df['READCODE'], df['DESCENDANT']]) + # r3 only + input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF" + df = simpledbf.Dbf5(input_path).to_dataframe() + df = pd.concat([df["READCODE"], df["DESCENDANT"]]) df = pd.DataFrame(df.drop_duplicates()) - df = df.rename(columns={0:"read3"}) - output_path = PROCESSED_PATH / 'read3.parquet' + df = df.rename(columns={0: "read3"}) + output_path = PROCESSED_PATH / "read3.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") - #r3 -> icd10 - input_path = DOWNLOADS_PATH / 'Standard' / 'V3' / 'ICD10.DBF' + # r3 -> icd10 + input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF" df = simpledbf.Dbf5(input_path).to_dataframe() df = df[["READ_CODE", "TARG_CODE"]] - df = df.rename(columns={"READ_CODE":"read3", "TARG_CODE":"icd10"}) - df = df[~df["icd10"].str.match("^.*-.*$")] #remove codes with '-' - df = df[~df["read3"].str.match("^.*-.*$")] #remove codes with '-' - output_path = PROCESSED_PATH / 'read3_to_icd10.parquet' + df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"}) + df = df[~df["icd10"].str.match("^.*-.*$")] # remove codes with '-' + df = df[~df["read3"].str.match("^.*-.*$")] # remove codes with '-' + output_path = PROCESSED_PATH / "read3_to_icd10.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") - #r3 -> icd9 + # r3 -> icd9 # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF') - #r3 -> opcs4 - input_path = DOWNLOADS_PATH / 'Standard' / 'V3' / 'OPCS4V3.DBF' + # r3 -> opcs4 + input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF" df = simpledbf.Dbf5(input_path).to_dataframe() df = df[["READ_CODE", "TARG_CODE"]] - df = df.rename(columns={"READ_CODE":"read3", "TARG_CODE":"opcs4"}) - df = df[~df["opcs4"].str.match("^.*-.*$")] #remove codes with '-' - df = df[~df["read3"].str.match("^.*-.*$")] #remove codes with '-' - output_path = PROCESSED_PATH / 'read3_to_opcs4.parquet' + df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"}) + df = df[~df["opcs4"].str.match("^.*-.*$")] # remove codes with '-' + df = df[~df["read3"].str.match("^.*-.*$")] # remove codes with '-' + output_path = PROCESSED_PATH / "read3_to_opcs4.parquet" df.to_parquet(output_path, index=False) - logger.info(f"Extracted: {output_path}") + logger.info(f"Extracted: {output_path}") + def create_map_directories(): - """Create map directories.""" + """Create map directories.""" # Check if build directory exists - create_map_dirs = False - if VOCAB_PATH.exists(): - user_input = input(f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? (y/n): ").strip().lower() + create_map_dirs = False + if VOCAB_PATH.exists(): + user_input = ( + input( + f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? 
(y/n): " + ) + .strip() + .lower() + ) if user_input == "y": # delete all build files shutil.rmtree(VOCAB_PATH) @@ -263,92 +330,97 @@ def create_map_directories(): logger.info("Exiting TRUD installation") sys.exit(0) else: - create_map_dirs = True + create_map_dirs = True if create_map_dirs: # create maps directories VOCAB_PATH.mkdir(parents=True, exist_ok=True) - DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True) - PROCESSED_PATH.mkdir(parents=True,exist_ok=True) + DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True) + PROCESSED_PATH.mkdir(parents=True, exist_ok=True) + def install(): - logger.info(f"Installing TRUD") - - # get TRUD api key from environment variable - api_key = os.getenv("ACMC_TRUD_API_KEY") - if not api_key: - raise ValueError("TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable.") - - create_map_directories() - - items_latest = True - items = [ - { - "id": 259, - "name": "NHS ICD-10 5th Edition XML data files", - "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F", - "extract": extract_icd10, - }, - { - "id": 119, - "name": "OPCS-4 data files", - "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3", - "extract": extract_opsc4, - }, - { - "id": 9, - "name": "NHS Data Migration", - "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765", - "extract": extract_nhs_data_migrations, - }, - { - "id": 8, - "name": "NHS Read Browser", - "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E", - "extract": extract_nhs_read_browser, - } - # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip - ] - - # remove function from items to save versions - data = [{k: v for k, v in d.items() if k != "extract"} for d in items] - # save TRUD versions to file to main record of what was downloaded - with open(VERSION_PATH, "w") as file: - yaml.dump(data, file, default_flow_style=False, sort_keys=False) - - # Validate and process each item ID - for item in items: - item_id = item["id"] - logger.info(f"--- {item['name']} ---") - - releases = get_releases(item_id, API_KEY=api_key, latest=items_latest) - if not releases: - error_exit(f"No releases found for item {item_id}.") - - # Process each release in reverse order - for release_ordinal, release in enumerate(releases[::-1], 1): - # Download archive file - file_destination = download_release_file(item_id, release_ordinal, release, "archive") - - # Optional files - # if items.checksum: - # download_release_file(item["id"], release_ordinal, release, "checksum") - # if items.signature: - # download_release_file(item["id"], release_ordinal, release, "signature") - # if items.public_key: - # download_release_file(item["id"], release_ordinal, release, "publicKey", "public key") - - #Verify Hash if available - if "hash" in item: - validate_download_hash(file_destination, item["hash"]) - - #Unzip downloaded .zip - unzip_download(file_destination) - - #Extract Tables to parquet - if "extract" in item: - item["extract"]() - - logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.") - - logger.info(f"TRUD installation completed") \ No newline at end of file + logger.info(f"Installing TRUD") + + # get TRUD api key from environment variable + api_key = os.getenv("ACMC_TRUD_API_KEY") + if not api_key: + raise ValueError( + "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable." 
+ ) + + create_map_directories() + + items_latest = True + items = [ + { + "id": 259, + "name": "NHS ICD-10 5th Edition XML data files", + "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F", + "extract": extract_icd10, + }, + { + "id": 119, + "name": "OPCS-4 data files", + "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3", + "extract": extract_opsc4, + }, + { + "id": 9, + "name": "NHS Data Migration", + "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765", + "extract": extract_nhs_data_migrations, + }, + { + "id": 8, + "name": "NHS Read Browser", + "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E", + "extract": extract_nhs_read_browser, + }, + # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip + ] + + # remove function from items to save versions + data = [{k: v for k, v in d.items() if k != "extract"} for d in items] + # save TRUD versions to file to main record of what was downloaded + with open(VERSION_PATH, "w") as file: + yaml.dump(data, file, default_flow_style=False, sort_keys=False) + + # Validate and process each item ID + for item in items: + item_id = item["id"] + logger.info(f"--- {item['name']} ---") + + releases = get_releases(item_id, API_KEY=api_key, latest=items_latest) + if not releases: + error_exit(f"No releases found for item {item_id}.") + + # Process each release in reverse order + for release_ordinal, release in enumerate(releases[::-1], 1): + # Download archive file + file_destination = download_release_file( + item_id, release_ordinal, release, "archive" + ) + + # Optional files + # if items.checksum: + # download_release_file(item["id"], release_ordinal, release, "checksum") + # if items.signature: + # download_release_file(item["id"], release_ordinal, release, "signature") + # if items.public_key: + # download_release_file(item["id"], release_ordinal, release, "publicKey", "public key") + + # Verify Hash if available + if "hash" in item: + validate_download_hash(file_destination, item["hash"]) + + # Unzip downloaded .zip + unzip_download(file_destination) + + # Extract Tables to parquet + if "extract" in item: + item["extract"]() + + logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.") + + logger.info(f"TRUD installation completed") diff --git a/docs/index.md b/docs/index.md index 35182069a02f2e56ffe7cba91995463fec0a2ece..07791b90d48d86862acf4fd0ba14ba9897535ead 100644 --- a/docs/index.md +++ b/docs/index.md @@ -2,8 +2,10 @@ ### Workflow + The high level steps to use the tools are outlined below: + **1. Define concept sets:** A domain expert defines a list of [concept sets](#defining-concept-sets) for each observable characteristic of the phenotype using CSV file format (e.g., `PHEN_concept_sets.csv`). **2. Define concept code lists for concept sets:** A domain expert defines [code lists](#defining-concept-codes) for each concept set within the phenotype using supported coding list formats and stores them in the `/src` directory. 
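
Steps 1 and 2 above reference a concept-set definition file (`PHEN_concept_sets.csv`) and code lists under `/src` without showing their shape. As a rough illustration only — the column name `concept_set_name` is an assumption for this sketch, not taken from the acmc repository — a definition file could be inspected like this before running `acmc phen validate`:

```python
# Minimal sketch (assumptions flagged below): peek at a concept-set
# definition CSV before validating a phenotype directory.
from pathlib import Path

import pandas as pd

# Hypothetical location inside an initialised phenotype workspace.
csv_path = Path("./workspace/phen/PHEN_concept_sets.csv")

df = pd.read_csv(csv_path, dtype=str)
print(f"{len(df)} rows, columns: {list(df.columns)}")

# "concept_set_name" is an assumed column name used purely for illustration.
if "concept_set_name" in df.columns:
    for name in sorted(df["concept_set_name"].dropna().unique()):
        print(f"- {name}")
```
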
diff --git a/pyproject.toml b/pyproject.toml index fc4377e263b37f2ba3f05d8e71bbf93e8fd2ff51..951658810ee1ac97c4c31dbfb1efb8155cae9a52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,16 +28,19 @@ dependencies = [ "lxml", "numpy", "openpyxl", + "pandas-stubs", "pluggy", "pyarrow", "pyomop", "tables", "pytest", + "pyyaml", "requests", "simpledbf", "smmap", "sqlalchemy", - "pyyaml" + "types-PyYAML", + "types-requests" ] [project.scripts] @@ -59,6 +62,7 @@ dependencies = [ [tool.hatch.envs.dev] dependencies = [ + "pydocstyle", "pytest", "black", "mypy" @@ -70,4 +74,7 @@ include = ["acmc/**"] # Ensure only the acmc package is included [tool.hatch.build.targets.sdist] include = [ "acmc/**", -] \ No newline at end of file +] + +[tool.mypy] +ignore_missing_imports = true \ No newline at end of file diff --git a/tests/test_acmc.py b/tests/test_acmc.py index b53f97c9fb543acec7481f99856786659f49aade..ecdadaefdea9fa514b6d6d7797cebdf5868f3f17 100644 --- a/tests/test_acmc.py +++ b/tests/test_acmc.py @@ -10,83 +10,133 @@ from acmc import trud, omop, main, logging_config as lc # setup logging logger = lc.setup_logger() + @pytest.fixture def tmp_dir(): - # Setup tmp directory - temp_dir = Path("./tests/tmp") - temp_dir.mkdir(parents=True, exist_ok=True) - - # Yield the directory path to the test function - yield temp_dir - - # Remove the directory after the test finishes - shutil.rmtree(temp_dir) + # Setup tmp directory + temp_dir = Path("./tests/tmp") + temp_dir.mkdir(parents=True, exist_ok=True) + + # Yield the directory path to the test function + yield temp_dir + + # Remove the directory after the test finishes + shutil.rmtree(temp_dir) + @pytest.fixture def logger(): - logger = logging.getLogger('acmc_logger') - logger.setLevel(logging.DEBUG) - stream_handler = logging.StreamHandler(sys.stdout) - logger.addHandler(stream_handler) + logger = logging.getLogger("acmc_logger") + logger.setLevel(logging.DEBUG) + stream_handler = logging.StreamHandler(sys.stdout) + logger.addHandler(stream_handler) + def test_phen_init_local_specified(tmp_dir, monkeypatch, caplog): - with caplog.at_level(logging.DEBUG): - phen_path = tmp_dir / "phen" - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())]) - # Mock input() to return "yes" to the question about reinitialising the directory - monkeypatch.setattr("builtins.input", lambda _: "y") - main.main() - assert "Phenotype initialised successfully" in caplog.text + with caplog.at_level(logging.DEBUG): + phen_path = tmp_dir / "phen" + monkeypatch.setattr( + sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())] + ) + # Mock input() to return "yes" to the question about reinitialising the directory + monkeypatch.setattr("builtins.input", lambda _: "y") + main.main() + assert "Phenotype initialised successfully" in caplog.text + def test_phen_workflow(tmp_dir, monkeypatch, caplog): - with caplog.at_level(logging.DEBUG): - phen_path = tmp_dir / "phen" - phen_path = phen_path.resolve() - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())]) - # Mock input() to return "yes" to the question about reinitialising the directory - monkeypatch.setattr("builtins.input", lambda _: "y") - main.main() - assert "Phenotype initialised successfully" in caplog.text - - with caplog.at_level(logging.DEBUG): - # validate phenotype - # copy examples across - shutil.rmtree(phen_path / 'codes') - ex_path = Path('./examples').resolve() - for item in ex_path.iterdir(): - source = ex_path / 
item.name - destination = phen_path / item.name - if source.is_dir(): - shutil.copytree(source, destination) - else: - shutil.copy(source, destination) - - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "validate", "-d", str(phen_path.resolve())]) - main.main() - assert "Phenotype validated successfully" in caplog.text - - # map phenotype - for code_type in ["read2", "read3", "snomed"]: - with caplog.at_level(logging.DEBUG): - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "map", "-d", str(phen_path.resolve()), "-t", code_type]) - main.main() - assert "Phenotype processed successfully" in caplog.text - - # publish phenotype - with caplog.at_level(logging.DEBUG): - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "publish", "-d", str(phen_path.resolve())]) - main.main() - assert "Phenotype published successfully" in caplog.text - - # copy phenotype' - with caplog.at_level(logging.DEBUG): - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "copy", "-d", str(phen_path.resolve()), "-td", str(tmp_dir.resolve()), "-v", "v1.0.3"]) - main.main() - assert "Phenotype copied successfully" in caplog.text - - # diff phenotype - with caplog.at_level(logging.DEBUG): - old_path = tmp_dir / "v1.0.3" - monkeypatch.setattr(sys, "argv", ["main.py", "phen", "diff", "-d", str(phen_path.resolve()), "-old", str(old_path.resolve())]) - main.main() - assert "Phenotypes diff'd successfully" in caplog.text + with caplog.at_level(logging.DEBUG): + phen_path = tmp_dir / "phen" + phen_path = phen_path.resolve() + monkeypatch.setattr( + sys, "argv", ["main.py", "phen", "init", "-d", str(phen_path.resolve())] + ) + # Mock input() to return "yes" to the question about reinitialising the directory + monkeypatch.setattr("builtins.input", lambda _: "y") + main.main() + assert "Phenotype initialised successfully" in caplog.text + + with caplog.at_level(logging.DEBUG): + # validate phenotype + # copy examples across + shutil.rmtree(phen_path / "codes") + ex_path = Path("./examples").resolve() + for item in ex_path.iterdir(): + source = ex_path / item.name + destination = phen_path / item.name + if source.is_dir(): + shutil.copytree(source, destination) + else: + shutil.copy(source, destination) + + monkeypatch.setattr( + sys, "argv", ["main.py", "phen", "validate", "-d", str(phen_path.resolve())] + ) + main.main() + assert "Phenotype validated successfully" in caplog.text + + # map phenotype + for code_type in ["read2", "read3", "snomed"]: + with caplog.at_level(logging.DEBUG): + monkeypatch.setattr( + sys, + "argv", + [ + "main.py", + "phen", + "map", + "-d", + str(phen_path.resolve()), + "-t", + code_type, + ], + ) + main.main() + assert "Phenotype processed successfully" in caplog.text + + # publish phenotype + with caplog.at_level(logging.DEBUG): + monkeypatch.setattr( + sys, "argv", ["main.py", "phen", "publish", "-d", str(phen_path.resolve())] + ) + main.main() + assert "Phenotype published successfully" in caplog.text + + # copy phenotype' + with caplog.at_level(logging.DEBUG): + monkeypatch.setattr( + sys, + "argv", + [ + "main.py", + "phen", + "copy", + "-d", + str(phen_path.resolve()), + "-td", + str(tmp_dir.resolve()), + "-v", + "v1.0.3", + ], + ) + main.main() + assert "Phenotype copied successfully" in caplog.text + + # diff phenotype + with caplog.at_level(logging.DEBUG): + old_path = tmp_dir / "v1.0.3" + monkeypatch.setattr( + sys, + "argv", + [ + "main.py", + "phen", + "diff", + "-d", + str(phen_path.resolve()), + "-old", + str(old_path.resolve()), + ], + ) + main.main() + assert "Phenotypes 
diff'd successfully" in caplog.text
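
The `trud.py` changes above keep the same integrity check for TRUD downloads: each archive is hashed with SHA-256 and compared case-insensitively against the digest recorded for that item. A minimal standalone sketch of that verification step, assuming placeholder values for the file path and expected digest (the real digests are listed per item in `trud.install`):

```python
# Sketch of the SHA-256 verification pattern used for TRUD downloads:
# hash the downloaded file and compare (case-insensitively) with the
# expected digest. The path and digest below are placeholders.
import hashlib
from pathlib import Path


def verify_sha256(file_path: Path, expected_hex: str) -> bool:
    """Return True if the file's SHA-256 digest matches expected_hex."""
    digest = hashlib.sha256(file_path.read_bytes()).hexdigest()
    return digest.upper() == expected_hex.upper()


if __name__ == "__main__":
    ok = verify_sha256(
        Path("./vocab/trud/downloads/example.zip"),  # placeholder download
        "A4F7...883F",  # placeholder; use the hash recorded for the TRUD item
    )
    print("hash ok" if ok else "hash mismatch")
```
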