acmc.phen

Phenotype Module

This module provides functionality for managing phenotypes.
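
A minimal usage sketch (illustrative only): the workspace path, the order of calls and the keyword arguments below are assumptions based on the function signatures in the source listing, not a documented workflow.

    import acmc.phen as phen

    # create a local phenotype repo with the standard directory structure
    phen.init("./workspace/phen", remote_url=None)

    # ... edit config.yaml and add source code files under concepts/ ...

    # check the repo structure and config.yaml against CONFIG_SCHEMA
    phen.validate("./workspace/phen")

    # translate source concept codes to every target code type listed in config.yaml
    phen.map("./workspace/phen", target_code_type=None)

    # commit, tag and (optionally) push a new version
    phen.publish("./workspace/phen", "update concept sets", remote_url=None, increment="patch")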

   1"""
   2Phenotype Module
   3================
   4This module provides functionality for managing phenotypes.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator  # type: ignore
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28
  29import acmc
  30from acmc import trud, omop, parse, util
  31
  32# setup logging
  33import acmc.logging_config as lc
  34
  35logger = lc.setup_logger()
  36
  37pd.set_option("mode.chained_assignment", None)
  38
  39PHEN_DIR = "phen"
  40DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
  41
  42CONCEPTS_DIR = "concepts"
  43MAP_DIR = "map"
  44CONCEPT_SET_DIR = "concept-sets"
  45CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
  46OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
  47DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
  48CONFIG_FILE = "config.yaml"
  49VOCAB_VERSION_FILE = "vocab_version.yaml"
  50SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
  51DEFAULT_VERSION_INC = "patch"
  52
  53DEFAULT_GIT_BRANCH = "main"
  54
  55SPLIT_COL_ACTION = "split_col"
  56CODES_COL_ACTION = "codes_col"
  57DIVIDE_COL_ACTION = "divide_col"
  58COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
  59
  60CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
  61SOURCE_COL_SUFFIX = "_acmc_source"
  62TARGET_COL_SUFFIX = "_acmc_target"
  63
  64# config.yaml schema
  65CONFIG_SCHEMA = {
  66    "phenotype": {
  67        "type": "dict",
  68        "required": True,
  69        "schema": {
  70            "version": {
  71                "type": "string",
  72                "required": True,
  73                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
  74            },
  75            "omop": {
  76                "type": "dict",
  77                "required": True,
  78                "schema": {
  79                    "vocabulary_id": {"type": "string", "required": True},
  80                    "vocabulary_name": {"type": "string", "required": True},
  81                    "vocabulary_reference": {
  82                        "type": "string",
  83                        "required": True,
  84                        "regex": r"^https?://.*",  # Ensures it's a URL
  85                    },
  86                },
  87            },
  88            "map": {
  89                "type": "list",
  90                "schema": {
  91                    "type": "string",
  92                    "allowed": list(
  93                        parse.SUPPORTED_CODE_TYPES
  94                    ),  # Ensure only predefined values are allowed
  95                },
  96            },
  97            "concept_sets": {
  98                "type": "list",
  99                "required": True,
 100                "schema": {
 101                    "type": "dict",
 102                    "schema": {
 103                        "name": {"type": "string", "required": True},
 104                        "file": {
 105                            "type": "dict",
 106                            "required": False,
 107                            "schema": {
 108                                "path": {"type": "string", "required": True},
 109                                "columns": {"type": "dict", "required": True},
 110                                "category": {
 111                                    "type": "string"
 112                                },  # Optional but must be string if present
 113                                "actions": {
 114                                    "type": "dict",
 115                                    "schema": {"divide_col": {"type": "string"}},
 116                                },
 117                            },
 118                        },
 119                        "metadata": {"type": "dict", "required": True},
 120                    },
 121                },
 122            },
 123        },
 124    }
 125}
 126
 127
 128class PhenValidationException(Exception):
 129    """Custom exception raised when there are validation errors in the phenotype configuration file"""
 130
 131    def __init__(self, message, validation_errors=None):
 132        super().__init__(message)
 133        self.validation_errors = validation_errors
 134
 135
 136def construct_git_url(remote_url: str):
 137    """Constructs a git URL for GitHub or GitLab, including a PAT token from an environment variable"""
 138    # check the url
 139    parsed_url = urlparse(remote_url)
 140
 141    # if GitHub is in the URL, otherwise assume it's GitLab; to support others such as Codeberg
 142    # this function would need updating if the URL scheme is different.
 143    if "github.com" in parsed_url.netloc:
 144        # get GitHub PAT from environment variable
 145        auth = os.getenv("ACMC_GITHUB_PAT")
 146        if not auth:
 147            raise ValueError(
 148                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 149            )
 150    else:
 151        # get GitLab PAT from environment variable
 152        auth = os.getenv("ACMC_GITLAB_PAT")
 153        if not auth:
 154            raise ValueError(
 155                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 156            )
 157        auth = f"oauth2:{auth}"
 158
 159    # Construct the new URL with credentials
 160    new_netloc = f"{auth}@{parsed_url.netloc}"
 161    return urlunparse(
 162        (
 163            parsed_url.scheme,
 164            new_netloc,
 165            parsed_url.path,
 166            parsed_url.params,
 167            parsed_url.query,
 168            parsed_url.fragment,
 169        )
 170    )
 171
 172
 173def create_empty_git_dir(path: Path):
 174    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 175    path.mkdir(exist_ok=True)
 176    keep_path = path / ".gitkeep"
 177    keep_path.touch(exist_ok=True)
 178
 179
 180def check_delete_dir(path: Path, msg: str) -> bool:
 181    """Checks on the command line if a user wants to delete a directory
 182
 183    Args:
 184        path (Path): path of the directory to be deleted
 185        msg (str): message to be displayed to the user
 186
 187    Returns:
 188        Boolean: True if deleted
 189    """
 190    deleted = False
 191
 192    user_input = input(f"{msg}").strip().lower()
 193    if user_input in ["yes", "y"]:
 194        shutil.rmtree(path)
 195        deleted = True
 196    else:
 197        logger.info("Directory was not deleted.")
 198
 199    return deleted
 200
 201
 202def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
 203    """Forks an upstream phenotype in a remote repo at a specific version into a local directory, and optionally sets a new remote origin
 204
 205    Args:
 206        phen_dir (str): local directory path where the upstream repo is to be cloned
 207        upstream_url (str): url to the upstream repo
 208        upstream_version (str): version in the upstream repo to clone
 209        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 210
 211    Raises:
 212        ValueError: if the specified version is not in the upstream repo
 213        ValueError: if the upstream repo is not a valid phenotype repo
 214        ValueError: if there's any other problems with Git
 215    """
 216    logger.info(
 217        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 218    )
 219
 220    phen_path = Path(phen_dir)
 221    # check if directory already exists and ask user if they want to recreate it
 222    if (
 223        phen_path.exists() and phen_path.is_dir()
 224    ):  # Check if it exists and is a directory
 225        configure = check_delete_dir(
 226            phen_path,
 227            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 228        )
 229    else:
 230        configure = True
 231
 232    if not configure:
 233        logger.info(f"Exiting, phenotype not initialised")
 234        return
 235
 236    try:
 237        # Clone repo
 238        git_url = construct_git_url(upstream_url)
 239        repo = git.Repo.clone_from(git_url, phen_path)
 240
 241        # Fetch all branches and tags
 242        repo.remotes.origin.fetch()
 243
 244        # Check if the version exists
 245        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 246        if upstream_version not in available_refs:
 247            raise ValueError(
 248                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 249            )
 250
 251        # Checkout the specified version
 252        repo.git.checkout(upstream_version)
 253        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 254        main_branch.checkout()
 255
 256        # Check if 'config.yaml' exists in the root directory
 257        config_path = phen_path / "config.yaml"
 258        if not os.path.isfile(config_path):
 259            raise ValueError(
 260                f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory."
 261            )
 262
 263        # Validate the phenotype is compatible with the acmc tool
 264        validate(str(phen_path.resolve()))
 265
 266        # Delete each tag locally
 267        tags = repo.tags
 268        for tag in tags:
 269            repo.delete_tag(tag)
 270            logger.debug(f"Deleted tags from forked repo: {tag}")
 271
 272        # Add upstream remote
 273        repo.create_remote("upstream", upstream_url)
 274        remote = repo.remotes["origin"]
 275        repo.delete_remote(remote)  # Remove existing origin
 276
 277        # Optionally set a new origin remote
 278        if new_origin_url:
 279            git_url = construct_git_url(new_origin_url)
 280            repo.create_remote("origin", git_url)
 281            repo.git.push("--set-upstream", "origin", "main")
 282
 283        logger.info(f"Repository forked successfully at {phen_path}")
 284        logger.info(f"Upstream set to {upstream_url}")
 285        if new_origin_url:
 286            logger.info(f"Origin set to {new_origin_url}")
 287
 288    except Exception as e:
 289        if phen_path.exists():
 290            shutil.rmtree(phen_path)
 291        raise ValueError(f"Error occurred during repository fork: {str(e)}")
 292
 293
 294def init(phen_dir: str, remote_url: str):
 295    """Initialises a phenotype directory as a git repo with the standard structure"""
 296    logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 297    phen_path = Path(phen_dir)
 298
 299    # check if directory already exists and ask user if they want to recreate it
 300    if (
 301        phen_path.exists() and phen_path.is_dir()
 302    ):  # Check if it exists and is a directory
 303        configure = check_delete_dir(
 304            phen_path,
 305            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 306        )
 307    else:
 308        configure = True
 309
 310    if not configure:
 311        logger.info(f"Exiting, phenotype not initialised")
 312        return
 313
 314    # Initialise repo from local or remote
 315    repo: Repo
 316
 317    # if remote then clone the repo otherwise init a local repo
 318    if remote_url is not None:
 319        # add PAT token to the URL
 320        git_url = construct_git_url(remote_url)
 321
 322        # clone the repo
 323        git_cmd = git.cmd.Git()
 324        git_cmd.clone(git_url, phen_path)
 325
 326        # open repo
 327        repo = Repo(phen_path)
 328        # check if there are any commits (new repo has no commits)
 329        if (
 330            len(repo.branches) == 0 or repo.head.is_detached
 331        ):  # Handle detached HEAD (e.g., after init)
 332            logger.debug("The phen repository has no commits yet.")
 333            commit_count = 0
 334        else:
 335            # Get the total number of commits in the default branch
 336            commit_count = sum(1 for _ in repo.iter_commits())
 337            logger.debug(f"Repo has previous commits: {commit_count}")
 338    else:
 339        # local repo, create the directories and init
 340        phen_path.mkdir(parents=True, exist_ok=True)
 341        logger.debug(f"Phen directory '{phen_path}' has been created.")
 342        repo = git.Repo.init(phen_path)
 343        commit_count = 0
 344
 345    phen_path = phen_path.resolve()
 346    # initialise empty repos
 347    if commit_count == 0:
 348        # create initial commit
 349        initial_file_path = phen_path / "README.md"
 350        with open(initial_file_path, "w") as file:
 351            file.write(
 352                "# Initial commit\nThis is the first commit in the phen repository.\n"
 353            )
 354        repo.index.add([initial_file_path])
 355        repo.index.commit("Initial commit")
 356        commit_count = 1
 357
 358    # Checkout the phen's default branch, creating it if it does not exist
 359    if DEFAULT_GIT_BRANCH in repo.branches:
 360        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 361        main_branch.checkout()
 362    else:
 363        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 364        main_branch.checkout()
 365
 366    # if the phen path does not contain the config file then initialise the phenotype
 367    config_path = phen_path / CONFIG_FILE
 368    if config_path.exists():
 369        logger.debug(f"Phenotype configuration files already exist")
 370        return
 371
 372    logger.info("Creating phen directory structure and config files")
 373    for d in DEFAULT_PHEN_DIR_LIST:
 374        create_empty_git_dir(phen_path / d)
 375
 376    # create empty phen config file
 377    config = {
 378        "phenotype": {
 379            "version": "0.0.0",
 380            "omop": {
 381                "vocabulary_id": "",
 382                "vocabulary_name": "",
 383                "vocabulary_reference": "",
 384            },
 385            "translate": [],
 386            "concept_sets": [],
 387        }
 388    }
 389
 390    with open(phen_path / CONFIG_FILE, "w") as file:
 391        yaml.dump(
 392            config,
 393            file,
 394            Dumper=util.QuotedDumper,
 395            default_flow_style=False,
 396            sort_keys=False,
 397            default_style='"',
 398        )
 399
 400    # add git ignore
 401    ignore_content = """# Ignore SQLite database files
 402*.db
 403*.sqlite3
 404 
 405# Ignore SQLite journal and metadata files
 406*.db-journal
 407*.sqlite3-journal
 408
 409# python
 410.ipynb_checkpoints
 411 """
 412    ignore_path = phen_path / ".gitignore"
 413    with open(ignore_path, "w") as file:
 414        file.write(ignore_content)
 415
 416    # add to git repo and commit
 417    for d in DEFAULT_PHEN_DIR_LIST:
 418        repo.git.add(phen_path / d)
 419    repo.git.add(all=True)
 420    repo.index.commit("initialised the phen git repo.")
 421
 422    logger.info(f"Phenotype initialised successfully")
 423
 424
 425def validate(phen_dir: str):
 426    """Validates the phenotype directory is a git repo with standard structure"""
 427    logger.info(f"Validating phenotype: {phen_dir}")
 428    phen_path = Path(phen_dir)
 429    if not phen_path.is_dir():
 430        raise NotADirectoryError(
 431            f"Error: '{str(phen_path.resolve())}' is not a directory"
 432        )
 433
 434    config_path = phen_path / CONFIG_FILE
 435    if not config_path.is_file():
 436        raise FileNotFoundError(
 437            f"Error: phen configuration file '{config_path}' does not exist."
 438        )
 439
 440    concepts_path = phen_path / CONCEPTS_DIR
 441    if not concepts_path.is_dir():
 442        raise FileNotFoundError(
 443            f"Error: source concepts directory {concepts_path} does not exist."
 444        )
 445
 446    # Validate the directory is a git repo
 447    try:
 448        git.Repo(phen_path)
 449    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 450        raise Exception(f"Phen directory {phen_path} is not a git repo")
 451
 452    # Load configuration File
 453    if config_path.suffix == ".yaml":
 454        try:
 455            with config_path.open("r") as file:
 456                phenotype = yaml.safe_load(file)
 457
 458            validator = Validator(CONFIG_SCHEMA)
 459            if validator.validate(phenotype):
 460                logger.debug("YAML structure is valid.")
 461            else:
 462                logger.error(f"YAML structure validation failed: {validator.errors}")
 463                raise Exception(f"YAML structure validation failed: {validator.errors}")
 464        except yaml.YAMLError as e:
 465            logger.error(f"YAML syntax error: {e}")
 466            raise e
 467    else:
 468        raise Exception(
 469            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 470        )
 471
 472    # initialise
 473    validation_errors = []
 474    phenotype = phenotype["phenotype"]
 475    code_types = parse.CodeTypeParser().code_types
 476
 477    # check the version number is of the format n.n.n
 478    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 479    if not match:
 480        validation_errors.append(
 481            f"Invalid version format in configuration file: {phenotype['version']}"
 482        )
 483
 484    # create a list of all the concept set names defined in the concept set configuration
 485    concept_set_names = []
 486    for item in phenotype["concept_sets"]:
 487        if item["name"] in concept_set_names:
 488            validation_errors.append(
 489                f"Duplicate concept set defined in concept sets {item['name']}"
 490            )
 491        else:
 492            concept_set_names.append(item["name"])
 493
 494    # check codes definition
 495    for item in phenotype["concept_sets"]:
 496        # check concept code file exists
 497        concept_code_file_path = concepts_path / item["file"]["path"]
 498        if not concept_code_file_path.exists():
 499            validation_errors.append(
 500                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 501            )
 502
 503        # check concept code file is not empty (skip if it does not exist)
 504        if concept_code_file_path.exists() and concept_code_file_path.stat().st_size == 0:
 505            validation_errors.append(
 506                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 507            )
 508
 509        # check code file type is supported
 510        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 511            raise ValueError(
 512                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
 513            )
 514
 515        # check columns specified are a supported medical coding type
 516        for column in item["file"]["columns"]:
 517            if column not in code_types:
 518                validation_errors.append(
 519                    f"Column type {column} for file {concept_code_file_path} is not supported"
 520                )
 521
 522        # check the actions are supported
 523        if "actions" in item["file"]:
 524            for action in item["file"]["actions"]:
 525                if action not in COL_ACTIONS:
 526                    validation_errors.append(f"Action {action} is not supported")
 527
 528    if len(validation_errors) > 0:
 529        logger.error(validation_errors)
 530        raise PhenValidationException(
 531            f"Configuration file {str(config_path.resolve())} failed validation",
 532            validation_errors,
 533        )
 534
 535    logger.info(f"Phenotype validated successfully")
 536
 537
 538def read_table_file(path: Path, excel_sheet: str = ""):
 539    """
 540    Load Code List File
 541    """
 542
 543    path = path.resolve()
 544    if path.suffix == ".csv":
 545        df = pd.read_csv(path, dtype=str)
 546    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 547        if excel_sheet != "":
 548            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 549        else:
 550            df = pd.read_excel(path, dtype=str)
 551    elif path.suffix == ".dta":
 552        df = pd.read_stata(path)
 553    else:
 554        raise ValueError(
 555            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
 556        )
 557
 558    return df
 559
 560
 561def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
 562    # Perform Structural Changes to file before preprocessing
 563    logger.debug("Processing file structural actions")
 564    if (
 565        "actions" in concept_set["file"]
 566        and "split_col" in concept_set["file"]["actions"]
 567        and "codes_col" in concept_set["file"]["actions"]
 568    ):
 569        split_col = concept_set["file"]["actions"]["split_col"]
 570        codes_col = concept_set["file"]["actions"]["codes_col"]
 571        logger.debug(
 572            "Action: Splitting %s "
 573            "column into: %s",
 574            split_col,
 575            df[split_col].unique(),
 576        )
 577        codes = df[codes_col]
 578        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
 579        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
 580        oh[oh == False] = np.nan  # replace 0s with None
 581        df = pd.concat([df, oh], axis=1)  # merge in new columns
 582
 583    return df
 584
 585
 586# Perform QA Checks on columns individually and append to df
 587def preprocess_source_concepts(
 588    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 589) -> Tuple[pd.DataFrame, list]:
 590    """Parses each column individually - Order and length will not be preserved!"""
 591    out = pd.DataFrame([])  # create output df to append to
 592    code_errors = []  # list of errors from processing
 593
 594    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 595    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 596
 597    # Preprocess codes
 598    code_types = parse.CodeTypeParser().code_types
 599    for code_type in concept_set["file"]["columns"]:
 600        parser = code_types[code_type]
 601        logger.info(f"Processing {code_type} codes for {code_file_path}")
 602
 603        # get codes by column name
 604        source_col_name = concept_set["file"]["columns"][code_type]
 605        codes = df[source_col_name].dropna()
 606        codes = codes.astype(str)  # convert to string
 607        codes = codes.str.strip()  # remove excess spaces
 608
 609        # process codes, validating them using parser and returning the errors
 610        codes, errors = parser.process(codes, code_file_path)
 611        if len(errors) > 0:
 612            code_errors.extend(errors)
 613            logger.warning(f"Codes validation failed with {len(errors)} errors")
 614
 615        # add processed codes to df
 616        new_col_name = f"{source_col_name}_SOURCE"
 617        df = df.rename(columns={source_col_name: new_col_name})
 618        process_codes = pd.DataFrame({code_type: codes}).join(df)
 619        out = pd.concat(
 620            [out, process_codes],
 621            ignore_index=True,
 622        )
 623
 624    logger.debug(out.head())
 625
 626    return out, code_errors
 627
 628
 629def get_code_type_from_col_name(col_name: str):
 630    return col_name.split("_")[0]
 631
 632
 633# Translate Df with multiple codes into single code type Series
 634def translate_codes(
 635    source_df: pd.DataFrame, target_code_type: str, concept_name: str
 636) -> pd.DataFrame:
 637    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""
 638
 639    # codes = pd.DataFrame([], dtype=str)
 640    codes = pd.DataFrame(
 641        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 642    )
 643    # Convert codes to target type
 644    logger.info(f"Converting to target code type {target_code_type}")
 645
 646    for source_code_type in source_df.columns:
 647        # if the target code type is the same as the source code type, no translation, just append source as target
 648        if source_code_type == target_code_type:
 649            copy_df = pd.DataFrame(
 650                {
 651                    "SOURCE_CONCEPT": source_df[source_code_type],
 652                    "SOURCE_CONCEPT_TYPE": source_code_type,
 653                    "CONCEPT": source_df[source_code_type],
 654                }
 655            )
 656            codes = pd.concat([codes, copy_df])
 657            logger.debug(
 658                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
 659            )
 660        else:
 661            # get the translation filename using source to target code types
 662            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 663            map_path = trud.PROCESSED_PATH / filename
 664
 665            # do the mapping if it exists
 666            if map_path.exists():
 667                # get mapping
 668                df_map = pd.read_parquet(map_path)
 669
 670                # do mapping
 671                translated_df = pd.merge(
 672                    source_df[source_code_type], df_map, how="left"
 673                )
 674
 675                # normalise the output
 676                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
 677                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 678
 679                # add to list of codes
 680                codes = pd.concat([codes, translated_df])
 681
 682            else:
 683                logger.warning(
 684                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 685                )
 686
 687    codes = codes.dropna()  # delete NaNs
 688
 689    # add the concept set name to the output if there were any translations
 690    if len(codes.index) > 0:
 691        codes["CONCEPT_SET"] = concept_name
 692    else:
 693        logger.debug(f"No codes converted with target code type {target_code_type}")
 694
 695    return codes
 696
 697
 698def sql_row_exist(
 699    conn: sqlite3.Connection, table: str, column: str, value: str
 700) -> bool:
 701    # Execute and check if a result exists
 702    cur = conn.cursor()
 703    query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;"
 704    cur.execute(query, (value,))
 705    exists = cur.fetchone() is not None
 706
 707    return exists
 708
 709
 710def write_code_errors(code_errors: list, code_errors_path: Path):
 711    err_df = pd.DataFrame(
 712        [
 713            {
 714                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 715                "VOCABULARY": err.code_type,
 716                "SOURCE": err.codes_file,
 717                "CAUSE": err.message,
 718            }
 719            for err in code_errors
 720        ]
 721    )
 722
 723    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 724    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 725    err_df.to_csv(code_errors_path, index=False, mode="w")
 726
 727
 728def write_vocab_version(phen_path: Path):
 729    # write the vocab version files
 730
 731    if not trud.VERSION_PATH.exists():
 732        raise FileNotFoundError(
 733            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 734        )
 735
 736    if not omop.VERSION_PATH.exists():
 737        raise FileNotFoundError(
 738            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 739        )
 740
 741    with trud.VERSION_PATH.open("r") as file:
 742        trud_version = yaml.safe_load(file)
 743
 744    with omop.VERSION_PATH.open("r") as file:
 745        omop_version = yaml.safe_load(file)
 746
 747    # Create the combined YAML structure
 748    version_data = {
 749        "versions": {
 750            "acmc": acmc.__version__,
 751            "trud": trud_version,
 752            "omop": omop_version,
 753        }
 754    }
 755
 756    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 757        yaml.dump(
 758            version_data,
 759            file,
 760            Dumper=util.QuotedDumper,
 761            default_flow_style=False,
 762            sort_keys=False,
 763            default_style='"',
 764        )
 765
 766
 767def map(phen_dir: str, target_code_type: str):
 768    logger.info(f"Processing phenotype: {phen_dir}")
 769
 770    # Validate configuration
 771    validate(phen_dir)
 772
 773    # initialise paths
 774    phen_path = Path(phen_dir)
 775    config_path = phen_path / CONFIG_FILE
 776
 777    # load configuration
 778    with config_path.open("r") as file:
 779        config = yaml.safe_load(file)
 780    phenotype = config["phenotype"]
 781
 782    if len(phenotype["map"]) == 0:
 783        raise ValueError(f"No map codes defined in the phenotype configuration")
 784
 785    if target_code_type is not None and target_code_type not in phenotype["map"]:
 786        raise ValueError(
 787            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 788        )
 789
 790    if target_code_type is not None:
 791        map_target_code_type(phen_path, phenotype, target_code_type)
 792    else:
 793        for t in phenotype["map"]:
 794            map_target_code_type(phen_path, phenotype, t)
 795
 796    logger.info(f"Phenotype processed successfully")
 797
 798
 799def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
 800    logger.debug(f"Target coding format: {target_code_type}")
 801    concepts_path = phen_path / CONCEPTS_DIR
 802    # Create output dataframe
 803    out = pd.DataFrame([])
 804    code_errors = []
 805
 806    # Process each folder in codes section
 807    for concept_set in phenotype["concept_sets"]:
 808        logger.debug(f"--- {concept_set['file']} ---")
 809
 810        # Load code file
 811        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
 812        df = read_table_file(codes_file_path)
 813
 814        # process structural actions
 815        df = process_actions(df, concept_set)
 816
 817        # preprocessing and validate of source concepts
 818        logger.debug("Processing and validating source concept codes")
 819        df, errors = preprocess_source_concepts(
 820            df,
 821            concept_set,
 822            codes_file_path,
 823        )
 824
 825        # create df with just the source code columns
 826        source_column_names = list(concept_set["file"]["columns"].keys())
 827        source_df = df[source_column_names]
 828
 829        logger.debug(source_df.columns)
 830        logger.debug(source_df.head())
 831
 832        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
 833        if len(errors) > 0:
 834            code_errors.extend(errors)
 835        logger.debug(f" Length of code_errors {len(code_errors)}")
 836
 837        # Map source concepts codes to target codes
 838        # if processing a source coding list with categorical data
 839        if (
 840            "actions" in concept_set["file"]
 841            and "divide_col" in concept_set["file"]["actions"]
 842            and len(df) > 0
 843        ):
 844            divide_col = concept_set["file"]["actions"]["divide_col"]
 845            logger.debug(f"Action: Dividing Table by {divide_col}")
 846            logger.debug(f"column into: {df[divide_col].unique()}")
 847            df_grp = df.groupby(divide_col)
 848            for cat, grp in df_grp:
 849                if cat == concept_set["file"]["category"]:
 850                    grp = grp.drop(columns=[divide_col])  # delete categorical column
 851                    source_df = grp[source_column_names]
 852                    trans_out = translate_codes(
 853                        source_df,
 854                        target_code_type=target_code_type,
 855                        concept_name=concept_set["name"],
 856                    )
 857                    out = pd.concat([out, trans_out])
 858        else:
 859            source_df = df[source_column_names]
 860            trans_out = translate_codes(
 861                source_df,
 862                target_code_type=target_code_type,
 863                concept_name=concept_set["name"],
 864            )
 865            out = pd.concat([out, trans_out])
 866
 867    if len(code_errors) > 0:
 868        logger.error(f"The map processing has {len(code_errors)} errors")
 869        error_path = phen_path / MAP_DIR / "errors"
 870        error_path.mkdir(parents=True, exist_ok=True)
 871        error_filename = f"{target_code_type}-code-errors.csv"
 872        write_code_errors(code_errors, error_path / error_filename)
 873
 874    # Check there is output from processing
 875    if len(out.index) == 0:
 876        logger.error(f"No output after map processing")
 877        raise Exception(
 878            f"No output after map processing, check config {str(phen_path.resolve())}"
 879        )
 880
 881    # final processing
 882    out = out.reset_index(drop=True)
 883    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 884    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 885
 886    out_count = len(out.index)
 887    # add metadata
 888    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
 889    result_list = []
 890    source_column_names = list(concept_set["file"]["columns"].keys())
 891    for source_concept_type in source_column_names:
 892        # Filter output based on the current source_concept_type
 893        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
 894        filtered_count = len(out_filtered_df.index)
 895
 896        # Removing the source type columns except the current type leaves the metadata and the join column
 897        remove_types = [
 898            type for type in source_column_names if type != source_concept_type
 899        ]
 900        metadata_df = df.drop(columns=remove_types)
 901        metadata_df = metadata_df.rename(
 902            columns={source_concept_type: "SOURCE_CONCEPT"}
 903        )
 904        metadata_df_count = len(metadata_df.index)
 905
 906        # Perform the left join with metadata_df on SOURCE_CONCEPT to add the metadata
 907        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
 908        result_count = len(result.index)
 909
 910        logger.debug(
 911            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
 912        )
 913
 914        # Append the result to the result_list
 915        result_list.append(result)
 916
 917    # Concatenate all the results into a single DataFrame
 918    final_out = pd.concat(result_list, ignore_index=True)
 919    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 920    logger.debug(
 921        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
 922    )
 923
 924    # Save output to map directory
 925    output_filename = target_code_type + ".csv"
 926    map_path = phen_path / MAP_DIR / output_filename
 927    final_out.to_csv(map_path, index=False)
 928    logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 929
 930    # save concept sets as separate files
 931    concept_set_path = phen_path / CSV_PATH / target_code_type
 932
 933    # empty the concept-set directory except for hidden files, e.g. .git
 934    if concept_set_path.exists():
 935        for item in concept_set_path.iterdir():
 936            if not item.name.startswith("."):
 937                item.unlink()
 938    else:
 939        concept_set_path.mkdir(parents=True, exist_ok=True)
 940
 941    # write each concept as a separate file
 942    for name, concept in final_out.groupby("CONCEPT_SET"):
 943        concept = concept.sort_values(by="CONCEPT")  # sort rows
 944        concept = concept.dropna(how="all", axis=1)  # remove empty cols
 945        concept = concept.reindex(
 946            sorted(concept.columns), axis=1
 947        )  # sort cols alphabetically
 948        filename = f"{name}.csv"
 949        concept_path = concept_set_path / filename
 950        concept.to_csv(concept_path, index=False)
 951
 952    write_vocab_version(phen_path)
 953
 954    logger.info(f"Phenotype processed target code type {target_code_type}")
 955
 956
 957def generate_version_tag(
 958    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
 959) -> str:
 960    # Get all valid semantic version tags
 961    versions = []
 962    for tag in repo.tags:
 963        tag_name = (
 964            tag.name.lstrip("v") if use_v_prefix else tag.name
 965        )  # Remove 'v' if needed
 966        if semver.Version.is_valid(tag_name):
 967            versions.append(semver.Version.parse(tag_name))
 968
 969    # Determine the next version
 970    if not versions:
 971        new_version = semver.Version(0, 0, 1)
 972    else:
 973        latest_version = max(versions)
 974        if increment == "major":
 975            new_version = latest_version.bump_major()
 976        elif increment == "minor":
 977            new_version = latest_version.bump_minor()
 978        else:
 979            new_version = latest_version.bump_patch()
 980
 981    # Create the new tag
 982    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
 983
 984    return new_version_str
 985
 986
 987def publish(
 988    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
 989):
 990    """Publishes updates to the phenotype by committing all changes to the repo directory"""
 991
 992    # Validate config
 993    validate(phen_dir)
 994    phen_path = Path(phen_dir)
 995
 996    # load git repo and set the branch
 997    repo = git.Repo(phen_path)
 998    if DEFAULT_GIT_BRANCH in repo.branches:
 999        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1000        main_branch.checkout()
1001    else:
1002        raise AttributeError(
1003            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1004        )
1005
1006    # check if any changes to publish
1007    if not repo.is_dirty() and not repo.untracked_files:
1008        if remote_url is not None and "origin" not in repo.remotes:
1009            logger.info(f"First publish to remote url {remote_url}")
1010        else:
1011            logger.info("Nothing to publish, no changes to the repo")
1012            return
1013
1014    # get next version
1015    new_version_str = generate_version_tag(repo, increment)
1016    logger.info(f"New version: {new_version_str}")
1017
1018    # Write version in configuration file
1019    config_path = phen_path / CONFIG_FILE
1020    with config_path.open("r") as file:
1021        config = yaml.safe_load(file)
1022
1023    config["phenotype"]["version"] = new_version_str
1024    with open(config_path, "w") as file:
1025        yaml.dump(
1026            config,
1027            file,
1028            Dumper=util.QuotedDumper,
1029            default_flow_style=False,
1030            sort_keys=False,
1031            default_style='"',
1032        )
1033
1034    # Add and commit changes to repo including version updates
1035    commit_message = f"Committing updates to phenotype {phen_path}"
1036    repo.git.add("--all")
1037    repo.index.commit(commit_message)
1038
1039    # Add tag to the repo
1040    repo.create_tag(new_version_str)
1041
1042    # push to origin if a remote repo
1043    if remote_url is not None and "origin" not in repo.remotes:
1044        git_url = construct_git_url(remote_url)
1045        repo.create_remote("origin", git_url)
1046
1047    try:
1048        if "origin" in repo.remotes:
1049            logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1050            origin = repo.remotes.origin
1051            logger.info(f"Pushing main branch to remote repo")
1052            repo.git.push("--set-upstream", "origin", "main")
1053            logger.info(f"Pushing version tags to remote git repo")
1054            origin.push(tags=True)
1055            logger.debug("Changes pushed to 'origin'")
1056        else:
1057            logger.debug("Remote 'origin' is not set")
1058    except Exception as e:
1059        tag_ref = repo.tags[new_version_str]
1060        repo.delete_tag(tag_ref)
1061        repo.git.reset("--soft", "HEAD~1")
1062        raise e
1063
1064    logger.info(f"Phenotype published successfully")
1065
1066
1067def export(phen_dir: str, version: str):
1068    """Exports a phen repo at a specific tagged version into a target directory"""
1069    logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1070
1071    # validate configuration
1072    validate(phen_dir)
1073    phen_path = Path(phen_dir)
1074
1075    # load configuration
1076    config_path = phen_path / CONFIG_FILE
1077    with config_path.open("r") as file:
1078        config = yaml.safe_load(file)
1079
1080    map_path = phen_path / MAP_DIR
1081    if not map_path.exists():
1082        logger.warning(f"Map path does not exist '{map_path}'")
1083
1084    export_path = phen_path / OMOP_PATH
1085    # check export directory exists and if not create it
1086    if not export_path.exists():
1087        export_path.mkdir(parents=True)
1088        logger.debug(f"OMOP export directory '{export_path}' created.")
1089
1090    # omop export db
1091    export_db_path = omop.export(
1092        map_path,
1093        export_path,
1094        config["phenotype"]["version"],
1095        config["phenotype"]["omop"],
1096    )
1097
1098    # write to tables
1099    # export as csv
1100    logger.info(f"Phenotype exported successfully")
1101
1102
1103def copy(phen_dir: str, target_dir: str, version: str):
1104    """Copies a phen repo at a specific tagged version into a target directory"""
1105
1106    # Validate
1107    validate(phen_dir)
1108    phen_path = Path(phen_dir)
1109
1110    # Check target directory exists
1111    target_path = Path(target_dir)
1112    if not target_path.exists():
1113        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1114
1115    # Set copy directory
1116    copy_path = target_path / version
1117    logger.info(f"Copying repo {phen_path} to {copy_path}")
1118
1119    if (
1120        copy_path.exists() and copy_path.is_dir()
1121    ):  # Check if it exists and is a directory
1122        copy = check_delete_dir(
1123            copy_path,
1124            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1125        )
1126    else:
1127        copy = True
1128
1129    if not copy:
1130        logger.info(f"Not copying the version {version}")
1131        return
1132
1133    logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1134    repo = git.Repo.clone_from(phen_path, copy_path)
1135
1136    # Check out the latest commit or specified version
1137    if version:
1138        # Checkout a specific version (e.g., branch, tag, or commit hash)
1139        logger.info(f"Checking out version {version}...")
1140        repo.git.checkout(version)
1141    else:
1142        # Checkout the latest commit (HEAD)
1143        logger.info(f"Checking out the latest commit...")
1144        repo.git.checkout("HEAD")
1145
1146    logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1147
1148    logger.info(f"Phenotype copied successfully")
1149
1150
1151# Convert concept_sets list into dictionaries
1152def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1153    """Extracts concepts as {name: file_path} dictionary and a name set."""
1154    concepts_dict = {
1155        item["name"]: item["file"]["path"]
1156        for item in config_data["phenotype"]["concept_sets"]
1157    }
1158    name_set = set(concepts_dict.keys())
1159    return concepts_dict, name_set
1160
1161
1162def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1163    """
1164    Extracts clean keys from a DeepDiff dictionary.
1165
1166    :param diff: DeepDiff result dictionary
1167    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1168    :return: A set of clean key names
1169    """
1170    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1171
1172
1173def diff_config(old_config: dict, new_config: dict) -> str:
1174    report = f"\n# Changes to phenotype configuration\n"
1175    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1176
1177    old_concepts, old_names = extract_concepts(old_config)
1178    new_concepts, new_names = extract_concepts(new_config)
1179
1180    # Check added and removed names
1181    added_names = new_names - old_names  # Names that appear in new but not in old
1182    removed_names = old_names - new_names  # Names that were in old but not in new
1183
1184    # find file path changes for unchanged names
1185    unchanged_names = old_names & new_names  # Names that exist in both
1186    file_diff = DeepDiff(
1187        {name: old_concepts[name] for name in unchanged_names},
1188        {name: new_concepts[name] for name in unchanged_names},
1189    )
1190
1191    # Find renamed concepts (same file, different name)
1192    renamed_concepts = []
1193    for removed in removed_names:
1194        old_path = old_concepts[removed]
1195        for added in added_names:
1196            new_path = new_concepts[added]
1197            if old_path == new_path:
1198                renamed_concepts.append((removed, added))
1199
1200    # Remove renamed concepts from added and removed sets
1201    for old_name, new_name in renamed_concepts:
1202        added_names.discard(new_name)
1203        removed_names.discard(old_name)
1204
1205    # generate config report
1206    if added_names:
1207        report += "## Added Concepts\n"
1208        for name in added_names:
1209            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1210        report += "\n"
1211
1212    if removed_names:
1213        report += "## Removed Concepts\n"
1214        for name in removed_names:
1215            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1216        report += "\n"
1217
1218    if renamed_concepts:
1219        report += "## Renamed Concepts\n"
1220        for old_name, new_name in renamed_concepts:
1221            report += (
1222                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1223            )
1224        report += "\n"
1225
1226    if "values_changed" in file_diff:
1227        report += "## Updated File Paths\n"
1228        for name, change in file_diff["values_changed"].items():
1229            old_file = change["old_value"]
1230            new_file = change["new_value"]
1231            clean_name = name.split("root['")[1].split("']")[0]
1232            report += (
1233                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1234            )
1235        report += "\n"
1236
1237    if not (
1238        added_names
1239        or removed_names
1240        or renamed_concepts
1241        or file_diff.get("values_changed")
1242    ):
1243        report += "No changes in concept sets.\n"
1244
1245    return report
1246
1247
1248def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1249    old_output_files = [
1250        file.name
1251        for file in old_map_path.iterdir()
1252        if file.is_file() and not file.name.startswith(".")
1253    ]
1254    new_output_files = [
1255        file.name
1256        for file in new_map_path.iterdir()
1257        if file.is_file() and not file.name.startswith(".")
1258    ]
1259
1260    # Convert the lists to sets for easy comparison
1261    old_output_set = set(old_output_files)
1262    new_output_set = set(new_output_files)
1263
1264    # Outputs that are in old_output_set but not in new_output_set (removed files)
1265    removed_outputs = old_output_set - new_output_set
1266    # Outputs that are in new_output_set but not in old_output_set (added files)
1267    added_outputs = new_output_set - old_output_set
1268    # Outputs that are the intersection of old_output_set and new_output_set
1269    common_outputs = old_output_set & new_output_set
1270
1271    report = f"\n# Changes to available translations\n"
1272    report += f"This compares the coding translations files available.\n\n"
1273    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1274    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1275    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1276
1277    # Compare common outputs between versions
1278    report += f"# Changes to concepts in translation files\n\n"
1279    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1280    for file in common_outputs:
1281        old_output = old_map_path / file
1282        new_output = new_map_path / file
1283
1284        logger.debug(f"Old output: {str(old_output.resolve())}")
1285        logger.debug(f"New output: {str(new_output.resolve())}")
1286
1287        df1 = pd.read_csv(old_output)
1288        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1289        df2 = pd.read_csv(new_output)
1290        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1291
1292        # Check for added and removed concepts
1293        report += f"- File {file}\n"
1294        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1295        report += f"- Removed concepts {sorted_list}\n"
1296        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1297        report += f"- Added concepts {sorted_list}\n"
1298
1299        # Check for changed concepts
1300        diff = df2 - df1  # diff in counts
1301        diff = diff[
1302            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1303        ]  # get non-zero counts
1304        s = "\n"
1305        if len(diff.index) > 0:
1306            for concept, row in diff.iterrows():
1307                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1308            report += f"- Changed concepts {s}\n\n"
1309        else:
1310            report += f"- Changed concepts []\n\n"
1311
1312    return report
1313
1314
1315def diff_phen(
1316    new_phen_path: Path,
1317    new_version: str,
1318    old_phen_path: Path,
1319    old_version: str,
1320    report_path: Path,
1321):
1322    """Compare the differences between two versions of a phenotype"""
1323
1324    # validate phenotypes
1325    logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1326    validate(str(old_phen_path.resolve()))
1327    logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1328    validate(str(new_phen_path.resolve()))
1329
1330    # get old and new config
1331    old_config_path = old_phen_path / CONFIG_FILE
1332    with old_config_path.open("r") as file:
1333        old_config = yaml.safe_load(file)
1334    new_config_path = new_phen_path / CONFIG_FILE
1335    with new_config_path.open("r") as file:
1336        new_config = yaml.safe_load(file)
1337
1338    # write report heading
1339    report = f"# Phenotype Comparison Report\n"
1340    report += f"## Original phenotype\n"
1341    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1342    report += f"  - {old_version}\n"
1343    report += f"  - {str(old_phen_path.resolve())}\n"
1344    report += f"## Changed phenotype:\n"
1345    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1346    report += f"  - {new_version}\n"
1347    report += f"  - {str(new_phen_path.resolve())}\n"
1348
1349    # Step 1: check differences configuration files
1350    # Convert list of dicts into a dict: {name: file}
1351    report += diff_config(old_config, new_config)
1352
1353    # Step 2: check differences between map files
1354    # List files from output directories
1355    old_map_path = old_phen_path / MAP_DIR
1356    new_map_path = new_phen_path / MAP_DIR
1357    report += diff_map_files(old_map_path, new_map_path)
1358
1359    # initialise report file
1360    logger.debug(f"Writing to report file {str(report_path.resolve())}")
1361    report_file = open(report_path, "w")
1362    report_file.write(report)
1363    report_file.close()
1364
1365    logger.info(f"Phenotypes diff'd successfully")
1366
1367
1368def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1369    # make tmp directory .acmc
1370    timestamp = time.strftime("%Y%m%d_%H%M%S")
1371    temp_dir = Path(f".acmc/diff_{timestamp}")
1372
1373    changed_phen_path = Path(phen_dir)
1374    if not changed_phen_path.exists():
1375        raise ValueError(
1376            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1377        )
1378
1379    old_phen_path = Path(old_phen_dir)
1380    if not old_phen_path.exists():
1381        raise ValueError(
1382            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1383        )
1384
1385    try:
1386        # Create the directory
1387        temp_dir.mkdir(parents=True, exist_ok=True)
1388        logger.debug(f"Temporary directory created: {temp_dir}")
1389
1390        # Create temporary directories
1391        changed_path = temp_dir / "changed"
1392        changed_path.mkdir(parents=True, exist_ok=True)
1393        old_path = temp_dir / "old"
1394        old_path.mkdir(parents=True, exist_ok=True)
1395
1396        # checkout changed
1397        if version == "latest":
1398            logger.debug(
1399                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1400            )
1401            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1402        else:
1403            logger.debug(
1404                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1405            )
1406            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1407            changed_repo.git.checkout(version)
1408
1409        # checkout old
1410        if old_version == "latest":
1411            logger.debug(
1412                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1413            )
1414            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1415        else:
1416            logger.debug(
1417                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1418            )
1419            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1420            old_repo.git.checkout(old_version)
1421
1422        report_filename = f"{version}_{old_version}_diff.md"
1423        report_path = changed_phen_path / report_filename
1424        # diff old with new
1425        diff_phen(changed_path, version, old_path, old_version, report_path)
1426
1427    finally:
1428        # clean up tmp directory
1429        if temp_dir.exists():
1430            shutil.rmtree(temp_dir)
1431            logger.debug(f"Temporary directory removed: {temp_dir}")
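
For reference, a sketch of a minimal configuration that passes the Cerberus CONFIG_SCHEMA validation used by validate(). The concept set name, file path, column mapping and the "read2" code type are illustrative values only, not part of the module.

    from cerberus import Validator

    from acmc.phen import CONFIG_SCHEMA

    # illustrative phenotype configuration mirroring the structure written by init()
    example_config = {
        "phenotype": {
            "version": "0.0.1",
            "omop": {
                "vocabulary_id": "ACMC_EXAMPLE",
                "vocabulary_name": "Example phenotype vocabulary",
                "vocabulary_reference": "https://example.org/phenotype",
            },
            "map": ["read2"],
            "concept_sets": [
                {
                    "name": "EXAMPLE_CONCEPT_SET",
                    "file": {
                        "path": "example_codes.csv",
                        "columns": {"read2": "code"},
                    },
                    "metadata": {},
                }
            ],
        }
    }

    # validate the example against the module's schema
    validator = Validator(CONFIG_SCHEMA)
    assert validator.validate(example_config), validator.errors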
class PhenValidationException(builtins.Exception):
129class PhenValidationException(Exception):
130    """Custom exception class raised when validation errors in phenotype configuration file"""
131
132    def __init__(self, message, validation_errors=None):
133        super().__init__(message)
134        self.validation_errors = validation_errors

Custom exception class raised when there are validation errors in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
132    def __init__(self, message, validation_errors=None):
133        super().__init__(message)
134        self.validation_errors = validation_errors
validation_errors
def construct_git_url(remote_url: str):
137def construct_git_url(remote_url: str):
138    """Constructs a git url for github or gitlab including a PAT token environment variable"""
139    # check the url
140    parsed_url = urlparse(remote_url)
141
142    # if GitHub is in the URL, otherwise assume it's GitLab; to support others such as Codeberg
143    # we'd need to update this function if the URL scheme is different.
144    if "github.com" in parsed_url.netloc:
145        # get GitHub PAT from environment variable
146        auth = os.getenv("ACMC_GITHUB_PAT")
147        if not auth:
148            raise ValueError(
149                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
150            )
151    else:
152        # get GitLab PAT from environment variable
153        auth = os.getenv("ACMC_GITLAB_PAT")
154        if not auth:
155            raise ValueError(
156                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
157            )
158        auth = f"oauth2:{auth}"
159
160    # Construct the new URL with credentials
161    new_netloc = f"{auth}@{parsed_url.netloc}"
162    return urlunparse(
163        (
164            parsed_url.scheme,
165            new_netloc,
166            parsed_url.path,
167            parsed_url.params,
168            parsed_url.query,
169            parsed_url.fragment,
170        )
171    )

Constructs a git url for github or gitlab including a PAT token environment variable
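
A minimal usage sketch (the repository URL and token value below are illustrative placeholders):

    import os
    from acmc import phen

    os.environ["ACMC_GITHUB_PAT"] = "ghp_example_token"  # hypothetical PAT for a GitHub remote
    url = phen.construct_git_url("https://github.com/example-org/example-phen.git")
    # url -> "https://ghp_example_token@github.com/example-org/example-phen.git"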

def create_empty_git_dir(path: pathlib.Path):
174def create_empty_git_dir(path: Path):
175    """Creates a directory with a .gitkeep file so that it's tracked in git"""
176    path.mkdir(exist_ok=True)
177    keep_path = path / ".gitkeep"
178    keep_path.touch(exist_ok=True)

Creates a directory with a .gitkeep file so that it's tracked in git

def check_delete_dir(path: pathlib.Path, msg: str) -> bool:
181def check_delete_dir(path: Path, msg: str) -> bool:
182    """Checks on the command line if a user wants to delete a directory
183
184    Args:
185        path (Path): path of the directory to be deleted
186        msg (str): message to be displayed to the user
187
188    Returns:
189        Boolean: True if deleted
190    """
191    deleted = False
192
193    user_input = input(f"{msg}").strip().lower()
194    if user_input in ["yes", "y"]:
195        shutil.rmtree(path)
196        deleted = True
197    else:
198        logger.info("Directory was not deleted.")
199
200    return deleted

Checks on the command line if a user wants to delete a directory

Args:
    path (Path): path of the directory to be deleted
    msg (str): message to be displayed to the user

Returns:
    Boolean: True if deleted

def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
203def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
204    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
205
206    Args:
207        phen_dir (str): local directory path where the upstream repo is to be cloned
208        upstream_url (str): url to the upstream repo
209        upstream_version (str): version in the upstream repo to clone
210        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
211
212    Raises:
213        ValueError: if the specified version is not in the upstream repo
214        ValueError: if the upstream repo is not a valid phenotype repo
215        ValueError: if there's any other problems with Git
216    """
217    logger.info(
218        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
219    )
220
221    phen_path = Path(phen_dir)
222    # check if directory already exists and ask user if they want to recreate it
223    if (
224        phen_path.exists() and phen_path.is_dir()
225    ):  # Check if it exists and is a directory
226        configure = check_delete_dir(
227            phen_path,
228            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
229        )
230    else:
231        configure = True
232
233    if not configure:
234        logger.info(f"Exiting, phenotype not initiatised")
235        return
236
237    try:
238        # Clone repo
239        git_url = construct_git_url(upstream_url)
240        repo = git.Repo.clone_from(git_url, phen_path)
241
242        # Fetch all branches and tags
243        repo.remotes.origin.fetch()
244
245        # Check if the version exists
246        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
247        if upstream_version not in available_refs:
248            raise ValueError(
249                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
250            )
251
252        # Checkout the specified version
253        repo.git.checkout(upstream_version)
254        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
255        main_branch.checkout()
256
257        # Check if 'config.yaml' exists in the root directory
258        config_path = phen_path / "config.yaml"
259        if not os.path.isfile(config_path):
260            raise ValueError(
261                f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory."
262            )
263
264        # Validate the phenotype is compatible with the acmc tool
265        validate(str(phen_path.resolve()))
266
267        # Delete each tag locally
268        tags = repo.tags
269        for tag in tags:
270            repo.delete_tag(tag)
271            logger.debug(f"Deleted tags from forked repo: {tag}")
272
273        # Add upstream remote
274        repo.create_remote("upstream", upstream_url)
275        remote = repo.remotes["origin"]
276        repo.delete_remote(remote)  # Remove existing origin
277
278        # Optionally set a new origin remote
279        if new_origin_url:
280            git_url = construct_git_url(new_origin_url)
281            repo.create_remote("origin", git_url)
282            repo.git.push("--set-upstream", "origin", "main")
283
284        logger.info(f"Repository forked successfully at {phen_path}")
285        logger.info(f"Upstream set to {upstream_url}")
286        if new_origin_url:
287            logger.info(f"Origin set to {new_origin_url}")
288
289    except Exception as e:
290        if phen_path.exists():
291            shutil.rmtree(phen_path)
292        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

Args:
    phen_dir (str): local directory path where the upstream repo is to be cloned
    upstream_url (str): url to the upstream repo
    upstream_version (str): version in the upstream repo to clone
    new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

Raises:
    ValueError: if the specified version is not in the upstream repo
    ValueError: if the upstream repo is not a valid phenotype repo
    ValueError: if there's any other problems with Git
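
A hedged sketch of a fork call; the directory, URLs and version tag are placeholders, and a GitHub or GitLab PAT environment variable must be set as described for construct_git_url:

    from acmc import phen

    phen.fork(
        phen_dir="./workspace/my-phen",  # local clone target
        upstream_url="https://github.com/example-org/upstream-phen.git",
        upstream_version="v1.0.0",  # must exist as a tag or branch in the upstream repo
        new_origin_url=None,  # or a URL of a remote to set as the new origin
    )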

def init(phen_dir: str, remote_url: str):
295def init(phen_dir: str, remote_url: str):
296    """Initial phenotype directory as git repo with standard structure"""
297    logger.info(f"Initialising Phenotype in directory: {phen_dir}")
298    phen_path = Path(phen_dir)
299
300    # check if directory already exists and ask user if they want to recreate it
301    if (
302        phen_path.exists() and phen_path.is_dir()
303    ):  # Check if it exists and is a directory
304        configure = check_delete_dir(
305            phen_path,
306            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
307        )
308    else:
309        configure = True
310
311    if not configure:
312        logger.info(f"Exiting, phenotype not initiatised")
313        return
314
315    # Initialise repo from local or remote
316    repo: Repo
317
318    # if remote then clone the repo otherwise init a local repo
319    if remote_url != None:
320        # add PAT token to the URL
321        git_url = construct_git_url(remote_url)
322
323        # clone the repo
324        git_cmd = git.cmd.Git()
325        git_cmd.clone(git_url, phen_path)
326
327        # open repo
328        repo = Repo(phen_path)
329        # check if there are any commits (new repo has no commits)
330        if (
331            len(repo.branches) == 0 or repo.head.is_detached
332        ):  # Handle detached HEAD (e.g., after init)
333            logger.debug("The phen repository has no commits yet.")
334            commit_count = 0
335        else:
336            # Get the total number of commits in the default branch
337            commit_count = sum(1 for _ in repo.iter_commits())
338            logger.debug(f"Repo has previous commits: {commit_count}")
339    else:
340        # local repo, create the directories and init
341        phen_path.mkdir(parents=True, exist_ok=True)
342        logger.debug(f"Phen directory '{phen_path}' has been created.")
343        repo = git.Repo.init(phen_path)
344        commit_count = 0
345
346    phen_path = phen_path.resolve()
347    # initialise empty repos
348    if commit_count == 0:
349        # create initial commit
350        initial_file_path = phen_path / "README.md"
351        with open(initial_file_path, "w") as file:
352            file.write(
353                "# Initial commit\nThis is the first commit in the phen repository.\n"
354            )
355        repo.index.add([initial_file_path])
356        repo.index.commit("Initial commit")
357        commit_count = 1
358
359    # Checkout the phens default branch, creating it if it does not exist
360    if DEFAULT_GIT_BRANCH in repo.branches:
361        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
362        main_branch.checkout()
363    else:
364        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
365        main_branch.checkout()
366
367    # if the phen path does not contain the config file then initialise the phen type
368    config_path = phen_path / CONFIG_FILE
369    if config_path.exists():
370        logger.debug(f"Phenotype configuration files already exist")
371        return
372
373    logger.info("Creating phen directory structure and config files")
374    for d in DEFAULT_PHEN_DIR_LIST:
375        create_empty_git_dir(phen_path / d)
376
377    # create empty phen config file
378    config = {
379        "phenotype": {
380            "version": "0.0.0",
381            "omop": {
382                "vocabulary_id": "",
383                "vocabulary_name": "",
384                "vocabulary_reference": "",
385            },
386            "translate": [],
387            "concept_sets": [],
388        }
389    }
390
391    with open(phen_path / CONFIG_FILE, "w") as file:
392        yaml.dump(
393            config,
394            file,
395            Dumper=util.QuotedDumper,
396            default_flow_style=False,
397            sort_keys=False,
398            default_style='"',
399        )
400
401    # add git ignore
402    ignore_content = """# Ignore SQLite database files
403*.db
404*.sqlite3
405 
406# Ignore SQLite journal and metadata files
407*.db-journal
408*.sqlite3-journal
409
410# python
411.ipynb_checkpoints
412 """
413    ignore_path = phen_path / ".gitignore"
414    with open(ignore_path, "w") as file:
415        file.write(ignore_content)
416
417    # add to git repo and commit
418    for d in DEFAULT_PHEN_DIR_LIST:
419        repo.git.add(phen_path / d)
420    repo.git.add(all=True)
421    repo.index.commit("initialised the phen git repo.")
422
423    logger.info(f"Phenotype initialised successfully")

Initialises a phenotype directory as a git repo with standard structure
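
A minimal sketch of initialising a local phenotype (the path is a placeholder); passing a remote_url clones an existing repo instead and requires the PAT environment variable:

    from acmc import phen

    # creates the standard directory layout, an empty config.yaml and an initial commit
    phen.init(phen_dir="./workspace/phen", remote_url=None)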

def validate(phen_dir: str):
426def validate(phen_dir: str):
427    """Validates the phenotype directory is a git repo with standard structure"""
428    logger.info(f"Validating phenotype: {phen_dir}")
429    phen_path = Path(phen_dir)
430    if not phen_path.is_dir():
431        raise NotADirectoryError(
432            f"Error: '{str(phen_path.resolve())}' is not a directory"
433        )
434
435    config_path = phen_path / CONFIG_FILE
436    if not config_path.is_file():
437        raise FileNotFoundError(
438            f"Error: phen configuration file '{config_path}' does not exist."
439        )
440
441    concepts_path = phen_path / CONCEPTS_DIR
442    if not concepts_path.is_dir():
443        raise FileNotFoundError(
444            f"Error: source concepts directory {concepts_path} does not exist."
445        )
446
447    # Validate the directory is a git repo
448    try:
449        git.Repo(phen_path)
450    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
451        raise Exception(f"Phen directory {phen_path} is not a git repo")
452
453    # Load configuration File
454    if config_path.suffix == ".yaml":
455        try:
456            with config_path.open("r") as file:
457                phenotype = yaml.safe_load(file)
458
459            validator = Validator(CONFIG_SCHEMA)
460            if validator.validate(phenotype):
461                logger.debug("YAML structure is valid.")
462            else:
463                logger.error(f"YAML structure validation failed: {validator.errors}")
464                raise Exception(f"YAML structure validation failed: {validator.errors}")
465        except yaml.YAMLError as e:
466            logger.error(f"YAML syntax error: {e}")
467            raise e
468    else:
469        raise Exception(
470            f"Unsupported configuration filetype: {str(config_path.resolve())}"
471        )
472
473    # initialise
474    validation_errors = []
475    phenotype = phenotype["phenotype"]
476    code_types = parse.CodeTypeParser().code_types
477
478    # check the version number is of the format n.n.n
479    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
480    if not match:
481        validation_errors.append(
482            f"Invalid version format in configuration file: {phenotype['version']}"
483        )
484
485    # create a list of all the concept set names defined in the concept set configuration
486    concept_set_names = []
487    for item in phenotype["concept_sets"]:
488        if item["name"] in concept_set_names:
489            validation_errors.append(
490                f"Duplicate concept set defined in concept sets {item['name'] }"
491            )
492        else:
493            concept_set_names.append(item["name"])
494
495    # check codes definition
496    for item in phenotype["concept_sets"]:
497        # check concept code file exists
498        concept_code_file_path = concepts_path / item["file"]["path"]
499        if not concept_code_file_path.exists():
500            validation_errors.append(
501                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
502            )
503
504        # check concept code file is not empty
505        if concept_code_file_path.stat().st_size == 0:
506            validation_errors.append(
507                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
508            )
509
510        # check code file type is supported
511        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
512            raise ValueError(
513                f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
514            )
515
516        # check columns specified are a supported medical coding type
517        for column in item["file"]["columns"]:
518            if column not in code_types:
519                validation_errors.append(
520                    f"Column type {column} for file {concept_code_file_path} is not supported"
521                )
522
523        # check the actions are supported
524        if "actions" in item["file"]:
525            for action in item["file"]["actions"]:
526                if action not in COL_ACTIONS:
527                    validation_errors.append(f"Action {action} is not supported")
528
529    if len(validation_errors) > 0:
530        logger.error(validation_errors)
531        raise PhenValidationException(
532            f"Configuration file {str(config_path.resolve())} failed validation",
533            validation_errors,
534        )
535
536    logger.info(f"Phenotype validated successfully")

Validates the phenotype directory is a git repo with standard structure
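
A small usage sketch (the path is a placeholder); individual failures are available on the raised exception:

    from acmc import phen

    try:
        phen.validate("./workspace/phen")
    except phen.PhenValidationException as e:
        for err in e.validation_errors:
            print(err)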

def read_table_file(path: pathlib.Path, excel_sheet: str = ''):
539def read_table_file(path: Path, excel_sheet: str = ""):
540    """
541    Load Code List File
542    """
543
544    path = path.resolve()
545    if path.suffix == ".csv":
546        df = pd.read_csv(path, dtype=str)
547    elif path.suffix == ".xlsx" or path.suffix == ".xls":
548        if excel_sheet != "":
549            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
550        else:
551            df = pd.read_excel(path, dtype=str)
552    elif path.suffix == ".dta":
553        df = pd.read_stata(path)
554    else:
555        raise ValueError(
556            f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types"
557        )
558
559    return df

Load Code List File
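
A minimal sketch, assuming a hypothetical Excel code list with a sheet named "codes":

    from pathlib import Path
    from acmc import phen

    df = phen.read_table_file(Path("./workspace/phen/concepts/codes.xlsx"), excel_sheet="codes")
    print(df.head())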

def process_actions( df: pandas.core.frame.DataFrame, concept_set: dict) -> pandas.core.frame.DataFrame:
562def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
563    # Perform Structural Changes to file before preprocessing
564    logger.debug("Processing file structural actions")
565    if (
566        "actions" in concept_set["file"]
567        and "split_col" in concept_set["file"]["actions"]
568        and "codes_col" in concept_set["file"]["actions"]
569    ):
570        split_col = concept_set["file"]["actions"]["split_col"]
571        codes_col = concept_set["file"]["actions"]["codes_col"]
572        logger.debug(
573            "Action: Splitting",
574            split_col,
575            "column into:",
576            df[split_col].unique(),
577        )
578        codes = df[codes_col]
579        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
580        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
581        oh[oh == False] = np.nan  # replace 0s with None
582        df = pd.concat([df, oh], axis=1)  # merge in new columns
583
584    return df
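
A small sketch of the split_col/codes_col action with toy data (column names and codes are illustrative):

    import pandas as pd
    from acmc import phen

    df = pd.DataFrame({"code": ["C10..", "E10"], "system": ["read2", "icd10"]})
    concept_set = {"file": {"actions": {"split_col": "system", "codes_col": "code"}}}
    df = phen.process_actions(df, concept_set)
    # df gains 'read2' and 'icd10' columns, each holding the codes from the matching rows
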
def preprocess_source_concepts( df: pandas.core.frame.DataFrame, concept_set: dict, code_file_path: pathlib.Path) -> Tuple[pandas.core.frame.DataFrame, list]:
588def preprocess_source_concepts(
589    df: pd.DataFrame, concept_set: dict, code_file_path: Path
590) -> Tuple[pd.DataFrame, list]:
591    """Parses each column individually - Order and length will not be preserved!"""
592    out = pd.DataFrame([])  # create output df to append to
593    code_errors = []  # list of errors from processing
594
595    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
596    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
597
598    # Preprocess codes
599    code_types = parse.CodeTypeParser().code_types
600    for code_type in concept_set["file"]["columns"]:
601        parser = code_types[code_type]
602        logger.info(f"Processing {code_type} codes for {code_file_path}")
603
604        # get codes by column name
605        source_col_name = concept_set["file"]["columns"][code_type]
606        codes = df[source_col_name].dropna()
607        codes = codes.astype(str)  # convert to string
608        codes = codes.str.strip()  # remove excess spaces
609
610        # process codes, validating them using parser and returning the errors
611        codes, errors = parser.process(codes, code_file_path)
612        if len(errors) > 0:
613            code_errors.extend(errors)
614            logger.warning(f"Codes validation failed with {len(errors)} errors")
615
616        # add processed codes to df
617        new_col_name = f"{source_col_name}_SOURCE"
618        df = df.rename(columns={source_col_name: new_col_name})
619        process_codes = pd.DataFrame({code_type: codes}).join(df)
620        out = pd.concat(
621            [out, process_codes],
622            ignore_index=True,
623        )
624
625    logger.debug(out.head())
626
627    return out, code_errors

Parses each column individually - Order and length will not be preserved!

def get_code_type_from_col_name(col_name: str):
630def get_code_type_from_col_name(col_name: str):
631    return col_name.split("_")[0]
def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str) -> pandas.core.frame.DataFrame:
635def translate_codes(
636    source_df: pd.DataFrame, target_code_type: str, concept_name: str
637) -> pd.DataFrame:
638    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
639
640    # codes = pd.DataFrame([], dtype=str)
641    codes = pd.DataFrame(
642        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
643    )
644    # Convert codes to target type
645    logger.info(f"Converting to target code type {target_code_type}")
646
647    for source_code_type in source_df.columns:
648        # if target code type is the same as the source code type, no translation, just appending source as target
649        if source_code_type == target_code_type:
650            copy_df = pd.DataFrame(
651                {
652                    "SOURCE_CONCEPT": source_df[source_code_type],
653                    "SOURCE_CONCEPT_TYPE": source_code_type,
654                    "CONCEPT": source_df[source_code_type],
655                }
656            )
657            codes = pd.concat([codes, copy_df])
658            logger.debug(
659                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
660            )
661        else:
662            # get the translation filename using source to target code types
663            filename = f"{source_code_type}_to_{target_code_type}.parquet"
664            map_path = trud.PROCESSED_PATH / filename
665
666            # do the mapping if it exists
667            if map_path.exists():
668                # get mapping
669                df_map = pd.read_parquet(map_path)
670
671                # do mapping
672                translated_df = pd.merge(
673                    source_df[source_code_type], df_map, how="left"
674                )
675
676                # normalise the output
677                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
678                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
679
680                # add to list of codes
681                codes = pd.concat([codes, translated_df])
682
683            else:
684                logger.warning(
685                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
686                )
687
688    codes = codes.dropna()  # delete NaNs
689
690    # added concept set type to output if any translations
691    if len(codes.index) > 0:
692        codes["CONCEPT_SET"] = concept_name
693    else:
694        logger.debug(f"No codes converted with target code type {target_code_type}")
695
696    return codes

Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
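
A hedged sketch (codes are illustrative); it assumes the relevant source-to-target mapping parquet file exists under trud.PROCESSED_PATH:

    import pandas as pd
    from acmc import phen

    source_df = pd.DataFrame({"read2": ["C10..", "H33.."]})
    codes = phen.translate_codes(source_df, target_code_type="snomed", concept_name="diabetes")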

def sql_row_exist(conn: sqlite3.Connection, table: str, column: str, value: str) -> bool:
699def sql_row_exist(
700    conn: sqlite3.Connection, table: str, column: str, value: str
701) -> bool:
702    # Execute and check if a result exists
703    cur = conn.cursor()
704    query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;"
705    cur.execute(query, (value,))
706    exists = cur.fetchone() is not None
707
708    return exists
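
A self-contained sketch using an in-memory database (table and column names are illustrative):

    import sqlite3
    from acmc import phen

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE CONCEPT (concept_id TEXT)")
    conn.execute("INSERT INTO CONCEPT VALUES ('12345')")
    print(phen.sql_row_exist(conn, "CONCEPT", "concept_id", "12345"))  # True
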
def write_code_errors(code_errors: list, code_errors_path: pathlib.Path):
711def write_code_errors(code_errors: list, code_errors_path: Path):
712    err_df = pd.DataFrame(
713        [
714            {
715                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
716                "VOCABULARY": err.code_type,
717                "SOURCE": err.codes_file,
718                "CAUSE": err.message,
719            }
720            for err in code_errors
721        ]
722    )
723
724    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
725    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
726    err_df.to_csv(code_errors_path, index=False, mode="w")
def write_vocab_version(phen_path: pathlib.Path):
729def write_vocab_version(phen_path: Path):
730    # write the vocab version files
731
732    if not trud.VERSION_PATH.exists():
733        raise FileNotFoundError(
734            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
735        )
736
737    if not omop.VERSION_PATH.exists():
738        raise FileNotFoundError(
739            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
740        )
741
742    with trud.VERSION_PATH.open("r") as file:
743        trud_version = yaml.safe_load(file)
744
745    with omop.VERSION_PATH.open("r") as file:
746        omop_version = yaml.safe_load(file)
747
748    # Create the combined YAML structure
749    version_data = {
750        "versions": {
751            "acmc": acmc.__version__,
752            "trud": trud_version,
753            "omop": omop_version,
754        }
755    }
756
757    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
758        yaml.dump(
759            version_data,
760            file,
761            Dumper=util.QuotedDumper,
762            default_flow_style=False,
763            sort_keys=False,
764            default_style='"',
765        )
def map(phen_dir: str, target_code_type: str):
768def map(phen_dir: str, target_code_type: str):
769    logger.info(f"Processing phenotype: {phen_dir}")
770
771    # Validate configuration
772    validate(phen_dir)
773
774    # initialise paths
775    phen_path = Path(phen_dir)
776    config_path = phen_path / CONFIG_FILE
777
778    # load configuration
779    with config_path.open("r") as file:
780        config = yaml.safe_load(file)
781    phenotype = config["phenotype"]
782
783    if len(phenotype["map"]) == 0:
784        raise ValueError(f"No map codes defined in the phenotype configuration")
785
786    if target_code_type is not None and target_code_type not in phenotype["map"]:
787        raise ValueError(
788            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
789        )
790
791    if target_code_type is not None:
792        map_target_code_type(phen_path, phenotype, target_code_type)
793    else:
794        for t in phenotype["map"]:
795            map_target_code_type(phen_path, phenotype, t)
796
797    logger.info(f"Phenotype processed successfully")
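
A minimal sketch (the path and code type are placeholders); the target type must be listed under "map" in config.yaml:

    from acmc import phen

    # map every concept set to a single target vocabulary
    phen.map("./workspace/phen", target_code_type="read2")

    # or pass None to map to every target type listed in the configuration
    phen.map("./workspace/phen", target_code_type=None)
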
def map_target_code_type(phen_path: pathlib.Path, phenotype: dict, target_code_type: str):
800def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
801    logger.debug(f"Target coding format: {target_code_type}")
802    concepts_path = phen_path / CONCEPTS_DIR
803    # Create output dataframe
804    out = pd.DataFrame([])
805    code_errors = []
806
807    # Process each folder in codes section
808    for concept_set in phenotype["concept_sets"]:
809        logger.debug(f"--- {concept_set['file']} ---")
810
811        # Load code file
812        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
813        df = read_table_file(codes_file_path)
814
815        # process structural actions
816        df = process_actions(df, concept_set)
817
818        # preprocess and validate source concepts
819        logger.debug("Processing and validating source concept codes")
820        df, errors = preprocess_source_concepts(
821            df,
822            concept_set,
823            codes_file_path,
824        )
825
826        # create df with just the source code columns
827        source_column_names = list(concept_set["file"]["columns"].keys())
828        source_df = df[source_column_names]
829
830        logger.debug(source_df.columns)
831        logger.debug(source_df.head())
832
833        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
834        if len(errors) > 0:
835            code_errors.extend(errors)
836        logger.debug(f" Length of code_errors {len(code_errors)}")
837
838        # Map source concepts codes to target codes
839        # if processing a source coding list with categorical data
840        if (
841            "actions" in concept_set["file"]
842            and "divide_col" in concept_set["file"]["actions"]
843            and len(df) > 0
844        ):
845            divide_col = concept_set["file"]["actions"]["divide_col"]
846            logger.debug(f"Action: Dividing Table by {divide_col}")
847            logger.debug(f"column into: {df[divide_col].unique()}")
848            df_grp = df.groupby(divide_col)
849            for cat, grp in df_grp:
850                if cat == concept_set["file"]["category"]:
851                    grp = grp.drop(columns=[divide_col])  # delete categorical column
852                    source_df = grp[source_column_names]
853                    trans_out = translate_codes(
854                        source_df,
855                        target_code_type=target_code_type,
856                        concept_name=concept_set["name"],
857                    )
858                    out = pd.concat([out, trans_out])
859        else:
860            source_df = df[source_column_names]
861            trans_out = translate_codes(
862                source_df,
863                target_code_type=target_code_type,
864                concept_name=concept_set["name"],
865            )
866            out = pd.concat([out, trans_out])
867
868    if len(code_errors) > 0:
869        logger.error(f"The map processing has {len(code_errors)} errors")
870        error_path = phen_path / MAP_DIR / "errors"
871        error_path.mkdir(parents=True, exist_ok=True)
872        error_filename = f"{target_code_type}-code-errors.csv"
873        write_code_errors(code_errors, error_path / error_filename)
874
875    # Check there is output from processing
876    if len(out.index) == 0:
877        logger.error(f"No output after map processing")
878        raise Exception(
879            f"No output after map processing, check config {str(phen_path.resolve())}"
880        )
881
882    # final processing
883    out = out.reset_index(drop=True)
884    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
885    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
886
887    out_count = len(out.index)
888    # added metadata
889    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
890    result_list = []
891    source_column_names = list(concept_set["file"]["columns"].keys())
892    for source_concept_type in source_column_names:
893        # Filter output based on the current source_concept_type
894        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
895        filtered_count = len(out_filtered_df.index)
896
897        # Remove the source type columns except the current type will leave the metadata and the join
898        remove_types = [
899            type for type in source_column_names if type != source_concept_type
900        ]
901        metadata_df = df.drop(columns=remove_types)
902        metadata_df = metadata_df.rename(
903            columns={source_concept_type: "SOURCE_CONCEPT"}
904        )
905        metadata_df_count = len(metadata_df.index)
906
907        # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
908        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
909        result_count = len(result.index)
910
911        logger.debug(
912            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
913        )
914
915        # Append the result to the result_list
916        result_list.append(result)
917
918    # Concatenate all the results into a single DataFrame
919    final_out = pd.concat(result_list, ignore_index=True)
920    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
921    logger.debug(
922        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
923    )
924
925    # Save output to map directory
926    output_filename = target_code_type + ".csv"
927    map_path = phen_path / MAP_DIR / output_filename
928    final_out.to_csv(map_path, index=False)
929    logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
930
931    # save concept sets as separate files
932    concept_set_path = phen_path / CSV_PATH / target_code_type
933
934    # empty the concept-set directory except for hidden files, e.g. .git
935    if concept_set_path.exists():
936        for item in concept_set_path.iterdir():
937            if not item.name.startswith("."):
938                item.unlink()
939    else:
940        concept_set_path.mkdir(parents=True, exist_ok=True)
941
942    # write each concept as a separate file
943    for name, concept in final_out.groupby("CONCEPT_SET"):
944        concept = concept.sort_values(by="CONCEPT")  # sort rows
945        concept = concept.dropna(how="all", axis=1)  # remove empty cols
946        concept = concept.reindex(
947            sorted(concept.columns), axis=1
948        )  # sort cols alphabetically
949        filename = f"{name}.csv"
950        concept_path = concept_set_path / filename
951        concept.to_csv(concept_path, index=False)
952
953    write_vocab_version(phen_path)
954
955    logger.info(f"Phenotype processed target code type {target_code_type}")
def generate_version_tag( repo: git.repo.base.Repo, increment: str = 'patch', use_v_prefix: bool = False) -> str:
958def generate_version_tag(
959    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
960) -> str:
961    # Get all valid semantic version tags
962    versions = []
963    for tag in repo.tags:
964        tag_name = (
965            tag.name.lstrip("v") if use_v_prefix else tag.name
966        )  # Remove 'v' if needed
967        if semver.Version.is_valid(tag_name):
968            versions.append(semver.Version.parse(tag_name))
969
970    # Determine the next version
971    if not versions:
972        new_version = semver.Version(0, 0, 1)
973    else:
974        latest_version = max(versions)
975        if increment == "major":
976            new_version = latest_version.bump_major()
977        elif increment == "minor":
978            new_version = latest_version.bump_minor()
979        else:
980            new_version = latest_version.bump_patch()
981
982    # Create the new tag
983    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
984
985    return new_version_str
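
A small sketch (the repo path is a placeholder); the increment must be one of "major", "minor" or "patch":

    import git
    from acmc import phen

    repo = git.Repo("./workspace/phen")
    print(phen.generate_version_tag(repo, increment="minor"))  # e.g. "0.1.0" -> "0.2.0"
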
def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
 988def publish(
 989    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
 990):
 991    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
 992
 993    # Validate config
 994    validate(phen_dir)
 995    phen_path = Path(phen_dir)
 996
 997    # load git repo and set the branch
 998    repo = git.Repo(phen_path)
 999    if DEFAULT_GIT_BRANCH in repo.branches:
1000        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1001        main_branch.checkout()
1002    else:
1003        raise AttributeError(
1004            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1005        )
1006
1007    # check if any changes to publish
1008    if not repo.is_dirty() and not repo.untracked_files:
1009        if remote_url is not None and "origin" not in repo.remotes:
1010            logger.info(f"First publish to remote url {remote_url}")
1011        else:
1012            logger.info("Nothing to publish, no changes to the repo")
1013            return
1014
1015    # get next version
1016    new_version_str = generate_version_tag(repo, increment)
1017    logger.info(f"New version: {new_version_str}")
1018
1019    # Write version in configuration file
1020    config_path = phen_path / CONFIG_FILE
1021    with config_path.open("r") as file:
1022        config = yaml.safe_load(file)
1023
1024    config["phenotype"]["version"] = new_version_str
1025    with open(config_path, "w") as file:
1026        yaml.dump(
1027            config,
1028            file,
1029            Dumper=util.QuotedDumper,
1030            default_flow_style=False,
1031            sort_keys=False,
1032            default_style='"',
1033        )
1034
1035    # Add and commit changes to repo including version updates
1036    commit_message = f"Committing updates to phenotype {phen_path}"
1037    repo.git.add("--all")
1038    repo.index.commit(commit_message)
1039
1040    # Add tag to the repo
1041    repo.create_tag(new_version_str)
1042
1043    # push to origin if a remote repo
1044    if remote_url is not None and "origin" not in repo.remotes:
1045        git_url = construct_git_url(remote_url)
1046        repo.create_remote("origin", git_url)
1047
1048    try:
1049        if "origin" in repo.remotes:
1050            logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1051            origin = repo.remotes.origin
1052            logger.info(f"Pushing main branch to remote repo")
1053            repo.git.push("--set-upstream", "origin", "main")
1054            logger.info(f"Pushing version tags to remote git repo")
1055            origin.push(tags=True)
1056            logger.debug("Changes pushed to 'origin'")
1057        else:
1058            logger.debug("Remote 'origin' is not set")
1059    except Exception as e:
1060        tag_ref = repo.tags[new_version_str]
1061        repo.delete_tag(tag_ref)
1062        repo.git.reset("--soft", "HEAD~1")
1063        raise e
1064
1065    logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory
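
A minimal sketch (path and message are placeholders); remote_url is only needed when no origin remote is configured yet:

    from acmc import phen

    # commits all changes, writes the new version to config.yaml, tags the repo and pushes if origin is set
    phen.publish("./workspace/phen", msg="Updated concept sets", remote_url=None, increment="patch")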

def export(phen_dir: str, version: str):
1068def export(phen_dir: str, version: str):
1069    """Exports a phen repo at a specific tagged version into a target directory"""
1070    logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1071
1072    # validate configuration
1073    validate(phen_dir)
1074    phen_path = Path(phen_dir)
1075
1076    # load configuration
1077    config_path = phen_path / CONFIG_FILE
1078    with config_path.open("r") as file:
1079        config = yaml.safe_load(file)
1080
1081    map_path = phen_path / MAP_DIR
1082    if not map_path.exists():
1083        logger.warning(f"Map path does not exist '{map_path}'")
1084
1085    export_path = phen_path / OMOP_PATH
1086    # check export directory exists and if not create it
1087    if not export_path.exists():
1088        export_path.mkdir(parents=True)
1089        logger.debug(f"OMOP export directory '{export_path}' created.")
1090
1091    # omop export db
1092    export_db_path = omop.export(
1093        map_path,
1094        export_path,
1095        config["phenotype"]["version"],
1096        config["phenotype"]["omop"],
1097    )
1098
1099    # write to tables
1100    # export as csv
1101    logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory
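
A minimal sketch (path and version are placeholders); the OMOP export database is written under concept-sets/omop:

    from acmc import phen

    phen.export("./workspace/phen", version="0.1.0")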

def copy(phen_dir: str, target_dir: str, version: str):
1104def copy(phen_dir: str, target_dir: str, version: str):
1105    """Copys a phen repo at a specific tagged version into a target directory"""
1106
1107    # Validate
1108    validate(phen_dir)
1109    phen_path = Path(phen_dir)
1110
1111    # Check target directory exists
1112    target_path = Path(target_dir)
1113    if not target_path.exists():
1114        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1115
1116    # Set copy directory
1117    copy_path = target_path / version
1118    logger.info(f"Copying repo {phen_path} to {copy_path}")
1119
1120    if (
1121        copy_path.exists() and copy_path.is_dir()
1122    ):  # Check if it exists and is a directory
1123        copy = check_delete_dir(
1124            copy_path,
1125            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1126        )
1127    else:
1128        copy = True
1129
1130    if not copy:
1131        logger.info(f"Not copying the version {version}")
1132        return
1133
1134    logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1135    repo = git.Repo.clone_from(phen_path, copy_path)
1136
1137    # Check out the latest commit or specified version
1138    if version:
1139        # Checkout a specific version (e.g., branch, tag, or commit hash)
1140        logger.info(f"Checking out version {version}...")
1141        repo.git.checkout(version)
1142    else:
1143        # Checkout the latest commit (HEAD)
1144        logger.info(f"Checking out the latest commit...")
1145        repo.git.checkout("HEAD")
1146
1147    logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1148
1149    logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory
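
A minimal sketch (paths and tag are placeholders); the repo is cloned into <target_dir>/<version> and that version is checked out:

    from acmc import phen

    phen.copy("./workspace/phen", target_dir="./releases", version="v1.0.0")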

def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1153def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1154    """Extracts concepts as {name: file_path} dictionary and a name set."""
1155    concepts_dict = {
1156        item["name"]: item["file"]["path"]
1157        for item in config_data["phenotype"]["concept_sets"]
1158    }
1159    name_set = set(concepts_dict.keys())
1160    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.

def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1163def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1164    """
1165    Extracts clean keys from a DeepDiff dictionary.
1166
1167    :param diff: DeepDiff result dictionary
1168    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1169    :return: A set of clean key names
1170    """
1171    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}

Extracts clean keys from a DeepDiff dictionary.

:param diff: DeepDiff result dictionary
:param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
:return: A set of clean key names
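
A small self-contained sketch with toy dictionaries:

    from deepdiff import DeepDiff
    from acmc import phen

    changes = DeepDiff({"a": 1}, {"a": 1, "b": 2})
    print(phen.extract_clean_deepdiff_keys(changes, "dictionary_item_added"))  # {'b'}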

def diff_config(old_config: dict, new_config: dict) -> str:
1174def diff_config(old_config: dict, new_config: dict) -> str:
1175    report = f"\n# Changes to phenotype configuration\n"
1176    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1177
1178    old_concepts, old_names = extract_concepts(old_config)
1179    new_concepts, new_names = extract_concepts(new_config)
1180
1181    # Check added and removed names
1182    added_names = new_names - old_names  # Names that appear in new but not in old
1183    removed_names = old_names - new_names  # Names that were in old but not in new
1184
1185    # find file path changes for unchanged names
1186    unchanged_names = old_names & new_names  # Names that exist in both
1187    file_diff = DeepDiff(
1188        {name: old_concepts[name] for name in unchanged_names},
1189        {name: new_concepts[name] for name in unchanged_names},
1190    )
1191
1192    # Find renamed concepts (same file, different name)
1193    renamed_concepts = []
1194    for removed in removed_names:
1195        old_path = old_concepts[removed]
1196        for added in added_names:
1197            new_path = new_concepts[added]
1198            if old_path == new_path:
1199                renamed_concepts.append((removed, added))
1200
1201    # Remove renamed concepts from added and removed sets
1202    for old_name, new_name in renamed_concepts:
1203        added_names.discard(new_name)
1204        removed_names.discard(old_name)
1205
1206    # generate config report
1207    if added_names:
1208        report += "## Added Concepts\n"
1209        for name in added_names:
1210            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1211        report += "\n"
1212
1213    if removed_names:
1214        report += "## Removed Concepts\n"
1215        for name in removed_names:
1216            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1217        report += "\n"
1218
1219    if renamed_concepts:
1220        report += "## Renamed Concepts\n"
1221        for old_name, new_name in renamed_concepts:
1222            report += (
1223                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1224            )
1225        report += "\n"
1226
1227    if "values_changed" in file_diff:
1228        report += "## Updated File Paths\n"
1229        for name, change in file_diff["values_changed"].items():
1230            old_file = change["old_value"]
1231            new_file = change["new_value"]
1232            clean_name = name.split("root['")[1].split("']")[0]
1233            report += (
1234                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1235            )
1236        report += "\n"
1237
1238    if not (
1239        added_names
1240        or removed_names
1241        or renamed_concepts
1242        or file_diff.get("values_changed")
1243    ):
1244        report += "No changes in concept sets.\n"
1245
1246    return report
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1249def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1250    old_output_files = [
1251        file.name
1252        for file in old_map_path.iterdir()
1253        if file.is_file() and not file.name.startswith(".")
1254    ]
1255    new_output_files = [
1256        file.name
1257        for file in new_map_path.iterdir()
1258        if file.is_file() and not file.name.startswith(".")
1259    ]
1260
1261    # Convert the lists to sets for easy comparison
1262    old_output_set = set(old_output_files)
1263    new_output_set = set(new_output_files)
1264
1265    # Outputs that are in old_output_set but not in new_output_set (removed files)
1266    removed_outputs = old_output_set - new_output_set
1267    # Outputs that are in new_output_set but not in old_output_set (added files)
1268    added_outputs = new_output_set - old_output_set
1269    # Outputs that are the intersection of old_output_set and new_output_set
1270    common_outputs = old_output_set & new_output_set
1271
1272    report = f"\n# Changes to available translations\n"
1273    report += f"This compares the coding translations files available.\n\n"
1274    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1275    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1276    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1277
1278    # Step N: Compare common outputs between versions
1279    report += f"# Changes to concepts in translation files\n\n"
1280    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1281    for file in common_outputs:
1282        old_output = old_map_path / file
1283        new_output = new_map_path / file
1284
1285        logger.debug(f"Old ouptput: {str(old_output.resolve())}")
1286        logger.debug(f"New ouptput: {str(new_output.resolve())}")
1287
1288        df1 = pd.read_csv(old_output)
1289        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1290        df2 = pd.read_csv(new_output)
1291        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1292
1293        # Check for added and removed concepts
1294        report += f"- File {file}\n"
1295        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1296        report += f"- Removed concepts {sorted_list}\n"
1297        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1298        report += f"- Added concepts {sorted_list}\n"
1299
1300        # Check for changed concepts
1301        diff = df2 - df1  # diff in counts
1302        diff = diff[
1303            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1304        ]  # get non-zero counts
1305        s = "\n"
1306        if len(diff.index) > 0:
1307            for concept, row in diff.iterrows():
1308                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1309            report += f"- Changed concepts {s}\n\n"
1310        else:
1311            report += f"- Changed concepts []\n\n"
1312
1313    return report
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path):
1316def diff_phen(
1317    new_phen_path: Path,
1318    new_version: str,
1319    old_phen_path: Path,
1320    old_version: str,
1321    report_path: Path,
1322):
1323    """Compare the differences between two versions of a phenotype"""
1324
1325    # validate phenotypes
1326    logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1327    validate(str(old_phen_path.resolve()))
1328    logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1329    validate(str(new_phen_path.resolve()))
1330
1331    # get old and new config
1332    old_config_path = old_phen_path / CONFIG_FILE
1333    with old_config_path.open("r") as file:
1334        old_config = yaml.safe_load(file)
1335    new_config_path = new_phen_path / CONFIG_FILE
1336    with new_config_path.open("r") as file:
1337        new_config = yaml.safe_load(file)
1338
1339    # write report heading
1340    report = f"# Phenotype Comparison Report\n"
1341    report += f"## Original phenotype\n"
1342    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1343    report += f"  - {old_version}\n"
1344    report += f"  - {str(old_phen_path.resolve())}\n"
1345    report += f"## Changed phenotype:\n"
1346    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1347    report += f"  - {new_version}\n"
1348    report += f"  - {str(new_phen_path.resolve())}\n"
1349
1350    # Step 1: check differences configuration files
1351    # Convert list of dicts into a dict: {name: file}
1352    report += diff_config(old_config, new_config)
1353
1354    # Step 2: check differences between map files
1355    # List files from output directories
1356    old_map_path = old_phen_path / MAP_DIR
1357    new_map_path = new_phen_path / MAP_DIR
1358    report += diff_map_files(old_map_path, new_map_path)
1359
1360    # write the report file
1361    logger.debug(f"Writing to report file {str(report_path.resolve())}")
1362    report_file = open(report_path, "w")
1363    report_file.write(report)
1364    report_file.close()
1365
1366    logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1369def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1370    # make tmp directory .acmc
1371    timestamp = time.strftime("%Y%m%d_%H%M%S")
1372    temp_dir = Path(f".acmc/diff_{timestamp}")
1373
1374    changed_phen_path = Path(phen_dir)
1375    if not changed_phen_path.exists():
1376        raise ValueError(
1377            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1378        )
1379
1380    old_phen_path = Path(old_phen_dir)
1381    if not old_phen_path.exists():
1382        raise ValueError(
1383            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1384        )
1385
1386    try:
1387        # Create the directory
1388        temp_dir.mkdir(parents=True, exist_ok=True)
1389        logger.debug(f"Temporary directory created: {temp_dir}")
1390
1391        # Create temporary directories
1392        changed_path = temp_dir / "changed"
1393        changed_path.mkdir(parents=True, exist_ok=True)
1394        old_path = temp_dir / "old"
1395        old_path.mkdir(parents=True, exist_ok=True)
1396
1397        # checkout changed
1398        if version == "latest":
1399            logger.debug(
1400                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1401            )
1402            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1403        else:
1404            logger.debug(
1405                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1406            )
1407            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1408            changed_repo.git.checkout(version)
1409
1410        # checkout old
1411        if old_version == "latest":
1412            logger.debug(
1413                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1414            )
1415            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1416        else:
1417            logger.debug(
1418                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1419            )
1420            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1421            old_repo.git.checkout(old_version)
1422
1423        report_filename = f"{version}_{old_version}_diff.md"
1424        report_path = changed_phen_path / report_filename
1425        # diff old with new
1426        diff_phen(changed_path, version, old_path, old_version, report_path)
1427
1428    finally:
1429        # clean up tmp directory
1430        if temp_dir.exists():
1431            shutil.rmtree(temp_dir)
1432            print(f"Temporary directory removed: {temp_dir}")