acmc.phen

phenotype.py module

This module provides functionality for managing phenotypes: initialising phenotype repositories, validating their configuration, mapping source concept codes to target coding systems, and publishing, exporting and comparing phenotype versions.
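
A minimal usage sketch of the public functions below (the directory path is hypothetical, and mapping assumes the TRUD/OMOP vocabularies are installed and that the phenotype's config.yml lists read2 under map):

    from acmc import phen

    # initialise a phenotype repo with the standard directory structure
    phen.init("./workspace/phen", remote_url=None)

    # validate the configuration, then map source codes to a target code type
    phen.validate("./workspace/phen")
    phen.map("./workspace/phen", target_code_type="read2")

    # commit the changes, bump the patch version and tag it
    phen.publish("./workspace/phen", "Updated concept sets", remote_url=None, increment="patch")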

   1"""
   2phenotype.py module
   3
   4This module provides functionality for managing phenotypes: initialising phenotype repositories, validating their configuration, mapping source concept codes to target coding systems, and publishing, exporting and comparing phenotype versions.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator  # type: ignore
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28import acmc
  29from acmc import trud, omop, parse, util, logging_config as lc
  30
  31# set up logging
  32_logger = lc.setup_logger()
  33
  34pd.set_option("mode.chained_assignment", None)
  35
  36PHEN_DIR = "phen"
  37"""Default phenotype directory name"""
  38
  39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
  40"""Default phenotype directory path"""
  41
  42CONCEPTS_DIR = "concepts"
  43"""Default concepts directory name"""
  44
  45MAP_DIR = "map"
  46"""Default map directory name"""
  47
  48CONCEPT_SET_DIR = "concept-sets"
  49"""Default concept set directory name"""
  50
  51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
  52"""Default CSV concept set directory path"""
  53
  54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
  55"""Default OMOP concept set directory path"""
  56
  57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
  58"""List of default phenotype directories"""
  59
  60CONFIG_FILE = "config.yml"
  61"""Default configuration filename"""
  62
  63VOCAB_VERSION_FILE = "vocab_version.yml"
  64"""Default vocabulary version filename"""
  65
  66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
  67"""List of semantic version increment types"""
  68
  69DEFAULT_VERSION_INC = "patch"
  70"""Default semantic version increment type"""
  71
  72DEFAULT_GIT_BRANCH = "main"
  73"""Default phenotype repo branch name"""
  74
  75SPLIT_COL_ACTION = "split_col"
  76"""Split column preprocessing action type"""
  77
  78CODES_COL_ACTION = "codes_col"
  79"""Codes column preprocessing action type"""
  80
  81DIVIDE_COL_ACTION = "divide_col"
  82"""Divide column preprocessing action type"""
  83
  84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
  85"""List of column preprocessing action types"""
  86
  87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
  88"""List of supported source concept coding list file types"""
  89
   90# config.yml schema
  91CONFIG_SCHEMA = {
  92    "phenotype": {
  93        "type": "dict",
  94        "required": True,
  95        "schema": {
  96            "version": {
  97                "type": "string",
  98                "required": True,
  99                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format (no leading 'v')
 100            },
 101            "omop": {
 102                "type": "dict",
 103                "required": True,
 104                "schema": {
 105                    "vocabulary_id": {"type": "string", "required": True},
 106                    "vocabulary_name": {"type": "string", "required": True},
 107                    "vocabulary_reference": {
 108                        "type": "string",
 109                        "required": True,
 110                        "regex": r"^https?://.*",  # Ensures it's a URL
 111                    },
 112                },
 113            },
 114            "map": {
 115                "type": "list",
 116                "schema": {
 117                    "type": "string",
 118                    "allowed": list(
 119                        parse.SUPPORTED_CODE_TYPES
 120                    ),  # Ensure only predefined values are allowed
 121                },
 122            },
 123            "concept_sets": {
 124                "type": "list",
 125                "required": True,
 126                "schema": {
 127                    "type": "dict",
 128                    "schema": {
 129                        "name": {"type": "string", "required": True},
 130                        "file": {
 131                            "type": "dict",
 132                            "required": False,
 133                            "schema": {
 134                                "path": {"type": "string", "required": True},
 135                                "columns": {"type": "dict", "required": True},
 136                                "category": {
 137                                    "type": "string"
 138                                },  # Optional but must be string if present
 139                                "actions": {
 140                                    "type": "dict",
 141                                    "schema": {"divide_col": {"type": "string"}},
 142                                },
 143                            },
 144                        },
 145                        "metadata": {"type": "dict", "required": True},
 146                    },
 147                },
 148            },
 149        },
 150    }
 151}
 152"""Phenotype config.yml schema definition"""
 153
 154
 155class PhenValidationException(Exception):
  156    """Custom exception class raised when validation errors occur in the phenotype configuration file"""
 157
 158    def __init__(self, message, validation_errors=None):
 159        super().__init__(message)
 160        self.validation_errors = validation_errors
 161
 162
 163def _construct_git_url(remote_url: str):
  164    """Constructs a git url for github or gitlab, including a PAT token read from an environment variable"""
 165    # check the url
 166    parsed_url = urlparse(remote_url)
 167
  168    # if github.com is in the URL use a GitHub PAT, otherwise assume GitLab; to support others
  169    # such as codeberg this function would need updating if the URL scheme is different.
 170    if "github.com" in parsed_url.netloc:
 171        # get GitHub PAT from environment variable
 172        auth = os.getenv("ACMC_GITHUB_PAT")
 173        if not auth:
 174            raise ValueError(
 175                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 176            )
 177    else:
 178        # get GitLab PAT from environment variable
 179        auth = os.getenv("ACMC_GITLAB_PAT")
 180        if not auth:
 181            raise ValueError(
 182                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 183            )
 184        auth = f"oauth2:{auth}"
 185
 186    # Construct the new URL with credentials
 187    new_netloc = f"{auth}@{parsed_url.netloc}"
 188    return urlunparse(
 189        (
 190            parsed_url.scheme,
 191            new_netloc,
 192            parsed_url.path,
 193            parsed_url.params,
 194            parsed_url.query,
 195            parsed_url.fragment,
 196        )
 197    )
 198
 199
 200def _create_empty_git_dir(path: Path):
 201    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 202    path.mkdir(exist_ok=True)
 203    keep_path = path / ".gitkeep"
 204    keep_path.touch(exist_ok=True)
 205
 206
 207def _check_delete_dir(path: Path, msg: str) -> bool:
  208    """Asks the user on the command line whether a directory should be deleted
 209
 210    Args:
 211        path (Path): path of the directory to be deleted
 212        msg (str): message to be displayed to the user
 213
 214    Returns:
  215        bool: True if the directory was deleted
 216    """
 217    deleted = False
 218
  219    user_input = input(msg).strip().lower()
 220    if user_input in ["yes", "y"]:
 221        shutil.rmtree(path)
 222        deleted = True
 223    else:
 224        _logger.info("Directory was not deleted.")
 225
 226    return deleted
 227
 228
 229def init(phen_dir: str, remote_url: str):
  230    """Initialises a phenotype directory as a git repo with standard structure"""
 231    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 232    phen_path = Path(phen_dir)
 233
 234    # check if directory already exists and ask user if they want to recreate it
 235    if (
 236        phen_path.exists() and phen_path.is_dir()
 237    ):  # Check if it exists and is a directory
 238        configure = _check_delete_dir(
 239            phen_path,
 240            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 241        )
 242    else:
 243        configure = True
 244
 245    if not configure:
  246        _logger.info("Exiting, phenotype not initialised")
 247        return
 248
 249    # Initialise repo from local or remote
 250    repo: Repo
 251
 252    # if remote then clone the repo otherwise init a local repo
  253    if remote_url is not None:
 254        # add PAT token to the URL
 255        git_url = _construct_git_url(remote_url)
 256
 257        # clone the repo
 258        git_cmd = git.cmd.Git()
 259        git_cmd.clone(git_url, phen_path)
 260
 261        # open repo
 262        repo = Repo(phen_path)
 263        # check if there are any commits (new repo has no commits)
 264        if (
 265            len(repo.branches) == 0 or repo.head.is_detached
 266        ):  # Handle detached HEAD (e.g., after init)
 267            _logger.debug("The phen repository has no commits yet.")
 268            commit_count = 0
 269        else:
 270            # Get the total number of commits in the default branch
 271            commit_count = sum(1 for _ in repo.iter_commits())
 272            _logger.debug(f"Repo has previous commits: {commit_count}")
 273    else:
 274        # local repo, create the directories and init
 275        phen_path.mkdir(parents=True, exist_ok=True)
 276        _logger.debug(f"Phen directory '{phen_path}' has been created.")
 277        repo = git.Repo.init(phen_path)
 278        commit_count = 0
 279
 280    phen_path = phen_path.resolve()
 281    # initialise empty repos
 282    if commit_count == 0:
 283        # create initial commit
 284        initial_file_path = phen_path / "README.md"
 285        with open(initial_file_path, "w") as file:
 286            file.write(
 287                "# Initial commit\nThis is the first commit in the phen repository.\n"
 288            )
 289        repo.index.add([initial_file_path])
 290        repo.index.commit("Initial commit")
 291        commit_count = 1
 292
  293    # Check out the phen's default branch, creating it if it does not exist
 294    if DEFAULT_GIT_BRANCH in repo.branches:
 295        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 296        main_branch.checkout()
 297    else:
 298        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 299        main_branch.checkout()
 300
  301    # if the phen path does not contain the config file then initialise the phenotype structure
 302    config_path = phen_path / CONFIG_FILE
 303    if config_path.exists():
 304        _logger.debug(f"Phenotype configuration files already exist")
 305        return
 306
 307    _logger.info("Creating phen directory structure and config files")
 308    for d in DEFAULT_PHEN_DIR_LIST:
 309        _create_empty_git_dir(phen_path / d)
 310
 311    # create empty phen config file
 312    config = {
 313        "phenotype": {
 314            "version": "0.0.0",
 315            "omop": {
 316                "vocabulary_id": "",
 317                "vocabulary_name": "",
 318                "vocabulary_reference": "",
 319            },
  320            "map": [],
 321            "concept_sets": [],
 322        }
 323    }
 324
 325    with open(phen_path / CONFIG_FILE, "w") as file:
 326        yaml.dump(
 327            config,
 328            file,
 329            Dumper=util.QuotedDumper,
 330            default_flow_style=False,
 331            sort_keys=False,
 332            default_style='"',
 333        )
 334
 335    # add git ignore
  336    ignore_content = """# Ignore SQLite database files
  337*.db
  338*.sqlite3
  339
  340# Ignore SQLite journal and metadata files
  341*.db-journal
  342*.sqlite3-journal
  343
  344# python
  345.ipynb_checkpoints
  346"""
 347    ignore_path = phen_path / ".gitignore"
 348    with open(ignore_path, "w") as file:
 349        file.write(ignore_content)
 350
 351    # add to git repo and commit
 352    for d in DEFAULT_PHEN_DIR_LIST:
 353        repo.git.add(phen_path / d)
 354    repo.git.add(all=True)
 355    repo.index.commit("initialised the phen git repo.")
 356
 357    _logger.info(f"Phenotype initialised successfully")
 358
 359
 360def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
  361    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
 362
 363    Args:
 364        phen_dir (str): local directory path where the upstream repo is to be cloned
 365        upstream_url (str): url to the upstream repo
 366        upstream_version (str): version in the upstream repo to clone
 367        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 368
 369    Raises:
 370        ValueError: if the specified version is not in the upstream repo
 371        ValueError: if the upstream repo is not a valid phenotype repo
  372        ValueError: if there are any other problems with Git
 373    """
 374    _logger.info(
 375        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 376    )
 377
 378    phen_path = Path(phen_dir)
 379    # check if directory already exists and ask user if they want to recreate it
 380    if (
 381        phen_path.exists() and phen_path.is_dir()
 382    ):  # Check if it exists and is a directory
 383        configure = _check_delete_dir(
 384            phen_path,
 385            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 386        )
 387    else:
 388        configure = True
 389
 390    if not configure:
  391        _logger.info("Exiting, phenotype not initialised")
 392        return
 393
 394    try:
 395        # Clone repo
 396        git_url = _construct_git_url(upstream_url)
 397        repo = git.Repo.clone_from(git_url, phen_path)
 398
 399        # Fetch all branches and tags
 400        repo.remotes.origin.fetch()
 401
 402        # Check if the version exists
 403        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 404        if upstream_version not in available_refs:
 405            raise ValueError(
 406                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 407            )
 408
 409        # Checkout the specified version
 410        repo.git.checkout(upstream_version)
 411        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 412        main_branch.checkout()
 413
 414        # Check if 'config.yml' exists in the root directory
 415        config_path = phen_path / "config.yml"
 416        if not os.path.isfile(config_path):
 417            raise ValueError(
 418                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
 419            )
 420
 421        # Validate the phenotype is compatible with the acmc tool
 422        validate(str(phen_path.resolve()))
 423
 424        # Delete each tag locally
 425        tags = repo.tags
 426        for tag in tags:
 427            repo.delete_tag(tag)
 428            _logger.debug(f"Deleted tags from forked repo: {tag}")
 429
 430        # Add upstream remote
 431        repo.create_remote("upstream", upstream_url)
 432        remote = repo.remotes["origin"]
 433        repo.delete_remote(remote)  # Remove existing origin
 434
 435        # Optionally set a new origin remote
 436        if new_origin_url:
 437            git_url = _construct_git_url(new_origin_url)
 438            repo.create_remote("origin", git_url)
 439            repo.git.push("--set-upstream", "origin", "main")
 440
 441        _logger.info(f"Repository forked successfully at {phen_path}")
 442        _logger.info(f"Upstream set to {upstream_url}")
 443        if new_origin_url:
 444            _logger.info(f"Origin set to {new_origin_url}")
 445
 446    except Exception as e:
 447        if phen_path.exists():
 448            shutil.rmtree(phen_path)
  449        raise ValueError(f"Error occurred during repository fork: {str(e)}") from e
 450
 451
 452def validate(phen_dir: str):
 453    """Validates the phenotype directory is a git repo with standard structure"""
 454    _logger.info(f"Validating phenotype: {phen_dir}")
 455    phen_path = Path(phen_dir)
 456    if not phen_path.is_dir():
 457        raise NotADirectoryError(
 458            f"Error: '{str(phen_path.resolve())}' is not a directory"
 459        )
 460
 461    config_path = phen_path / CONFIG_FILE
 462    if not config_path.is_file():
 463        raise FileNotFoundError(
 464            f"Error: phen configuration file '{config_path}' does not exist."
 465        )
 466
 467    concepts_path = phen_path / CONCEPTS_DIR
 468    if not concepts_path.is_dir():
 469        raise FileNotFoundError(
 470            f"Error: source concepts directory {concepts_path} does not exist."
 471        )
 472
  473    # Validate the directory is a git repo
 474    try:
 475        git.Repo(phen_path)
 476    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 477        raise Exception(f"Phen directory {phen_path} is not a git repo")
 478
 479    # Load configuration File
 480    if config_path.suffix == ".yml":
 481        try:
 482            with config_path.open("r") as file:
 483                phenotype = yaml.safe_load(file)
 484
 485            validator = Validator(CONFIG_SCHEMA)
 486            if validator.validate(phenotype):
 487                _logger.debug("YAML structure is valid.")
 488            else:
 489                _logger.error(f"YAML structure validation failed: {validator.errors}")
 490                raise Exception(f"YAML structure validation failed: {validator.errors}")
 491        except yaml.YAMLError as e:
 492            _logger.error(f"YAML syntax error: {e}")
 493            raise e
 494    else:
 495        raise Exception(
 496            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 497        )
 498
  499    # initialise
 500    validation_errors = []
 501    phenotype = phenotype["phenotype"]
 502    code_types = parse.CodeTypeParser().code_types
 503
  504    # check the version number is of the format n.n.n
 505    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 506    if not match:
 507        validation_errors.append(
 508            f"Invalid version format in configuration file: {phenotype['version']}"
 509        )
 510
 511    # create a list of all the concept set names defined in the concept set configuration
 512    concept_set_names = []
 513    for item in phenotype["concept_sets"]:
 514        if item["name"] in concept_set_names:
 515            validation_errors.append(
  516                f"Duplicate concept set defined in concept sets {item['name']}"
 517            )
 518        else:
 519            concept_set_names.append(item["name"])
 520
 521    # check codes definition
 522    for item in phenotype["concept_sets"]:
  523        # check concept code file exists
 524        concept_code_file_path = concepts_path / item["file"]["path"]
 525        if not concept_code_file_path.exists():
 526            validation_errors.append(
 527                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 528            )
 529
  530        # check concept code file is not empty (only if it exists)
  531        elif concept_code_file_path.stat().st_size == 0:
 532            validation_errors.append(
 533                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 534            )
 535
 536        # check code file type is supported
 537        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 538            raise ValueError(
  539                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
 540            )
 541
 542        # check columns specified are a supported medical coding type
 543        for column in item["file"]["columns"]:
 544            if column not in code_types:
 545                validation_errors.append(
 546                    f"Column type {column} for file {concept_code_file_path} is not supported"
 547                )
 548
 549        # check the actions are supported
 550        if "actions" in item["file"]:
 551            for action in item["file"]["actions"]:
 552                if action not in COL_ACTIONS:
 553                    validation_errors.append(f"Action {action} is not supported")
 554
 555    if len(validation_errors) > 0:
 556        _logger.error(validation_errors)
 557        raise PhenValidationException(
 558            f"Configuration file {str(config_path.resolve())} failed validation",
 559            validation_errors,
 560        )
 561
 562    _logger.info(f"Phenotype validated successfully")
 563
 564
 565def _read_table_file(path: Path, excel_sheet: str = ""):
 566    """
 567    Load Code List File
 568    """
 569
 570    path = path.resolve()
 571    if path.suffix == ".csv":
 572        df = pd.read_csv(path, dtype=str)
 573    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 574        if excel_sheet != "":
 575            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 576        else:
 577            df = pd.read_excel(path, dtype=str)
 578    elif path.suffix == ".dta":
 579        df = pd.read_stata(path)
 580    else:
 581        raise ValueError(
  582            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
 583        )
 584
 585    return df
 586
 587
 588def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
  589    """Performs structural changes to the code file dataframe before preprocessing, e.g. the split_col action one-hot encodes a category column and fills it with the codes"""
 590    _logger.debug("Processing file structural actions")
 591    if (
 592        "actions" in concept_set["file"]
 593        and "split_col" in concept_set["file"]["actions"]
 594        and "codes_col" in concept_set["file"]["actions"]
 595    ):
 596        split_col = concept_set["file"]["actions"]["split_col"]
 597        codes_col = concept_set["file"]["actions"]["codes_col"]
  598        # log which categorical values the column will be split into
  599        _logger.debug(
  600            f"Action: Splitting {split_col} "
  601            f"column into: "
  602            f"{df[split_col].unique()}"
  603        )
 604        codes = df[codes_col]
 605        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
  606        oh = oh.where((oh != True), codes, axis=0)  # replace True with the code value
  607        oh[oh == False] = np.nan  # replace False with NaN
 608        df = pd.concat([df, oh], axis=1)  # merge in new columns
 609
 610    return df
 611
 612
 613def _preprocess_source_concepts(
 614    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 615) -> Tuple[pd.DataFrame, list]:
  616    """Performs QA checks on each code column individually and appends the processed codes to an output dataframe"""
 617    out = pd.DataFrame([])  # create output df to append to
 618    code_errors = []  # list of errors from processing
 619
 620    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 621    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 622
 623    # Preprocess codes
 624    code_types = parse.CodeTypeParser().code_types
 625    for code_type in concept_set["file"]["columns"]:
 626        parser = code_types[code_type]
 627        _logger.info(f"Processing {code_type} codes for {code_file_path}")
 628
 629        # get codes by column name
 630        source_col_name = concept_set["file"]["columns"][code_type]
 631        codes = df[source_col_name].dropna()
 632        codes = codes.astype(str)  # convert to string
 633        codes = codes.str.strip()  # remove excess spaces
 634
 635        # process codes, validating them using parser and returning the errors
 636        codes, errors = parser.process(codes, code_file_path)
 637        if len(errors) > 0:
 638            code_errors.extend(errors)
 639            _logger.warning(f"Codes validation failed with {len(errors)} errors")
 640
 641        # add processed codes to df
 642        new_col_name = f"{source_col_name}_SOURCE"
 643        df = df.rename(columns={source_col_name: new_col_name})
 644        process_codes = pd.DataFrame({code_type: codes}).join(df)
 645        out = pd.concat(
 646            [out, process_codes],
 647            ignore_index=True,
 648        )
 649
 650    _logger.debug(out.head())
 651
 652    return out, code_errors
 653
 654
  655# Translate a df with multiple source code types into a single target code type concept set
 656def translate_codes(
 657    source_df: pd.DataFrame, target_code_type: str, concept_name: str
 658) -> pd.DataFrame:
  659    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""
 660
 661    # codes = pd.DataFrame([], dtype=str)
 662    codes = pd.DataFrame(
 663        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 664    )
 665    # Convert codes to target type
 666    _logger.info(f"Converting to target code type {target_code_type}")
 667
 668    for source_code_type in source_df.columns:
  669        # if the target code type is the same as the source code type, no translation, just append source as target
 670        if source_code_type == target_code_type:
 671            copy_df = pd.DataFrame(
 672                {
 673                    "SOURCE_CONCEPT": source_df[source_code_type],
 674                    "SOURCE_CONCEPT_TYPE": source_code_type,
 675                    "CONCEPT": source_df[source_code_type],
 676                }
 677            )
 678            codes = pd.concat([codes, copy_df])
 679            _logger.debug(
  680                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
 681            )
 682        else:
 683            # get the translation filename using source to target code types
 684            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 685            map_path = trud.PROCESSED_PATH / filename
 686
 687            # do the mapping if it exists
 688            if map_path.exists():
 689                # get mapping
 690                df_map = pd.read_parquet(map_path)
 691
 692                # do mapping
 693                translated_df = pd.merge(
 694                    source_df[source_code_type], df_map, how="left"
 695                )
 696
 697                # normalise the output
 698                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
 699                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 700
 701                # add to list of codes
 702                codes = pd.concat([codes, translated_df])
 703
 704            else:
 705                _logger.warning(
 706                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 707                )
 708
 709    codes = codes.dropna()  # delete NaNs
 710
  711    # add the concept set name to the output if there were any translations
 712    if len(codes.index) > 0:
 713        codes["CONCEPT_SET"] = concept_name
 714    else:
 715        _logger.debug(f"No codes converted with target code type {target_code_type}")
 716
 717    return codes
 718
 719
 720def _write_code_errors(code_errors: list, code_errors_path: Path):
 721    err_df = pd.DataFrame(
 722        [
 723            {
 724                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 725                "VOCABULARY": err.code_type,
 726                "SOURCE": err.codes_file,
 727                "CAUSE": err.message,
 728            }
 729            for err in code_errors
 730        ]
 731    )
 732
 733    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 734    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 735    err_df.to_csv(code_errors_path, index=False, mode="w")
 736
 737
 738def write_vocab_version(phen_path: Path):
  739    """Writes the acmc, TRUD and OMOP vocabulary versions to the vocab version file"""
 740
 741    if not trud.VERSION_PATH.exists():
 742        raise FileNotFoundError(
 743            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 744        )
 745
 746    if not omop.VERSION_PATH.exists():
 747        raise FileNotFoundError(
 748            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 749        )
 750
 751    with trud.VERSION_PATH.open("r") as file:
 752        trud_version = yaml.safe_load(file)
 753
 754    with omop.VERSION_PATH.open("r") as file:
 755        omop_version = yaml.safe_load(file)
 756
 757    # Create the combined YAML structure
 758    version_data = {
 759        "versions": {
 760            "acmc": acmc.__version__,
 761            "trud": trud_version,
 762            "omop": omop_version,
 763        }
 764    }
 765
 766    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 767        yaml.dump(
 768            version_data,
 769            file,
 770            Dumper=util.QuotedDumper,
 771            default_flow_style=False,
 772            sort_keys=False,
 773            default_style='"',
 774        )
 775
 776
 777def map(phen_dir: str, target_code_type: str):
 778    _logger.info(f"Processing phenotype: {phen_dir}")
 779
 780    # Validate configuration
 781    validate(phen_dir)
 782
 783    # initialise paths
 784    phen_path = Path(phen_dir)
 785    config_path = phen_path / CONFIG_FILE
 786
 787    # load configuration
 788    with config_path.open("r") as file:
 789        config = yaml.safe_load(file)
 790    phenotype = config["phenotype"]
 791
  792    if len(phenotype.get("map", [])) == 0:
 793        raise ValueError(f"No map codes defined in the phenotype configuration")
 794
 795    if target_code_type is not None and target_code_type not in phenotype["map"]:
 796        raise ValueError(
 797            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 798        )
 799
 800    if target_code_type is not None:
 801        _map_target_code_type(phen_path, phenotype, target_code_type)
 802    else:
 803        for t in phenotype["map"]:
 804            _map_target_code_type(phen_path, phenotype, t)
 805
 806    _logger.info(f"Phenotype processed successfully")
 807
 808
 809def _map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
 810    _logger.debug(f"Target coding format: {target_code_type}")
 811    concepts_path = phen_path / CONCEPTS_DIR
 812    # Create output dataframe
 813    out = pd.DataFrame([])
 814    code_errors = []
 815
 816    # Process each folder in codes section
 817    for concept_set in phenotype["concept_sets"]:
 818        _logger.debug(f"--- {concept_set['file']} ---")
 819
 820        # Load code file
 821        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
 822        df = _read_table_file(codes_file_path)
 823
 824        # process structural actions
 825        df = _process_actions(df, concept_set)
 826
  827        # preprocess and validate source concepts
 828        _logger.debug("Processing and validating source concept codes")
 829        df, errors = _preprocess_source_concepts(
 830            df,
 831            concept_set,
 832            codes_file_path,
 833        )
 834
 835        # create df with just the source code columns
 836        source_column_names = list(concept_set["file"]["columns"].keys())
 837        source_df = df[source_column_names]
 838
 839        _logger.debug(source_df.columns)
 840        _logger.debug(source_df.head())
 841
 842        _logger.debug(
 843            f"Length of errors from _preprocess_source_concepts {len(errors)}"
 844        )
 845        if len(errors) > 0:
 846            code_errors.extend(errors)
 847        _logger.debug(f" Length of code_errors {len(code_errors)}")
 848
 849        # Map source concepts codes to target codes
 850        # if processing a source coding list with categorical data
 851        if (
 852            "actions" in concept_set["file"]
 853            and "divide_col" in concept_set["file"]["actions"]
 854            and len(df) > 0
 855        ):
 856            divide_col = concept_set["file"]["actions"]["divide_col"]
 857            _logger.debug(f"Action: Dividing Table by {divide_col}")
 858            _logger.debug(f"column into: {df[divide_col].unique()}")
 859            df_grp = df.groupby(divide_col)
 860            for cat, grp in df_grp:
 861                if cat == concept_set["file"]["category"]:
 862                    grp = grp.drop(columns=[divide_col])  # delete categorical column
 863                    source_df = grp[source_column_names]
 864                    trans_out = translate_codes(
 865                        source_df,
 866                        target_code_type=target_code_type,
 867                        concept_name=concept_set["name"],
 868                    )
 869                    out = pd.concat([out, trans_out])
 870        else:
 871            source_df = df[source_column_names]
 872            trans_out = translate_codes(
 873                source_df,
 874                target_code_type=target_code_type,
 875                concept_name=concept_set["name"],
 876            )
 877            out = pd.concat([out, trans_out])
 878
 879    if len(code_errors) > 0:
 880        _logger.error(f"The map processing has {len(code_errors)} errors")
 881        error_path = phen_path / MAP_DIR / "errors"
 882        error_path.mkdir(parents=True, exist_ok=True)
 883        error_filename = f"{target_code_type}-code-errors.csv"
 884        _write_code_errors(code_errors, error_path / error_filename)
 885
 886    # Check there is output from processing
 887    if len(out.index) == 0:
 888        _logger.error(f"No output after map processing")
 889        raise Exception(
 890            f"No output after map processing, check config {str(phen_path.resolve())}"
 891        )
 892
 893    # final processing
 894    out = out.reset_index(drop=True)
 895    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 896    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 897
 898    out_count = len(out.index)
  899    # add metadata
 900    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
 901    result_list = []
 902    source_column_names = list(concept_set["file"]["columns"].keys())
 903    for source_concept_type in source_column_names:
 904        # Filter output based on the current source_concept_type
 905        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
 906        filtered_count = len(out_filtered_df.index)
 907
  908        # Removing all source type columns except the current type leaves the metadata and the join column
 909        remove_types = [
 910            type for type in source_column_names if type != source_concept_type
 911        ]
 912        metadata_df = df.drop(columns=remove_types)
 913        metadata_df = metadata_df.rename(
 914            columns={source_concept_type: "SOURCE_CONCEPT"}
 915        )
 916        metadata_df_count = len(metadata_df.index)
 917
 918        # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
 919        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
 920        result_count = len(result.index)
 921
 922        _logger.debug(
 923            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
 924        )
 925
 926        # Append the result to the result_list
 927        result_list.append(result)
 928
 929    # Concatenate all the results into a single DataFrame
 930    final_out = pd.concat(result_list, ignore_index=True)
 931    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 932    _logger.debug(
 933        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
 934    )
 935
 936    # Save output to map directory
 937    output_filename = target_code_type + ".csv"
 938    map_path = phen_path / MAP_DIR / output_filename
 939    final_out.to_csv(map_path, index=False)
 940    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 941
 942    # save concept sets as separate files
 943    concept_set_path = phen_path / CSV_PATH / target_code_type
 944
  945    # empty the concept-set directory except for hidden files, e.g. .git
 946    if concept_set_path.exists():
 947        for item in concept_set_path.iterdir():
 948            if not item.name.startswith("."):
 949                item.unlink()
 950    else:
 951        concept_set_path.mkdir(parents=True, exist_ok=True)
 952
 953    # write each concept as a separate file
 954    for name, concept in final_out.groupby("CONCEPT_SET"):
 955        concept = concept.sort_values(by="CONCEPT")  # sort rows
 956        concept = concept.dropna(how="all", axis=1)  # remove empty cols
 957        concept = concept.reindex(
 958            sorted(concept.columns), axis=1
 959        )  # sort cols alphabetically
 960        filename = f"{name}.csv"
 961        concept_path = concept_set_path / filename
 962        concept.to_csv(concept_path, index=False)
 963
 964    write_vocab_version(phen_path)
 965
 966    _logger.info(f"Phenotype processed target code type {target_code_type}")
 967
 968
 969def _generate_version_tag(
 970    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
 971) -> str:
  972    """Returns the next semantic version tag by bumping the latest valid version tag in the repo"""
 973    versions = []
 974    for tag in repo.tags:
 975        if tag.name.startswith("v"):
 976            tag_name = tag.name[1:]  # Remove the first character
 977        else:
 978            tag_name = tag.name
 979        if semver.Version.is_valid(tag_name):
 980            versions.append(semver.Version.parse(tag_name))
 981
 982    _logger.debug(f"Versions: {versions}")
 983    # Determine the next version
 984    if not versions:
 985        new_version = semver.Version(0, 0, 1)
 986    else:
 987        latest_version = max(versions)
 988        if increment == "major":
 989            new_version = latest_version.bump_major()
 990        elif increment == "minor":
 991            new_version = latest_version.bump_minor()
 992        else:
 993            new_version = latest_version.bump_patch()
 994
 995    # Create the new tag
 996    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
 997
 998    return new_version_str
 999
1000
1001def publish(
1002    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1003):
 1004    """Publishes updates to the phenotype by committing all changes to the repo directory"""
1005
1006    # Validate config
1007    validate(phen_dir)
1008    phen_path = Path(phen_dir)
1009
1010    # load git repo and set the branch
1011    repo = git.Repo(phen_path)
1012    if DEFAULT_GIT_BRANCH in repo.branches:
1013        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1014        main_branch.checkout()
1015    else:
1016        raise AttributeError(
1017            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1018        )
1019
1020    # check if any changes to publish
1021    if not repo.is_dirty() and not repo.untracked_files:
1022        if remote_url is not None and "origin" not in repo.remotes:
1023            _logger.info(f"First publish to remote url {remote_url}")
1024        else:
1025            _logger.info("Nothing to publish, no changes to the repo")
1026            return
1027
1028    # get next version
1029    new_version_str = _generate_version_tag(repo, increment)
1030    _logger.info(f"New version: {new_version_str}")
1031
1032    # Write version in configuration file
1033    config_path = phen_path / CONFIG_FILE
1034    with config_path.open("r") as file:
1035        config = yaml.safe_load(file)
1036
1037    config["phenotype"]["version"] = new_version_str
1038    with open(config_path, "w") as file:
1039        yaml.dump(
1040            config,
1041            file,
1042            Dumper=util.QuotedDumper,
1043            default_flow_style=False,
1044            sort_keys=False,
1045            default_style='"',
1046        )
1047
1048    # Add and commit changes to repo including version updates
1049    commit_message = f"Committing updates to phenotype {phen_path}"
1050    repo.git.add("--all")
1051    repo.index.commit(commit_message)
1052
1053    # Add tag to the repo
1054    repo.create_tag(new_version_str)
1055
1056    # push to origin if a remote repo
1057    if remote_url is not None and "origin" not in repo.remotes:
1058        git_url = _construct_git_url(remote_url)
1059        repo.create_remote("origin", git_url)
1060
1061    try:
1062        if "origin" in repo.remotes:
1063            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1064            origin = repo.remotes.origin
1065            _logger.info(f"Pushing main branch to remote repo")
1066            repo.git.push("--set-upstream", "origin", "main")
1067            _logger.info(f"Pushing version tags to remote git repo")
1068            origin.push(tags=True)
1069            _logger.debug("Changes pushed to 'origin'")
1070        else:
1071            _logger.debug("Remote 'origin' is not set")
1072    except Exception as e:
1073        tag_ref = repo.tags[new_version_str]
1074        repo.delete_tag(tag_ref)
1075        repo.git.reset("--soft", "HEAD~1")
1076        raise e
1077
1078    _logger.info(f"Phenotype published successfully")
1079
1080
1081def export(phen_dir: str, version: str):
1082    """Exports a phen repo at a specific tagged version into a target directory"""
1083    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1084
1085    # validate configuration
1086    validate(phen_dir)
1087    phen_path = Path(phen_dir)
1088
1089    # load configuration
1090    config_path = phen_path / CONFIG_FILE
1091    with config_path.open("r") as file:
1092        config = yaml.safe_load(file)
1093
1094    map_path = phen_path / MAP_DIR
1095    if not map_path.exists():
1096        _logger.warning(f"Map path does not exist '{map_path}'")
1097
1098    export_path = phen_path / OMOP_PATH
1099    # check export directory exists and if not create it
1100    if not export_path.exists():
1101        export_path.mkdir(parents=True)
1102        _logger.debug(f"OMOP export directory '{export_path}' created.")
1103
1104    # omop export db
1105    export_db_path = omop.export(
1106        map_path,
1107        export_path,
1108        config["phenotype"]["version"],
1109        config["phenotype"]["omop"],
1110    )
1111
1112    # write to tables
1113    # export as csv
1114    _logger.info(f"Phenotype exported successfully")
1115
1116
1117def copy(phen_dir: str, target_dir: str, version: str):
 1118    """Copies a phen repo at a specific tagged version into a target directory"""
1119
1120    # Validate
1121    validate(phen_dir)
1122    phen_path = Path(phen_dir)
1123
1124    # Check target directory exists
1125    target_path = Path(target_dir)
1126    if not target_path.exists():
1127        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1128
1129    # Set copy directory
1130    copy_path = target_path / version
1131    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1132
1133    if (
1134        copy_path.exists() and copy_path.is_dir()
1135    ):  # Check if it exists and is a directory
1136        copy = _check_delete_dir(
1137            copy_path,
1138            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1139        )
1140    else:
1141        copy = True
1142
1143    if not copy:
1144        _logger.info(f"Not copying the version {version}")
1145        return
1146
1147    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1148    repo = git.Repo.clone_from(phen_path, copy_path)
1149
1150    # Check out the latest commit or specified version
1151    if version:
1152        # Checkout a specific version (e.g., branch, tag, or commit hash)
1153        _logger.info(f"Checking out version {version}...")
1154        repo.git.checkout(version)
1155    else:
1156        # Checkout the latest commit (HEAD)
1157        _logger.info(f"Checking out the latest commit...")
1158        repo.git.checkout("HEAD")
1159
1160    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1161
1162    _logger.info(f"Phenotype copied successfully")
1163
1164
1165# Convert concept_sets list into dictionaries
1166def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1167    """Extracts concepts as {name: file_path} dictionary and a name set."""
1168    concepts_dict = {
1169        item["name"]: item["file"]["path"]
1170        for item in config_data["phenotype"]["concept_sets"]
1171    }
1172    name_set = set(concepts_dict.keys())
1173    return concepts_dict, name_set
1174
1175
1176def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1177    """
1178    Extracts clean keys from a DeepDiff dictionary.
1179
1180    :param diff: DeepDiff result dictionary
1181    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1182    :return: A set of clean key names
1183    """
1184    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1185
1186
1187def diff_config(old_config: dict, new_config: dict) -> str:
1188    report = f"\n# Changes to phenotype configuration\n"
 1189    report += f"This compares changes in the phenotype configuration, including added, removed and renamed concept sets, and changes to concept set source code file paths\n\n"
1190
1191    old_concepts, old_names = extract_concepts(old_config)
1192    new_concepts, new_names = extract_concepts(new_config)
1193
1194    # Check added and removed names
1195    added_names = new_names - old_names  # Names that appear in new but not in old
1196    removed_names = old_names - new_names  # Names that were in old but not in new
1197
1198    # find file path changes for unchanged names
1199    unchanged_names = old_names & new_names  # Names that exist in both
1200    file_diff = DeepDiff(
1201        {name: old_concepts[name] for name in unchanged_names},
1202        {name: new_concepts[name] for name in unchanged_names},
1203    )
1204
1205    # Find renamed concepts (same file, different name)
1206    renamed_concepts = []
1207    for removed in removed_names:
1208        old_path = old_concepts[removed]
1209        for added in added_names:
1210            new_path = new_concepts[added]
1211            if old_path == new_path:
1212                renamed_concepts.append((removed, added))
1213
1214    # Remove renamed concepts from added and removed sets
1215    for old_name, new_name in renamed_concepts:
1216        added_names.discard(new_name)
1217        removed_names.discard(old_name)
1218
1219    # generate config report
1220    if added_names:
1221        report += "## Added Concepts\n"
1222        for name in added_names:
1223            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1224        report += "\n"
1225
1226    if removed_names:
1227        report += "## Removed Concepts\n"
1228        for name in removed_names:
1229            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1230        report += "\n"
1231
1232    if renamed_concepts:
1233        report += "## Renamed Concepts\n"
1234        for old_name, new_name in renamed_concepts:
1235            report += (
1236                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1237            )
1238        report += "\n"
1239
1240    if "values_changed" in file_diff:
1241        report += "## Updated File Paths\n"
1242        for name, change in file_diff["values_changed"].items():
1243            old_file = change["old_value"]
1244            new_file = change["new_value"]
1245            clean_name = name.split("root['")[1].split("']")[0]
1246            report += (
1247                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1248            )
1249        report += "\n"
1250
1251    if not (
1252        added_names
1253        or removed_names
1254        or renamed_concepts
1255        or file_diff.get("values_changed")
1256    ):
1257        report += "No changes in concept sets.\n"
1258
1259    return report
1260
1261
1262def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1263    old_output_files = [
1264        file.name
1265        for file in old_map_path.iterdir()
1266        if file.is_file() and not file.name.startswith(".")
1267    ]
1268    new_output_files = [
1269        file.name
1270        for file in new_map_path.iterdir()
1271        if file.is_file() and not file.name.startswith(".")
1272    ]
1273
1274    # Convert the lists to sets for easy comparison
1275    old_output_set = set(old_output_files)
1276    new_output_set = set(new_output_files)
1277
1278    # Outputs that are in old_output_set but not in new_output_set (removed files)
1279    removed_outputs = old_output_set - new_output_set
1280    # Outputs that are in new_output_set but not in old_output_set (added files)
1281    added_outputs = new_output_set - old_output_set
1282    # Outputs that are the intersection of old_output_set and new_output_set
1283    common_outputs = old_output_set & new_output_set
1284
1285    report = f"\n# Changes to available translations\n"
1286    report += f"This compares the coding translations files available.\n\n"
1287    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1288    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1289    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1290
1291    # Step N: Compare common outputs between versions
1292    report += f"# Changes to concepts in translation files\n\n"
 1293    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yml if the translations have not been run for the current config.\n\n"
1294    for file in common_outputs:
1295        old_output = old_map_path / file
1296        new_output = new_map_path / file
1297
 1298        _logger.debug(f"Old output: {str(old_output.resolve())}")
 1299        _logger.debug(f"New output: {str(new_output.resolve())}")
1300
1301        df1 = pd.read_csv(old_output)
1302        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1303        df2 = pd.read_csv(new_output)
1304        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1305
1306        # Check for added and removed concepts
1307        report += f"- File {file}\n"
1308        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1309        report += f"- Removed concepts {sorted_list}\n"
1310        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1311        report += f"- Added concepts {sorted_list}\n"
1312
1313        # Check for changed concepts
1314        diff = df2 - df1  # diff in counts
1315        diff = diff[
1316            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1317        ]  # get non-zero counts
1318        s = "\n"
1319        if len(diff.index) > 0:
1320            for concept, row in diff.iterrows():
1321                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1322            report += f"- Changed concepts {s}\n\n"
1323        else:
1324            report += f"- Changed concepts []\n\n"
1325
1326    return report
1327
1328
1329def diff_phen(
1330    new_phen_path: Path,
1331    new_version: str,
1332    old_phen_path: Path,
1333    old_version: str,
1334    report_path: Path,
1335):
1336    """Compare the differences between two versions of a phenotype"""
1337
1338    # validate phenotypes
1339    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1340    validate(str(old_phen_path.resolve()))
1341    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1342    validate(str(new_phen_path.resolve()))
1343
1344    # get old and new config
1345    old_config_path = old_phen_path / CONFIG_FILE
1346    with old_config_path.open("r") as file:
1347        old_config = yaml.safe_load(file)
1348    new_config_path = new_phen_path / CONFIG_FILE
1349    with new_config_path.open("r") as file:
1350        new_config = yaml.safe_load(file)
1351
1352    # write report heading
1353    report = f"# Phenotype Comparison Report\n"
1354    report += f"## Original phenotype\n"
1355    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1356    report += f"  - {old_version}\n"
1357    report += f"  - {str(old_phen_path.resolve())}\n"
 1358    report += f"## Changed phenotype\n"
1359    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1360    report += f"  - {new_version}\n"
1361    report += f"  - {str(new_phen_path.resolve())}\n"
1362
1363    # Step 1: check differences configuration files
1364    # Convert list of dicts into a dict: {name: file}
1365    report += diff_config(old_config, new_config)
1366
1367    # Step 2: check differences between map files
1368    # List files from output directories
1369    old_map_path = old_phen_path / MAP_DIR
1370    new_map_path = new_phen_path / MAP_DIR
1371    report += diff_map_files(old_map_path, new_map_path)
1372
1373    # initialise report file
1374    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1375    report_file = open(report_path, "w")
1376    report_file.write(report)
1377    report_file.close()
1378
1379    _logger.info(f"Phenotypes diff'd successfully")
1380
1381
1382def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
 1383    """Compares two versions of a phenotype and writes a markdown diff report into the changed phenotype directory"""
1384    timestamp = time.strftime("%Y%m%d_%H%M%S")
1385    temp_dir = Path(f".acmc/diff_{timestamp}")
1386
1387    changed_phen_path = Path(phen_dir)
1388    if not changed_phen_path.exists():
1389        raise ValueError(
1390            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1391        )
1392
1393    old_phen_path = Path(old_phen_dir)
1394    if not old_phen_path.exists():
1395        raise ValueError(
1396            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1397        )
1398
1399    try:
1400        # Create the directory
1401        temp_dir.mkdir(parents=True, exist_ok=True)
1402        _logger.debug(f"Temporary directory created: {temp_dir}")
1403
1404        # Create temporary directories
1405        changed_path = temp_dir / "changed"
1406        changed_path.mkdir(parents=True, exist_ok=True)
1407        old_path = temp_dir / "old"
1408        old_path.mkdir(parents=True, exist_ok=True)
1409
1410        # checkout changed
1411        if version == "latest":
1412            _logger.debug(
1413                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1414            )
1415            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1416        else:
1417            _logger.debug(
1418                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1419            )
1420            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1421            changed_repo.git.checkout(version)
1422
1423        # checkout old
1424        if old_version == "latest":
1425            _logger.debug(
1426                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1427            )
1428            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1429        else:
1430            _logger.debug(
1431                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1432            )
1433            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1434            old_repo.git.checkout(old_version)
1435
1436        report_filename = f"{version}_{old_version}_diff.md"
1437        report_path = changed_phen_path / report_filename
1438        # diff old with new
1439        diff_phen(changed_path, version, old_path, old_version, report_path)
1440
1441    finally:
1442        # clean up tmp directory
1443        if temp_dir.exists():
1444            shutil.rmtree(temp_dir)
1445            _logger.debug(f"Temporary directory removed: {temp_dir}")
PHEN_DIR = 'phen'

Default phenotype directory name

DEFAULT_PHEN_PATH = PosixPath('workspace/phen')

Default phenotype directory path

CONCEPTS_DIR = 'concepts'

Default concepts directory name

MAP_DIR = 'map'

Default map directory name

CONCEPT_SET_DIR = 'concept-sets'

Default concept set directory name

CSV_PATH = PosixPath('concept-sets/csv')

Default CSV concept set directory path

OMOP_PATH = PosixPath('concept-sets/omop')

Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = ['concepts', 'map', 'concept-sets']

List of default phenotype directories

CONFIG_FILE = 'config.yml'

Default configuration filename

VOCAB_VERSION_FILE = 'vocab_version.yml'

Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ['major', 'minor', 'patch']

List of semantic version increment types

DEFAULT_VERSION_INC = 'patch'

Default semantic version increment type

DEFAULT_GIT_BRANCH = 'main'

Default phenotype repo branch name

SPLIT_COL_ACTION = 'split_col'

Split column preprocessing action type

CODES_COL_ACTION = 'codes_col'

Codes column preprocessing action type

DIVIDE_COL_ACTION = 'divide_col'

Divide column preprocessing action type

COL_ACTIONS = ['split_col', 'codes_col', 'divide_col']

List of column preprocessing action types

CODE_FILE_TYPES = ['.xlsx', '.xls', '.csv']

List of supported source concept coding list file types

CONFIG_SCHEMA = {'phenotype': {'type': 'dict', 'required': True, 'schema': {'version': {'type': 'string', 'required': True, 'regex': '^\\d+\\.\\d+\\.\\d+$'}, 'omop': {'type': 'dict', 'required': True, 'schema': {'vocabulary_id': {'type': 'string', 'required': True}, 'vocabulary_name': {'type': 'string', 'required': True}, 'vocabulary_reference': {'type': 'string', 'required': True, 'regex': '^https?://.*'}}}, 'map': {'type': 'list', 'schema': {'type': 'string', 'allowed': ['icd10', 'read2', 'snomed', 'atc', 'opcs4', 'read3']}}, 'concept_sets': {'type': 'list', 'required': True, 'schema': {'type': 'dict', 'schema': {'name': {'type': 'string', 'required': True}, 'file': {'type': 'dict', 'required': False, 'schema': {'path': {'type': 'string', 'required': True}, 'columns': {'type': 'dict', 'required': True}, 'category': {'type': 'string'}, 'actions': {'type': 'dict', 'schema': {'divide_col': {'type': 'string'}}}}}, 'metadata': {'type': 'dict', 'required': True}}}}}}}

Phenotype config.yml schema definition

class PhenValidationException(builtins.Exception):
156class PhenValidationException(Exception):
157    """Custom exception class raised when validation errors in phenotype configuration file"""
158
159    def __init__(self, message, validation_errors=None):
160        super().__init__(message)
161        self.validation_errors = validation_errors

Custom exception raised when there are validation errors in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
159    def __init__(self, message, validation_errors=None):
160        super().__init__(message)
161        self.validation_errors = validation_errors
validation_errors
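
A minimal sketch of catching this exception from validate; the phenotype path is hypothetical:

    from acmc import phen

    try:
        phen.validate("./workspace/phen")  # hypothetical phenotype directory
    except phen.PhenValidationException as e:
        # validation_errors holds the list of individual failures
        for error in e.validation_errors:
            print(error)
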
def init(phen_dir: str, remote_url: str):
230def init(phen_dir: str, remote_url: str):
231    """Initial phenotype directory as git repo with standard structure"""
232    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
233    phen_path = Path(phen_dir)
234
235    # check if directory already exists and ask user if they want to recreate it
236    if (
237        phen_path.exists() and phen_path.is_dir()
238    ):  # Check if it exists and is a directory
239        configure = _check_delete_dir(
240            phen_path,
241            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
242        )
243    else:
244        configure = True
245
246    if not configure:
247        _logger.info(f"Exiting, phenotype not initiatised")
248        return
249
250    # Initialise repo from local or remote
251    repo: Repo
252
253    # if remote then clone the repo otherwise init a local repo
254    if remote_url is not None:
255        # add PAT token to the URL
256        git_url = _construct_git_url(remote_url)
257
258        # clone the repo
259        git_cmd = git.cmd.Git()
260        git_cmd.clone(git_url, phen_path)
261
262        # open repo
263        repo = Repo(phen_path)
264        # check if there are any commits (new repo has no commits)
265        if (
266            len(repo.branches) == 0 or repo.head.is_detached
267        ):  # Handle detached HEAD (e.g., after init)
268            _logger.debug("The phen repository has no commits yet.")
269            commit_count = 0
270        else:
271            # Get the total number of commits in the default branch
272            commit_count = sum(1 for _ in repo.iter_commits())
273            _logger.debug(f"Repo has previous commits: {commit_count}")
274    else:
275        # local repo, create the directories and init
276        phen_path.mkdir(parents=True, exist_ok=True)
277        _logger.debug(f"Phen directory '{phen_path}' has been created.")
278        repo = git.Repo.init(phen_path)
279        commit_count = 0
280
281    phen_path = phen_path.resolve()
282    # initialise empty repos
283    if commit_count == 0:
284        # create initial commit
285        initial_file_path = phen_path / "README.md"
286        with open(initial_file_path, "w") as file:
287            file.write(
288                "# Initial commit\nThis is the first commit in the phen repository.\n"
289            )
290        repo.index.add([initial_file_path])
291        repo.index.commit("Initial commit")
292        commit_count = 1
293
294    # Checkout the phens default branch, creating it if it does not exist
295    if DEFAULT_GIT_BRANCH in repo.branches:
296        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
297        main_branch.checkout()
298    else:
299        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
300        main_branch.checkout()
301
302    # if the phen path does not contain the config file then initialise the phen type
303    config_path = phen_path / CONFIG_FILE
304    if config_path.exists():
305        _logger.debug(f"Phenotype configuration files already exist")
306        return
307
308    _logger.info("Creating phen directory structure and config files")
309    for d in DEFAULT_PHEN_DIR_LIST:
310        _create_empty_git_dir(phen_path / d)
311
312    # create empty phen config file
313    config = {
314        "phenotype": {
315            "version": "0.0.0",
316            "omop": {
317                "vocabulary_id": "",
318                "vocabulary_name": "",
319                "vocabulary_reference": "",
320            },
321            "translate": [],
322            "concept_sets": [],
323        }
324    }
325
326    with open(phen_path / CONFIG_FILE, "w") as file:
327        yaml.dump(
328            config,
329            file,
330            Dumper=util.QuotedDumper,
331            default_flow_style=False,
332            sort_keys=False,
333            default_style='"',
334        )
335
336    # add git ignore
337    ignore_content = """# Ignore SQLite database files
338*.db
339*.sqlite3
340 
341# Ignore SQLite journal and metadata files
342*.db-journal
343*.sqlite3-journal
344
345# python
346.ipynb_checkpoints
347 """
348    ignore_path = phen_path / ".gitignore"
349    with open(ignore_path, "w") as file:
350        file.write(ignore_content)
351
352    # add to git repo and commit
353    for d in DEFAULT_PHEN_DIR_LIST:
354        repo.git.add(phen_path / d)
355    repo.git.add(all=True)
356    repo.index.commit("initialised the phen git repo.")
357
358    _logger.info(f"Phenotype initialised successfully")

Initialises a phenotype directory as a git repo with the standard structure
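
For illustration, a hedged usage sketch (the directory path and remote URL are hypothetical):

    from acmc import phen

    # initialise a new local phenotype repo with the standard structure
    phen.init("./workspace/phen", remote_url=None)

    # or clone an existing remote repo, initialising it if it has no commits
    phen.init("./workspace/phen", remote_url="https://example.com/org/phen.git")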

def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
361def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
362    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
363
364    Args:
365        phen_dir (str): local directory path where the upstream repo is to be cloned
366        upstream_url (str): url to the upstream repo
367        upstream_version (str): version in the upstream repo to clone
368        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
369
370    Raises:
371        ValueError: if the specified version is not in the upstream repo
372        ValueError: if the upstream repo is not a valid phenotype repo
373        ValueError: if there are any other problems with Git
374    """
375    _logger.info(
376        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
377    )
378
379    phen_path = Path(phen_dir)
380    # check if directory already exists and ask user if they want to recreate it
381    if (
382        phen_path.exists() and phen_path.is_dir()
383    ):  # Check if it exists and is a directory
384        configure = _check_delete_dir(
385            phen_path,
386            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
387        )
388    else:
389        configure = True
390
391    if not configure:
392        _logger.info(f"Exiting, phenotype not initiatised")
393        return
394
395    try:
396        # Clone repo
397        git_url = _construct_git_url(upstream_url)
398        repo = git.Repo.clone_from(git_url, phen_path)
399
400        # Fetch all branches and tags
401        repo.remotes.origin.fetch()
402
403        # Check if the version exists
404        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
405        if upstream_version not in available_refs:
406            raise ValueError(
407                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
408            )
409
410        # Checkout the specified version
411        repo.git.checkout(upstream_version)
412        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
413        main_branch.checkout()
414
415        # Check if 'config.yml' exists in the root directory
416        config_path = phen_path / CONFIG_FILE
417        if not os.path.isfile(config_path):
418            raise ValueError(
419                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
420            )
421
422        # Validate the phenotype is compatible with the acmc tool
423        validate(str(phen_path.resolve()))
424
425        # Delete each tag locally
426        tags = repo.tags
427        for tag in tags:
428            repo.delete_tag(tag)
429            _logger.debug(f"Deleted tags from forked repo: {tag}")
430
431        # Add upstream remote
432        repo.create_remote("upstream", upstream_url)
433        remote = repo.remotes["origin"]
434        repo.delete_remote(remote)  # Remove existing origin
435
436        # Optionally set a new origin remote
437        if new_origin_url:
438            git_url = _construct_git_url(new_origin_url)
439            repo.create_remote("origin", git_url)
440            repo.git.push("--set-upstream", "origin", "main")
441
442        _logger.info(f"Repository forked successfully at {phen_path}")
443        _logger.info(f"Upstream set to {upstream_url}")
444        if new_origin_url:
445            _logger.info(f"Origin set to {new_origin_url}")
446
447    except Exception as e:
448        if phen_path.exists():
449            shutil.rmtree(phen_path)
450        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version into a local directory, and optionally sets a new remote origin.

Arguments:
  • phen_dir (str): local directory path where the upstream repo is to be cloned
  • upstream_url (str): url to the upstream repo
  • upstream_version (str): version in the upstream repo to clone
  • new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
  • ValueError: if the specified version is not in the upstream repo
  • ValueError: if the upstream repo is not a valid phenotype repo
  • ValueError: if there are any other problems with Git
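
A usage sketch under hypothetical URLs and version tag:

    from acmc import phen

    # fork version 0.0.1 of an upstream phenotype into a local directory,
    # pointing "origin" at a new remote repo
    phen.fork(
        "./workspace/phen-fork",
        upstream_url="https://example.com/org/upstream-phen.git",
        upstream_version="0.0.1",
        new_origin_url="https://example.com/me/my-phen.git",
    )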
def validate(phen_dir: str):
453def validate(phen_dir: str):
454    """Validates the phenotype directory is a git repo with standard structure"""
455    _logger.info(f"Validating phenotype: {phen_dir}")
456    phen_path = Path(phen_dir)
457    if not phen_path.is_dir():
458        raise NotADirectoryError(
459            f"Error: '{str(phen_path.resolve())}' is not a directory"
460        )
461
462    config_path = phen_path / CONFIG_FILE
463    if not config_path.is_file():
464        raise FileNotFoundError(
465            f"Error: phen configuration file '{config_path}' does not exist."
466        )
467
468    concepts_path = phen_path / CONCEPTS_DIR
469    if not concepts_path.is_dir():
470        raise FileNotFoundError(
471            f"Error: source concepts directory {concepts_path} does not exist."
472        )
473
474    # Validate that the directory is a git repo
475    try:
476        git.Repo(phen_path)
477    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
478        raise Exception(f"Phen directory {phen_path} is not a git repo")
479
480    # Load configuration File
481    if config_path.suffix == ".yml":
482        try:
483            with config_path.open("r") as file:
484                phenotype = yaml.safe_load(file)
485
486            validator = Validator(CONFIG_SCHEMA)
487            if validator.validate(phenotype):
488                _logger.debug("YAML structure is valid.")
489            else:
490                _logger.error(f"YAML structure validation failed: {validator.errors}")
491                raise Exception(f"YAML structure validation failed: {validator.errors}")
492        except yaml.YAMLError as e:
493            _logger.error(f"YAML syntax error: {e}")
494            raise e
495    else:
496        raise Exception(
497            f"Unsupported configuration filetype: {str(config_path.resolve())}"
498        )
499
500    # initialise
501    validation_errors = []
502    phenotype = phenotype["phenotype"]
503    code_types = parse.CodeTypeParser().code_types
504
505    # check the version number is of the format n.n.n
506    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
507    if not match:
508        validation_errors.append(
509            f"Invalid version format in configuration file: {phenotype['version']}"
510        )
511
512    # create a list of all the concept set names defined in the concept set configuration
513    concept_set_names = []
514    for item in phenotype["concept_sets"]:
515        if item["name"] in concept_set_names:
516            validation_errors.append(
517                f"Duplicate concept set defined in concept sets {item['name'] }"
518            )
519        else:
520            concept_set_names.append(item["name"])
521
522    # check codes definition
523    for item in phenotype["concept_sets"]:
524        # check the concept code file exists
525        concept_code_file_path = concepts_path / item["file"]["path"]
526        if not concept_code_file_path.exists():
527            validation_errors.append(
528                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
529            )
530
531        # check the concept code file is not empty (elif avoids stat() on a missing file)
532        elif concept_code_file_path.stat().st_size == 0:
533            validation_errors.append(
534                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
535            )
536
537        # check code file type is supported
538        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
539            raise ValueError(
540                f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
541            )
542
543        # check columns specified are a supported medical coding type
544        for column in item["file"]["columns"]:
545            if column not in code_types:
546                validation_errors.append(
547                    f"Column type {column} for file {concept_code_file_path} is not supported"
548                )
549
550        # check the actions are supported
551        if "actions" in item["file"]:
552            for action in item["file"]["actions"]:
553                if action not in COL_ACTIONS:
554                    validation_errors.append(f"Action {action} is not supported")
555
556    if len(validation_errors) > 0:
557        _logger.error(validation_errors)
558        raise PhenValidationException(
559            f"Configuration file {str(config_path.resolve())} failed validation",
560            validation_errors,
561        )
562
563    _logger.info(f"Phenotype validated successfully")

Validates that the phenotype directory is a git repo with the standard structure
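
A minimal sketch, assuming a phenotype exists at the hypothetical path:

    from acmc import phen

    # raises NotADirectoryError/FileNotFoundError for structural problems,
    # and PhenValidationException when the configuration content is invalid
    phen.validate("./workspace/phen")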

def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str) -> pandas.core.frame.DataFrame:
657def translate_codes(
658    source_df: pd.DataFrame, target_code_type: str, concept_name: str
659) -> pd.DataFrame:
660    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
661
662    # codes = pd.DataFrame([], dtype=str)
663    codes = pd.DataFrame(
664        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
665    )
666    # Convert codes to target type
667    _logger.info(f"Converting to target code type {target_code_type}")
668
669    for source_code_type in source_df.columns:
670        # if the target code type is the same as the source code type, no translation is needed; just append the source as the target
671        if source_code_type == target_code_type:
672            copy_df = pd.DataFrame(
673                {
674                    "SOURCE_CONCEPT": source_df[source_code_type],
675                    "SOURCE_CONCEPT_TYPE": source_code_type,
676                    "CONCEPT": source_df[source_code_type],
677                }
678            )
679            codes = pd.concat([codes, copy_df])
680            _logger.debug(
681                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
682            )
683        else:
684            # get the translation filename using source to target code types
685            filename = f"{source_code_type}_to_{target_code_type}.parquet"
686            map_path = trud.PROCESSED_PATH / filename
687
688            # do the mapping if it exists
689            if map_path.exists():
690                # get mapping
691                df_map = pd.read_parquet(map_path)
692
693                # do mapping
694                translated_df = pd.merge(
695                    source_df[source_code_type], df_map, how="left"
696                )
697
698                # normalise the output
699                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
700                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
701
702                # add to list of codes
703                codes = pd.concat([codes, translated_df])
704
705            else:
706                _logger.warning(
707                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
708                )
709
710    codes = codes.dropna()  # delete NaNs
711
712    # add the concept set name to the output if there were any translations
713    if len(codes.index) > 0:
714        codes["CONCEPT_SET"] = concept_name
715    else:
716        _logger.debug(f"No codes converted with target code type {target_code_type}")
717
718    return codes

Translates each source code type in the source coding list into the target code type and returns all conversions as a single concept set
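
A hedged sketch, assuming the read2-to-snomed mapping file has been installed under trud.PROCESSED_PATH (codes and concept set name are hypothetical):

    import pandas as pd
    from acmc import phen

    # source coding list with one column per source code type
    source_df = pd.DataFrame({"read2": ["C10..", "C108."]})

    codes = phen.translate_codes(source_df, "snomed", "diabetes")
    # returns SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT and CONCEPT_SET columns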

def write_vocab_version(phen_path: pathlib.Path):
739def write_vocab_version(phen_path: Path):
740    # write the vocab version files
741
742    if not trud.VERSION_PATH.exists():
743        raise FileNotFoundError(
744            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
745        )
746
747    if not omop.VERSION_PATH.exists():
748        raise FileNotFoundError(
749            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
750        )
751
752    with trud.VERSION_PATH.open("r") as file:
753        trud_version = yaml.safe_load(file)
754
755    with omop.VERSION_PATH.open("r") as file:
756        omop_version = yaml.safe_load(file)
757
758    # Create the combined YAML structure
759    version_data = {
760        "versions": {
761            "acmc": acmc.__version__,
762            "trud": trud_version,
763            "omop": omop_version,
764        }
765    }
766
767    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
768        yaml.dump(
769            version_data,
770            file,
771            Dumper=util.QuotedDumper,
772            default_flow_style=False,
773            sort_keys=False,
774            default_style='"',
775        )
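
A usage sketch, assuming TRUD and OMOP are installed so that their version files exist:

    from pathlib import Path
    from acmc import phen

    # records the acmc, TRUD and OMOP versions into vocab_version.yml
    phen.write_vocab_version(Path("./workspace/phen"))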
def map(phen_dir: str, target_code_type: str):
778def map(phen_dir: str, target_code_type: str):
779    _logger.info(f"Processing phenotype: {phen_dir}")
780
781    # Validate configuration
782    validate(phen_dir)
783
784    # initialise paths
785    phen_path = Path(phen_dir)
786    config_path = phen_path / CONFIG_FILE
787
788    # load configuration
789    with config_path.open("r") as file:
790        config = yaml.safe_load(file)
791    phenotype = config["phenotype"]
792
793    if len(phenotype["map"]) == 0:
794        raise ValueError(f"No map codes defined in the phenotype configuration")
795
796    if target_code_type is not None and target_code_type not in phenotype["map"]:
797        raise ValueError(
798            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
799        )
800
801    if target_code_type is not None:
802        _map_target_code_type(phen_path, phenotype, target_code_type)
803    else:
804        for t in phenotype["map"]:
805            _map_target_code_type(phen_path, phenotype, t)
806
807    _logger.info(f"Phenotype processed successfully")
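
A usage sketch with a hypothetical phenotype directory:

    from acmc import phen

    # map every concept set to one target code type; passing None instead
    # maps to every type listed under "map" in config.yml
    phen.map("./workspace/phen", "snomed")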
def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
1002def publish(
1003    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1004):
1005    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1006
1007    # Validate config
1008    validate(phen_dir)
1009    phen_path = Path(phen_dir)
1010
1011    # load git repo and set the branch
1012    repo = git.Repo(phen_path)
1013    if DEFAULT_GIT_BRANCH in repo.branches:
1014        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1015        main_branch.checkout()
1016    else:
1017        raise AttributeError(
1018            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1019        )
1020
1021    # check if any changes to publish
1022    if not repo.is_dirty() and not repo.untracked_files:
1023        if remote_url is not None and "origin" not in repo.remotes:
1024            _logger.info(f"First publish to remote url {remote_url}")
1025        else:
1026            _logger.info("Nothing to publish, no changes to the repo")
1027            return
1028
1029    # get next version
1030    new_version_str = _generate_version_tag(repo, increment)
1031    _logger.info(f"New version: {new_version_str}")
1032
1033    # Write version in configuration file
1034    config_path = phen_path / CONFIG_FILE
1035    with config_path.open("r") as file:
1036        config = yaml.safe_load(file)
1037
1038    config["phenotype"]["version"] = new_version_str
1039    with open(config_path, "w") as file:
1040        yaml.dump(
1041            config,
1042            file,
1043            Dumper=util.QuotedDumper,
1044            default_flow_style=False,
1045            sort_keys=False,
1046            default_style='"',
1047        )
1048
1049    # Add and commit changes to repo including version updates
1050    commit_message = f"Committing updates to phenotype {phen_path}"
1051    repo.git.add("--all")
1052    repo.index.commit(commit_message)
1053
1054    # Add tag to the repo
1055    repo.create_tag(new_version_str)
1056
1057    # push to origin if a remote repo
1058    if remote_url is not None and "origin" not in repo.remotes:
1059        git_url = _construct_git_url(remote_url)
1060        repo.create_remote("origin", git_url)
1061
1062    try:
1063        if "origin" in repo.remotes:
1064            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1065            origin = repo.remotes.origin
1066            _logger.info(f"Pushing main branch to remote repo")
1067            repo.git.push("--set-upstream", "origin", "main")
1068            _logger.info(f"Pushing version tags to remote git repo")
1069            origin.push(tags=True)
1070            _logger.debug("Changes pushed to 'origin'")
1071        else:
1072            _logger.debug("Remote 'origin' is not set")
1073    except Exception as e:
1074        tag_ref = repo.tags[new_version_str]
1075        repo.delete_tag(tag_ref)
1076        repo.git.reset("--soft", "HEAD~1")
1077        raise e
1078
1079    _logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory
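
A usage sketch (message and path are hypothetical); remote_url=None keeps the publish local:

    from acmc import phen

    # commits all changes, bumps the patch version and tags the repo
    phen.publish("./workspace/phen", "Updated concept sets", None)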

def export(phen_dir: str, version: str):
1082def export(phen_dir: str, version: str):
1083    """Exports a phen repo at a specific tagged version into a target directory"""
1084    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1085
1086    # validate configuration
1087    validate(phen_dir)
1088    phen_path = Path(phen_dir)
1089
1090    # load configuration
1091    config_path = phen_path / CONFIG_FILE
1092    with config_path.open("r") as file:
1093        config = yaml.safe_load(file)
1094
1095    map_path = phen_path / MAP_DIR
1096    if not map_path.exists():
1097        _logger.warning(f"Map path does not exist '{map_path}'")
1098
1099    export_path = phen_path / OMOP_PATH
1100    # check export directory exists and if not create it
1101    if not export_path.exists():
1102        export_path.mkdir(parents=True)
1103        _logger.debug(f"OMOP export directory '{export_path}' created.")
1104
1105    # omop export db
1106    export_db_path = omop.export(
1107        map_path,
1108        export_path,
1109        config["phenotype"]["version"],
1110        config["phenotype"]["omop"],
1111    )
1112
1113    # write to tables
1114    # export as csv
1115    _logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into the phenotype's OMOP export directory
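
A usage sketch with hypothetical arguments, assuming the phenotype has already been mapped:

    from acmc import phen

    # writes the OMOP export database into concept-sets/omop
    phen.export("./workspace/phen", "0.0.1")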

def copy(phen_dir: str, target_dir: str, version: str):
1118def copy(phen_dir: str, target_dir: str, version: str):
1119    """Copys a phen repo at a specific tagged version into a target directory"""
1120
1121    # Validate
1122    validate(phen_dir)
1123    phen_path = Path(phen_dir)
1124
1125    # Check target directory exists
1126    target_path = Path(target_dir)
1127    if not target_path.exists():
1128        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1129
1130    # Set copy directory
1131    copy_path = target_path / version
1132    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1133
1134    if (
1135        copy_path.exists() and copy_path.is_dir()
1136    ):  # Check if it exists and is a directory
1137        copy = _check_delete_dir(
1138            copy_path,
1139            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1140        )
1141    else:
1142        copy = True
1143
1144    if not copy:
1145        _logger.info(f"Not copying the version {version}")
1146        return
1147
1148    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1149    repo = git.Repo.clone_from(phen_path, copy_path)
1150
1151    # Check out the latest commit or specified version
1152    if version:
1153        # Checkout a specific version (e.g., branch, tag, or commit hash)
1154        _logger.info(f"Checking out version {version}...")
1155        repo.git.checkout(version)
1156    else:
1157        # Checkout the latest commit (HEAD)
1158        _logger.info(f"Checking out the latest commit...")
1159        repo.git.checkout("HEAD")
1160
1161    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1162
1163    _logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory
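
A usage sketch with hypothetical paths and version tag:

    from acmc import phen

    # clones the repo into <target_dir>/<version> and checks out that tag
    phen.copy("./workspace/phen", "./exports", "0.0.1")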

def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1167def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1168    """Extracts concepts as {name: file_path} dictionary and a name set."""
1169    concepts_dict = {
1170        item["name"]: item["file"]["path"]
1171        for item in config_data["phenotype"]["concept_sets"]
1172    }
1173    name_set = set(concepts_dict.keys())
1174    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.
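
A minimal sketch on a hypothetical configuration dictionary:

    from acmc import phen

    config = {"phenotype": {"concept_sets": [
        {"name": "diabetes", "file": {"path": "diabetes.csv"}},
        {"name": "asthma", "file": {"path": "asthma.csv"}},
    ]}}
    concepts, names = phen.extract_concepts(config)
    # concepts == {"diabetes": "diabetes.csv", "asthma": "asthma.csv"}
    # names == {"diabetes", "asthma"}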

def diff_config(old_config: dict, new_config: dict) -> str:
1188def diff_config(old_config: dict, new_config: dict) -> str:
1189    report = f"\n# Changes to phenotype configuration\n"
1190    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1191
1192    old_concepts, old_names = extract_concepts(old_config)
1193    new_concepts, new_names = extract_concepts(new_config)
1194
1195    # Check added and removed names
1196    added_names = new_names - old_names  # Names that appear in new but not in old
1197    removed_names = old_names - new_names  # Names that were in old but not in new
1198
1199    # find file path changes for unchanged names
1200    unchanged_names = old_names & new_names  # Names that exist in both
1201    file_diff = DeepDiff(
1202        {name: old_concepts[name] for name in unchanged_names},
1203        {name: new_concepts[name] for name in unchanged_names},
1204    )
1205
1206    # Find renamed concepts (same file, different name)
1207    renamed_concepts = []
1208    for removed in removed_names:
1209        old_path = old_concepts[removed]
1210        for added in added_names:
1211            new_path = new_concepts[added]
1212            if old_path == new_path:
1213                renamed_concepts.append((removed, added))
1214
1215    # Remove renamed concepts from added and removed sets
1216    for old_name, new_name in renamed_concepts:
1217        added_names.discard(new_name)
1218        removed_names.discard(old_name)
1219
1220    # generate config report
1221    if added_names:
1222        report += "## Added Concepts\n"
1223        for name in added_names:
1224            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1225        report += "\n"
1226
1227    if removed_names:
1228        report += "## Removed Concepts\n"
1229        for name in removed_names:
1230            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1231        report += "\n"
1232
1233    if renamed_concepts:
1234        report += "## Renamed Concepts\n"
1235        for old_name, new_name in renamed_concepts:
1236            report += (
1237                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1238            )
1239        report += "\n"
1240
1241    if "values_changed" in file_diff:
1242        report += "## Updated File Paths\n"
1243        for name, change in file_diff["values_changed"].items():
1244            old_file = change["old_value"]
1245            new_file = change["new_value"]
1246            clean_name = name.split("root['")[1].split("']")[0]
1247            report += (
1248                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1249            )
1250        report += "\n"
1251
1252    if not (
1253        added_names
1254        or removed_names
1255        or renamed_concepts
1256        or file_diff.get("values_changed")
1257    ):
1258        report += "No changes in concept sets.\n"
1259
1260    return report
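
A minimal sketch on hypothetical configuration dictionaries; because both entries share a file path, the report records a rename:

    from acmc import phen

    old_config = {"phenotype": {"concept_sets": [
        {"name": "diabetes", "file": {"path": "diabetes.csv"}}]}}
    new_config = {"phenotype": {"concept_sets": [
        {"name": "diabetes_mellitus", "file": {"path": "diabetes.csv"}}]}}
    print(phen.diff_config(old_config, new_config))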
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1263def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1264    old_output_files = [
1265        file.name
1266        for file in old_map_path.iterdir()
1267        if file.is_file() and not file.name.startswith(".")
1268    ]
1269    new_output_files = [
1270        file.name
1271        for file in new_map_path.iterdir()
1272        if file.is_file() and not file.name.startswith(".")
1273    ]
1274
1275    # Convert the lists to sets for easy comparison
1276    old_output_set = set(old_output_files)
1277    new_output_set = set(new_output_files)
1278
1279    # Outputs that are in old_output_set but not in new_output_set (removed files)
1280    removed_outputs = old_output_set - new_output_set
1281    # Outputs that are in new_output_set but not in old_output_set (added files)
1282    added_outputs = new_output_set - old_output_set
1283    # Outputs that are the intersection of old_output_set and new_output_set
1284    common_outputs = old_output_set & new_output_set
1285
1286    report = f"\n# Changes to available translations\n"
1287    report += f"This compares the coding translations files available.\n\n"
1288    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1289    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1290    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1291
1292    # Compare common outputs between versions
1293    report += f"# Changes to concepts in translation files\n\n"
1294    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1295    for file in common_outputs:
1296        old_output = old_map_path / file
1297        new_output = new_map_path / file
1298
1299        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
1300        _logger.debug(f"New ouptput: {str(new_output.resolve())}")
1301
1302        df1 = pd.read_csv(old_output)
1303        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1304        df2 = pd.read_csv(new_output)
1305        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1306
1307        # Check for added and removed concepts
1308        report += f"- File {file}\n"
1309        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1310        report += f"- Removed concepts {sorted_list}\n"
1311        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1312        report += f"- Added concepts {sorted_list}\n"
1313
1314        # Check for changed concepts
1315        diff = df2 - df1  # diff in counts
1316        diff = diff[
1317            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1318        ]  # get non-zero counts
1319        s = "\n"
1320        if len(diff.index) > 0:
1321            for concept, row in diff.iterrows():
1322                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1323            report += f"- Changed concepts {s}\n\n"
1324        else:
1325            report += f"- Changed concepts []\n\n"
1326
1327    return report
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path):
1330def diff_phen(
1331    new_phen_path: Path,
1332    new_version: str,
1333    old_phen_path: Path,
1334    old_version: str,
1335    report_path: Path,
1336):
1337    """Compare the differences between two versions of a phenotype"""
1338
1339    # validate phenotypes
1340    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1341    validate(str(old_phen_path.resolve()))
1342    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1343    validate(str(new_phen_path.resolve()))
1344
1345    # get old and new config
1346    old_config_path = old_phen_path / CONFIG_FILE
1347    with old_config_path.open("r") as file:
1348        old_config = yaml.safe_load(file)
1349    new_config_path = new_phen_path / CONFIG_FILE
1350    with new_config_path.open("r") as file:
1351        new_config = yaml.safe_load(file)
1352
1353    # write report heading
1354    report = f"# Phenotype Comparison Report\n"
1355    report += f"## Original phenotype\n"
1356    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1357    report += f"  - {old_version}\n"
1358    report += f"  - {str(old_phen_path.resolve())}\n"
1359    report += f"## Changed phenotype:\n"
1360    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1361    report += f"  - {new_version}\n"
1362    report += f"  - {str(new_phen_path.resolve())}\n"
1363
1364    # Step 1: check differences in configuration files
1365    # Convert list of dicts into a dict: {name: file}
1366    report += diff_config(old_config, new_config)
1367
1368    # Step 2: check differences between map files
1369    # List files from output directories
1370    old_map_path = old_phen_path / MAP_DIR
1371    new_map_path = new_phen_path / MAP_DIR
1372    report += diff_map_files(old_map_path, new_map_path)
1373
1374    # write the report file
1375    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1376    report_file = open(report_path, "w")
1377    report_file.write(report)
1378    report_file.close()
1379
1380    _logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1383def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1384    # make tmp directory .acmc
1385    timestamp = time.strftime("%Y%m%d_%H%M%S")
1386    temp_dir = Path(f".acmc/diff_{timestamp}")
1387
1388    changed_phen_path = Path(phen_dir)
1389    if not changed_phen_path.exists():
1390        raise ValueError(
1391            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1392        )
1393
1394    old_phen_path = Path(old_phen_dir)
1395    if not old_phen_path.exists():
1396        raise ValueError(
1397            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1398        )
1399
1400    try:
1401        # Create the directory
1402        temp_dir.mkdir(parents=True, exist_ok=True)
1403        _logger.debug(f"Temporary directory created: {temp_dir}")
1404
1405        # Create temporary directories
1406        changed_path = temp_dir / "changed"
1407        changed_path.mkdir(parents=True, exist_ok=True)
1408        old_path = temp_dir / "old"
1409        old_path.mkdir(parents=True, exist_ok=True)
1410
1411        # checkout changed
1412        if version == "latest":
1413            _logger.debug(
1414                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1415            )
1416            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1417        else:
1418            _logger.debug(
1419                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1420            )
1421            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1422            changed_repo.git.checkout(version)
1423
1424        # checkout old
1425        if old_version == "latest":
1426            _logger.debug(
1427                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1428            )
1429            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1430        else:
1431            _logger.debug(
1432                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1433            )
1434            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1435            old_repo.git.checkout(old_version)
1436
1437        report_filename = f"{version}_{old_version}_diff.md"
1438        report_path = changed_phen_path / report_filename
1439        # diff old with new
1440        diff_phen(changed_path, version, old_path, old_version, report_path)
1441
1442    finally:
1443        # clean up tmp directory
1444        if temp_dir.exists():
1445            shutil.rmtree(temp_dir)
1446            _logger.debug(f"Temporary directory removed: {temp_dir}")
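
A usage sketch comparing the working copy of a phenotype against one of its tagged versions (paths and tag hypothetical); the report is written to <version>_<old_version>_diff.md inside phen_dir:

    from acmc import phen

    phen.diff("./workspace/phen", "latest", "./workspace/phen", "0.0.1")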