acmc.phen

phenotype.py module

This module provides functionality for managing phenotypes: initialising phenotype repositories, validating their configuration, mapping source concept codes to target coding vocabularies, publishing versions, and comparing phenotype versions.
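A minimal usage sketch (the paths and target code type are illustrative, and mapping assumes the required TRUD/OMOP vocabularies have already been installed):

    from acmc import phen

    # create a new local phenotype repo with the standard structure
    phen.init(phen_dir="./workspace/phen", remote_url=None)

    # check the repo structure and config.yml against the schema
    phen.validate(phen_dir="./workspace/phen")

    # translate the source concept codes to a target coding vocabulary
    phen.map(phen_dir="./workspace/phen", target_code_type="read2")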

   1"""
   2phenotype.py module
   3
   4This module provides functionality for managing phenotypes: initialising phenotype repositories, validating their configuration, mapping source concept codes to target coding vocabularies, publishing versions, and comparing phenotype versions.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator  # type: ignore
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28import acmc
  29from acmc import trud, omop, parse, util, logging_config as lc
  30
  31# set up logging
  32_logger = lc.setup_logger()
  33
  34pd.set_option("mode.chained_assignment", None)
  35
  36PHEN_DIR = "phen"
  37"""Default phenotype directory name"""
  38
  39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
  40"""Default phenotype directory path"""
  41
  42CONCEPTS_DIR = "concepts"
  43"""Default concepts directory name"""
  44
  45MAP_DIR = "map"
  46"""Default map directory name"""
  47
  48CONCEPT_SET_DIR = "concept-sets"
  49"""Default concept set directory name"""
  50
  51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
  52"""Default CSV concept set directory path"""
  53
  54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
  55"""Default OMOP concept set directory path"""
  56
  57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
  58"""List of default phenotype directories"""
  59
  60CONFIG_FILE = "config.yml"
  61"""Default configuration filename"""
  62
  63VOCAB_VERSION_FILE = "vocab_version.yml"
  64"""Default vocabulary version filename"""
  65
  66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
  67"""List of semantic version increment types"""
  68
  69DEFAULT_VERSION_INC = "patch"
  70"""Default semantic version increment type"""
  71
  72DEFAULT_GIT_BRANCH = "main"
  73"""Default phenotype repo branch name"""
  74
  75SPLIT_COL_ACTION = "split_col"
  76"""Split column preprocessing action type"""
  77
  78CODES_COL_ACTION = "codes_col"
  79"""Codes column preprocessing action type"""
  80
  81DIVIDE_COL_ACTION = "divide_col"
  82"""Divide column preprocessing action type"""
  83
  84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
  85"""List of column preprocessing action types"""
  86
  87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
  88"""List of supported source concept coding list file types"""
  89
  90# config.yaml schema
  91CONFIG_SCHEMA = {
  92    "phenotype": {
  93        "type": "dict",
  94        "required": True,
  95        "schema": {
  96            "version": {
  97                "type": "string",
  98                "required": True,
  99                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
 100            },
 101            "omop": {
 102                "type": "dict",
 103                "required": True,
 104                "schema": {
 105                    "vocabulary_id": {"type": "string", "required": True},
 106                    "vocabulary_name": {"type": "string", "required": True},
 107                    "vocabulary_reference": {
 108                        "type": "string",
 109                        "required": True,
 110                        "regex": r"^https?://.*",  # Ensures it's a URL
 111                    },
 112                },
 113            },
 114            "map": {
 115                "type": "list",
 116                "schema": {
 117                    "type": "string",
 118                    "allowed": list(
 119                        parse.SUPPORTED_CODE_TYPES
 120                    ),  # Ensure only predefined values are allowed
 121                },
 122            },
 123            "concept_sets": {
 124                "type": "list",
 125                "required": True,
 126                "schema": {
 127                    "type": "dict",
 128                    "schema": {
 129                        "name": {"type": "string", "required": True},
 130                        "file": {
 131                            "type": "dict",
 132                            "required": False,
 133                            "schema": {
 134                                "path": {"type": "string", "required": True},
 135                                "columns": {"type": "dict", "required": True},
 136                                "category": {
 137                                    "type": "string"
 138                                },  # Optional but must be string if present
 139                                "actions": {
 140                                    "type": "dict",
 141                                    "schema": {"divide_col": {"type": "string"}},
 142                                },
 143                            },
 144                        },
 145                    },
 146                },
 147            },
 148        },
 149    }
 150}
 151"""Phenotype config.yml schema definition"""
 152
 153#                        "metadata": {"type": "dict", "required": True},
 154
 155
 156class PhenValidationException(Exception):
 157    """Custom exception class raised when validation errors in phenotype configuration file"""
 158
 159    def __init__(self, message, validation_errors=None):
 160        super().__init__(message)
 161        self.validation_errors = validation_errors
 162
 163
 164def _construct_git_url(remote_url: str):
 165    """Constructs a git url for github or gitlab including a PAT token environment variable"""
 166    # check the url
 167    parsed_url = urlparse(remote_url)
 168
  169    # if the URL is on github.com use GitHub auth, otherwise assume it's gitlab;
  170    # supporting other hosts such as codeberg would require updating this function if the URL scheme differs.
 171    if "github.com" in parsed_url.netloc:
 172        # get GitHub PAT from environment variable
 173        auth = os.getenv("ACMC_GITHUB_PAT")
 174        if not auth:
 175            raise ValueError(
 176                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 177            )
 178    else:
 179        # get GitLab PAT from environment variable
 180        auth = os.getenv("ACMC_GITLAB_PAT")
 181        if not auth:
 182            raise ValueError(
 183                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 184            )
 185        auth = f"oauth2:{auth}"
 186
 187    # Construct the new URL with credentials
 188    new_netloc = f"{auth}@{parsed_url.netloc}"
 189    return urlunparse(
 190        (
 191            parsed_url.scheme,
 192            new_netloc,
 193            parsed_url.path,
 194            parsed_url.params,
 195            parsed_url.query,
 196            parsed_url.fragment,
 197        )
 198    )
 199
 200
 201def _create_empty_git_dir(path: Path):
 202    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 203    path.mkdir(exist_ok=True)
 204    keep_path = path / ".gitkeep"
 205    keep_path.touch(exist_ok=True)
 206
 207
 208def _check_delete_dir(path: Path, msg: str) -> bool:
 209    """Checks on the command line if a user wants to delete a directory
 210
 211    Args:
 212        path (Path): path of the directory to be deleted
 213        msg (str): message to be displayed to the user
 214
 215    Returns:
 216        Boolean: True if deleted
 217    """
 218    deleted = False
 219
 220    user_input = input(f"{msg}").strip().lower()
 221    if user_input in ["yes", "y"]:
 222        shutil.rmtree(path)
 223        deleted = True
 224    else:
 225        _logger.info("Directory was not deleted.")
 226
 227    return deleted
 228
 229
 230def init(phen_dir: str, remote_url: str):
 231    """Initial phenotype directory as git repo with standard structure"""
 232    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 233    phen_path = Path(phen_dir)
 234
 235    # check if directory already exists and ask user if they want to recreate it
 236    if (
 237        phen_path.exists() and phen_path.is_dir()
 238    ):  # Check if it exists and is a directory
 239        configure = _check_delete_dir(
 240            phen_path,
 241            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 242        )
 243    else:
 244        configure = True
 245
 246    if not configure:
  247        _logger.info("Exiting, phenotype not initialised")
 248        return
 249
 250    # Initialise repo from local or remote
 251    repo: Repo
 252
 253    # if remote then clone the repo otherwise init a local repo
  254    if remote_url is not None:
 255        # add PAT token to the URL
 256        git_url = _construct_git_url(remote_url)
 257
 258        # clone the repo
 259        git_cmd = git.cmd.Git()
 260        git_cmd.clone(git_url, phen_path)
 261
 262        # open repo
 263        repo = Repo(phen_path)
 264        # check if there are any commits (new repo has no commits)
 265        if (
 266            len(repo.branches) == 0 or repo.head.is_detached
 267        ):  # Handle detached HEAD (e.g., after init)
 268            _logger.debug("The phen repository has no commits yet.")
 269            commit_count = 0
 270        else:
 271            # Get the total number of commits in the default branch
 272            commit_count = sum(1 for _ in repo.iter_commits())
 273            _logger.debug(f"Repo has previous commits: {commit_count}")
 274    else:
 275        # local repo, create the directories and init
 276        phen_path.mkdir(parents=True, exist_ok=True)
 277        _logger.debug(f"Phen directory '{phen_path}' has been created.")
 278        repo = git.Repo.init(phen_path)
 279        commit_count = 0
 280
 281    phen_path = phen_path.resolve()
 282    # initialise empty repos
 283    if commit_count == 0:
 284        # create initial commit
 285        initial_file_path = phen_path / "README.md"
 286        with open(initial_file_path, "w") as file:
 287            file.write(
 288                "# Initial commit\nThis is the first commit in the phen repository.\n"
 289            )
 290        repo.index.add([initial_file_path])
 291        repo.index.commit("Initial commit")
 292        commit_count = 1
 293
  294    # Checkout the phen's default branch, creating it if it does not exist
 295    if DEFAULT_GIT_BRANCH in repo.branches:
 296        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 297        main_branch.checkout()
 298    else:
 299        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 300        main_branch.checkout()
 301
  302    # if the phen path does not contain the config file then initialise the phenotype
 303    config_path = phen_path / CONFIG_FILE
 304    if config_path.exists():
 305        _logger.debug(f"Phenotype configuration files already exist")
 306        return
 307
 308    _logger.info("Creating phen directory structure and config files")
 309    for d in DEFAULT_PHEN_DIR_LIST:
 310        _create_empty_git_dir(phen_path / d)
 311
 312    # create empty phen config file
 313    config = {
 314        "phenotype": {
 315            "version": "0.0.0",
 316            "omop": {
 317                "vocabulary_id": "",
 318                "vocabulary_name": "",
 319                "vocabulary_reference": "",
 320            },
 321            "translate": [],
 322            "concept_sets": [],
 323        }
 324    }
 325
 326    with open(phen_path / CONFIG_FILE, "w") as file:
 327        yaml.dump(
 328            config,
 329            file,
 330            Dumper=util.QuotedDumper,
 331            default_flow_style=False,
 332            sort_keys=False,
 333            default_style='"',
 334        )
 335
 336    # add git ignore
 337    ignore_content = """# Ignore SQLite database files
 338*.db
 339*.sqlite3
 340 
 341# Ignore SQLite journal and metadata files
 342*.db-journal
 343*.sqlite3-journal
 344
 345# python
 346.ipynb_checkpoints
 347 """
 348    ignore_path = phen_path / ".gitignore"
 349    with open(ignore_path, "w") as file:
 350        file.write(ignore_content)
 351
 352    # add to git repo and commit
 353    for d in DEFAULT_PHEN_DIR_LIST:
 354        repo.git.add(phen_path / d)
 355    repo.git.add(all=True)
 356    repo.index.commit("initialised the phen git repo.")
 357
 358    _logger.info(f"Phenotype initialised successfully")
 359
 360
 361def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
 362    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
 363
 364    Args:
 365        phen_dir (str): local directory path where the upstream repo is to be cloned
 366        upstream_url (str): url to the upstream repo
 367        upstream_version (str): version in the upstream repo to clone
 368        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 369
 370    Raises:
 371        ValueError: if the specified version is not in the upstream repo
 372        ValueError: if the upstream repo is not a valid phenotype repo
 373        ValueError: if there's any other problems with Git
 374    """
 375    _logger.info(
 376        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 377    )
 378
 379    phen_path = Path(phen_dir)
 380    # check if directory already exists and ask user if they want to recreate it
 381    if (
 382        phen_path.exists() and phen_path.is_dir()
 383    ):  # Check if it exists and is a directory
 384        configure = _check_delete_dir(
 385            phen_path,
 386            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 387        )
 388    else:
 389        configure = True
 390
 391    if not configure:
  392        _logger.info("Exiting, phenotype not initialised")
 393        return
 394
 395    try:
 396        # Clone repo
 397        git_url = _construct_git_url(upstream_url)
 398        repo = git.Repo.clone_from(git_url, phen_path)
 399
 400        # Fetch all branches and tags
 401        repo.remotes.origin.fetch()
 402
 403        # Check if the version exists
 404        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 405        if upstream_version not in available_refs:
 406            raise ValueError(
 407                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 408            )
 409
 410        # Checkout the specified version
 411        repo.git.checkout(upstream_version)
 412        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 413        main_branch.checkout()
 414
 415        # Check if 'config.yml' exists in the root directory
 416        config_path = phen_path / "config.yml"
 417        if not os.path.isfile(config_path):
 418            raise ValueError(
 419                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
 420            )
 421
 422        # Validate the phenotype is compatible with the acmc tool
 423        validate(str(phen_path.resolve()))
 424
 425        # Delete each tag locally
 426        tags = repo.tags
 427        for tag in tags:
 428            repo.delete_tag(tag)
 429            _logger.debug(f"Deleted tags from forked repo: {tag}")
 430
 431        # Add upstream remote
 432        repo.create_remote("upstream", upstream_url)
 433        remote = repo.remotes["origin"]
 434        repo.delete_remote(remote)  # Remove existing origin
 435
 436        # Optionally set a new origin remote
 437        if new_origin_url:
 438            git_url = _construct_git_url(new_origin_url)
 439            repo.create_remote("origin", git_url)
 440            repo.git.push("--set-upstream", "origin", "main")
 441
 442        _logger.info(f"Repository forked successfully at {phen_path}")
 443        _logger.info(f"Upstream set to {upstream_url}")
 444        if new_origin_url:
 445            _logger.info(f"Origin set to {new_origin_url}")
 446
 447    except Exception as e:
 448        if phen_path.exists():
 449            shutil.rmtree(phen_path)
 450        raise ValueError(f"Error occurred during repository fork: {str(e)}")
 451
 452
 453def validate(phen_dir: str):
 454    """Validates the phenotype directory is a git repo with standard structure"""
 455    _logger.info(f"Validating phenotype: {phen_dir}")
 456    phen_path = Path(phen_dir)
 457    if not phen_path.is_dir():
 458        raise NotADirectoryError(
 459            f"Error: '{str(phen_path.resolve())}' is not a directory"
 460        )
 461
 462    config_path = phen_path / CONFIG_FILE
 463    if not config_path.is_file():
 464        raise FileNotFoundError(
 465            f"Error: phen configuration file '{config_path}' does not exist."
 466        )
 467
 468    concepts_path = phen_path / CONCEPTS_DIR
 469    if not concepts_path.is_dir():
 470        raise FileNotFoundError(
 471            f"Error: source concepts directory {concepts_path} does not exist."
 472        )
 473
  474    # Validate that the directory is a git repo
 475    try:
 476        git.Repo(phen_path)
 477    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 478        raise Exception(f"Phen directory {phen_path} is not a git repo")
 479
 480    # Load configuration File
 481    if config_path.suffix == ".yml":
 482        try:
 483            with config_path.open("r") as file:
 484                phenotype = yaml.safe_load(file)
 485
 486            validator = Validator(CONFIG_SCHEMA)
 487            if validator.validate(phenotype):
 488                _logger.debug("YAML structure is valid.")
 489            else:
 490                _logger.error(f"YAML structure validation failed: {validator.errors}")
 491                raise Exception(f"YAML structure validation failed: {validator.errors}")
 492        except yaml.YAMLError as e:
 493            _logger.error(f"YAML syntax error: {e}")
 494            raise e
 495    else:
 496        raise Exception(
 497            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 498        )
 499
  500    # initialise
 501    validation_errors = []
 502    phenotype = phenotype["phenotype"]
 503    code_types = parse.CodeTypeParser().code_types
 504
  505    # check the version number is of the format n.n.n
 506    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 507    if not match:
 508        validation_errors.append(
 509            f"Invalid version format in configuration file: {phenotype['version']}"
 510        )
 511
 512    # create a list of all the concept set names defined in the concept set configuration
 513    concept_set_names = []
 514    for item in phenotype["concept_sets"]:
 515        if item["name"] in concept_set_names:
 516            validation_errors.append(
 517                f"Duplicate concept set defined in concept sets {item['name'] }"
 518            )
 519        else:
 520            concept_set_names.append(item["name"])
 521
 522    # check codes definition
 523    for item in phenotype["concept_sets"]:
  524        # check the concept code file exists
 525        concept_code_file_path = concepts_path / item["file"]["path"]
 526        if not concept_code_file_path.exists():
 527            validation_errors.append(
 528                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 529            )
 530
  531        # check the concept code file is not empty (only if it exists)
  532        elif concept_code_file_path.stat().st_size == 0:
 533            validation_errors.append(
 534                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 535            )
 536
 537        # check code file type is supported
 538        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 539            raise ValueError(
 540                f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
 541            )
 542
 543        # check columns specified are a supported medical coding type
 544        for column in item["file"]["columns"]:
 545            if column not in code_types:
 546                validation_errors.append(
 547                    f"Column type {column} for file {concept_code_file_path} is not supported"
 548                )
 549
 550        # check the actions are supported
 551        if "actions" in item["file"]:
 552            for action in item["file"]["actions"]:
 553                if action not in COL_ACTIONS:
 554                    validation_errors.append(f"Action {action} is not supported")
 555
 556    if len(validation_errors) > 0:
 557        _logger.error(validation_errors)
 558        raise PhenValidationException(
 559            f"Configuration file {str(config_path.resolve())} failed validation",
 560            validation_errors,
 561        )
 562
 563    _logger.info(f"Phenotype validated successfully")
 564
 565
 566def _read_table_file(path: Path, excel_sheet: str = ""):
 567    """
  568    Load a code list file (csv, xlsx/xls or dta) into a DataFrame of strings
 569    """
 570
 571    path = path.resolve()
 572    if path.suffix == ".csv":
 573        df = pd.read_csv(path, dtype=str)
 574    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 575        if excel_sheet != "":
 576            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 577        else:
 578            df = pd.read_excel(path, dtype=str)
 579    elif path.suffix == ".dta":
 580        df = pd.read_stata(path)
 581    else:
 582        raise ValueError(
 583            f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types"
 584        )
 585
 586    return df
 587
 588
 589def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
 590    # Perform Structural Changes to file before preprocessing
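    # For illustration (hypothetical data): with actions {"split_col": "type",
    # "codes_col": "code"} and rows [("read2", "C10.."), ("snomed", "44054006")],
    # the one-hot encoding below adds columns "read2" and "snomed" that hold each
    # row's code where its "type" value matched, and NaN elsewhere.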
 591    _logger.debug("Processing file structural actions")
 592    if (
 593        "actions" in concept_set["file"]
 594        and "split_col" in concept_set["file"]["actions"]
 595        and "codes_col" in concept_set["file"]["actions"]
 596    ):
 597        split_col = concept_set["file"]["actions"]["split_col"]
 598        codes_col = concept_set["file"]["actions"]["codes_col"]
  599        _logger.debug(
  600            "Action: Splitting %s column into: %s",
  601            split_col,
  602            df[split_col].unique(),
  603        )
 605        codes = df[codes_col]
 606        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
 607        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
 608        oh[oh == False] = np.nan  # replace 0s with None
 609        df = pd.concat([df, oh], axis=1)  # merge in new columns
 610
 611    return df
 612
 613
 614def _preprocess_source_concepts(
 615    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 616) -> Tuple[pd.DataFrame, list]:
 617    """Perform QA Checks on columns individually and append to df"""
 618    out = pd.DataFrame([])  # create output df to append to
 619    code_errors = []  # list of errors from processing
 620
 621    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 622    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 623
 624    # Preprocess codes
 625    code_types = parse.CodeTypeParser().code_types
 626    for code_type in concept_set["file"]["columns"]:
 627        parser = code_types[code_type]
 628        _logger.info(f"Processing {code_type} codes for {code_file_path}")
 629
 630        # get codes by column name
 631        source_col_name = concept_set["file"]["columns"][code_type]
 632        codes = df[source_col_name].dropna()
 633        codes = codes.astype(str)  # convert to string
 634        codes = codes.str.strip()  # remove excess spaces
 635
 636        # process codes, validating them using parser and returning the errors
 637        codes, errors = parser.process(codes, code_file_path)
 638        if len(errors) > 0:
 639            code_errors.extend(errors)
 640            _logger.warning(f"Codes validation failed with {len(errors)} errors")
 641
 642        # add processed codes to df
 643        new_col_name = f"{source_col_name}_SOURCE"
 644        df = df.rename(columns={source_col_name: new_col_name})
 645        process_codes = pd.DataFrame({code_type: codes}).join(df)
 646        out = pd.concat(
 647            [out, process_codes],
 648            ignore_index=True,
 649        )
 650
 651    _logger.debug(out.head())
 652
 653    return out, code_errors
 654
 655
  656# Translate a DataFrame with multiple source code types into a single target code type DataFrame
 657def translate_codes(
 658    source_df: pd.DataFrame, target_code_type: str, concept_name: str
 659) -> pd.DataFrame:
 660    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
 661
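    # The returned DataFrame has one row per translated code with columns
    # SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE and CONCEPT, plus CONCEPT_SET set to
    # concept_name when at least one code was translated.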
 662    # codes = pd.DataFrame([], dtype=str)
 663    codes = pd.DataFrame(
 664        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 665    )
 666    # Convert codes to target type
 667    _logger.info(f"Converting to target code type {target_code_type}")
 668
 669    for source_code_type in source_df.columns:
  670        # if the target code type is the same as the source code type, no translation, just append source as target
 671        if source_code_type == target_code_type:
 672            copy_df = pd.DataFrame(
 673                {
 674                    "SOURCE_CONCEPT": source_df[source_code_type],
 675                    "SOURCE_CONCEPT_TYPE": source_code_type,
 676                    "CONCEPT": source_df[source_code_type],
 677                }
 678            )
 679            codes = pd.concat([codes, copy_df])
 680            _logger.debug(
 681                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
 682            )
 683        else:
 684            # get the translation filename using source to target code types
 685            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 686            map_path = trud.PROCESSED_PATH / filename
 687
 688            # do the mapping if it exists
 689            if map_path.exists():
 690                # get mapping
 691                df_map = pd.read_parquet(map_path)
 692
 693                # do mapping
 694                translated_df = pd.merge(
 695                    source_df[source_code_type], df_map, how="left"
 696                )
 697
 698                # normalise the output
 699                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
 700                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 701
 702                # add to list of codes
 703                codes = pd.concat([codes, translated_df])
 704
 705            else:
 706                _logger.warning(
 707                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 708                )
 709
 710    codes = codes.dropna()  # delete NaNs
 711
  712    # add the concept set name to the output if any codes were translated
 713    if len(codes.index) > 0:
 714        codes["CONCEPT_SET"] = concept_name
 715    else:
 716        _logger.debug(f"No codes converted with target code type {target_code_type}")
 717
 718    return codes
 719
 720
 721def _write_code_errors(code_errors: list, code_errors_path: Path):
 722    err_df = pd.DataFrame(
 723        [
 724            {
 725                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 726                "VOCABULARY": err.code_type,
 727                "SOURCE": err.codes_file,
 728                "CAUSE": err.message,
 729            }
 730            for err in code_errors
 731        ]
 732    )
 733
 734    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 735    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 736    err_df.to_csv(code_errors_path, index=False, mode="w")
 737
 738
 739def write_vocab_version(phen_path: Path):
 740    # write the vocab version files
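    # For illustration, the resulting vocab_version.yml has the shape below
    # (values depend on the installed versions and are hypothetical here):
    #
    #   versions:
    #     acmc: "0.0.1"
    #     trud: ...
    #     omop: ...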
 741
 742    if not trud.VERSION_PATH.exists():
 743        raise FileNotFoundError(
 744            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 745        )
 746
 747    if not omop.VERSION_PATH.exists():
 748        raise FileNotFoundError(
 749            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 750        )
 751
 752    with trud.VERSION_PATH.open("r") as file:
 753        trud_version = yaml.safe_load(file)
 754
 755    with omop.VERSION_PATH.open("r") as file:
 756        omop_version = yaml.safe_load(file)
 757
 758    # Create the combined YAML structure
 759    version_data = {
 760        "versions": {
 761            "acmc": acmc.__version__,
 762            "trud": trud_version,
 763            "omop": omop_version,
 764        }
 765    }
 766
 767    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 768        yaml.dump(
 769            version_data,
 770            file,
 771            Dumper=util.QuotedDumper,
 772            default_flow_style=False,
 773            sort_keys=False,
 774            default_style='"',
 775        )
 776
 777
 778def map(phen_dir: str, target_code_type: str):
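    """Maps the phenotype's source concept codes to the target code type; if target_code_type is None, maps to every code type in the config's map list"""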
 779    _logger.info(f"Processing phenotype: {phen_dir}")
 780
 781    # Validate configuration
 782    validate(phen_dir)
 783
 784    # initialise paths
 785    phen_path = Path(phen_dir)
 786    config_path = phen_path / CONFIG_FILE
 787
 788    # load configuration
 789    with config_path.open("r") as file:
 790        config = yaml.safe_load(file)
 791    phenotype = config["phenotype"]
 792
 793    if len(phenotype["map"]) == 0:
 794        raise ValueError(f"No map codes defined in the phenotype configuration")
 795
 796    if target_code_type is not None and target_code_type not in phenotype["map"]:
 797        raise ValueError(
 798            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 799        )
 800
 801    if target_code_type is not None:
 802        _map_target_code_type(phen_path, phenotype, target_code_type)
 803    else:
 804        for t in phenotype["map"]:
 805            _map_target_code_type(phen_path, phenotype, t)
 806
 807    _logger.info(f"Phenotype processed successfully")
 808
 809
 810def _map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
 811    _logger.debug(f"Target coding format: {target_code_type}")
 812    concepts_path = phen_path / CONCEPTS_DIR
 813    # Create output dataframe
 814    out = pd.DataFrame([])
 815    code_errors = []
 816
 817    # Process each folder in codes section
 818    for concept_set in phenotype["concept_sets"]:
 819        _logger.debug(f"--- {concept_set['file']} ---")
 820
 821        # Load code file
 822        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
 823        df = _read_table_file(codes_file_path)
 824
 825        # process structural actions
 826        df = _process_actions(df, concept_set)
 827
 828        # preprocessing and validate of source concepts
 829        _logger.debug("Processing and validating source concept codes")
 830        df, errors = _preprocess_source_concepts(
 831            df,
 832            concept_set,
 833            codes_file_path,
 834        )
 835
 836        # create df with just the source code columns
 837        source_column_names = list(concept_set["file"]["columns"].keys())
 838        source_df = df[source_column_names]
 839
 840        _logger.debug(source_df.columns)
 841        _logger.debug(source_df.head())
 842
 843        _logger.debug(
 844            f"Length of errors from _preprocess_source_concepts {len(errors)}"
 845        )
 846        if len(errors) > 0:
 847            code_errors.extend(errors)
 848        _logger.debug(f" Length of code_errors {len(code_errors)}")
 849
 850        # Map source concepts codes to target codes
 851        # if processing a source coding list with categorical data
 852        if (
 853            "actions" in concept_set["file"]
 854            and "divide_col" in concept_set["file"]["actions"]
 855            and len(df) > 0
 856        ):
 857            divide_col = concept_set["file"]["actions"]["divide_col"]
 858            _logger.debug(f"Action: Dividing Table by {divide_col}")
 859            _logger.debug(f"column into: {df[divide_col].unique()}")
 860            df_grp = df.groupby(divide_col)
 861            for cat, grp in df_grp:
 862                if cat == concept_set["file"]["category"]:
 863                    grp = grp.drop(columns=[divide_col])  # delete categorical column
 864                    source_df = grp[source_column_names]
 865                    trans_out = translate_codes(
 866                        source_df,
 867                        target_code_type=target_code_type,
 868                        concept_name=concept_set["name"],
 869                    )
 870                    out = pd.concat([out, trans_out])
 871        else:
 872            source_df = df[source_column_names]
 873            trans_out = translate_codes(
 874                source_df,
 875                target_code_type=target_code_type,
 876                concept_name=concept_set["name"],
 877            )
 878            out = pd.concat([out, trans_out])
 879
 880    if len(code_errors) > 0:
 881        _logger.error(f"The map processing has {len(code_errors)} errors")
 882        error_path = phen_path / MAP_DIR / "errors"
 883        error_path.mkdir(parents=True, exist_ok=True)
 884        error_filename = f"{target_code_type}-code-errors.csv"
 885        _write_code_errors(code_errors, error_path / error_filename)
 886
 887    # Check there is output from processing
 888    if len(out.index) == 0:
 889        _logger.error(f"No output after map processing")
 890        raise Exception(
 891            f"No output after map processing, check config {str(phen_path.resolve())}"
 892        )
 893
 894    # final processing
 895    out = out.reset_index(drop=True)
 896    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 897    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 898
 899    out_count = len(out.index)
  900    # add metadata
 901    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
 902    result_list = []
 903    source_column_names = list(concept_set["file"]["columns"].keys())
 904    for source_concept_type in source_column_names:
 905        # Filter output based on the current source_concept_type
 906        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
 907        filtered_count = len(out_filtered_df.index)
 908
  909        # Remove all source type columns except the current one, leaving the metadata and the join column
 910        remove_types = [
 911            type for type in source_column_names if type != source_concept_type
 912        ]
 913        metadata_df = df.drop(columns=remove_types)
 914        metadata_df = metadata_df.rename(
 915            columns={source_concept_type: "SOURCE_CONCEPT"}
 916        )
 917        metadata_df_count = len(metadata_df.index)
 918
  919        # Perform a left join with the metadata on SOURCE_CONCEPT
 920        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
 921        result_count = len(result.index)
 922
 923        _logger.debug(
 924            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
 925        )
 926
 927        # Append the result to the result_list
 928        result_list.append(result)
 929
 930    # Concatenate all the results into a single DataFrame
 931    final_out = pd.concat(result_list, ignore_index=True)
 932    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 933    _logger.debug(
 934        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
 935    )
 936
 937    # Save output to map directory
 938    output_filename = target_code_type + ".csv"
 939    map_path = phen_path / MAP_DIR / output_filename
 940    final_out.to_csv(map_path, index=False)
 941    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 942
 943    # save concept sets as separate files
 944    concept_set_path = phen_path / CSV_PATH / target_code_type
 945
  946    # empty the concept-set directory except for hidden files, e.g. .git
 947    if concept_set_path.exists():
 948        for item in concept_set_path.iterdir():
 949            if not item.name.startswith("."):
 950                item.unlink()
 951    else:
 952        concept_set_path.mkdir(parents=True, exist_ok=True)
 953
 954    # write each concept as a separate file
 955    for name, concept in final_out.groupby("CONCEPT_SET"):
 956        concept = concept.sort_values(by="CONCEPT")  # sort rows
 957        concept = concept.dropna(how="all", axis=1)  # remove empty cols
 958        concept = concept.reindex(
 959            sorted(concept.columns), axis=1
 960        )  # sort cols alphabetically
 961        filename = f"{name}.csv"
 962        concept_path = concept_set_path / filename
 963        concept.to_csv(concept_path, index=False)
 964
 965    write_vocab_version(phen_path)
 966
 967    _logger.info(f"Phenotype processed target code type {target_code_type}")
 968
 969
 970def _generate_version_tag(
 971    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
 972) -> str:
 973    # Get all valid semantic version tags
 974    versions = []
 975    for tag in repo.tags:
 976        if tag.name.startswith("v"):
 977            tag_name = tag.name[1:]  # Remove the first character
 978        else:
 979            tag_name = tag.name
 980        if semver.Version.is_valid(tag_name):
 981            versions.append(semver.Version.parse(tag_name))
 982
 983    _logger.debug(f"Versions: {versions}")
 984    # Determine the next version
 985    if not versions:
 986        new_version = semver.Version(0, 0, 1)
 987    else:
 988        latest_version = max(versions)
 989        if increment == "major":
 990            new_version = latest_version.bump_major()
 991        elif increment == "minor":
 992            new_version = latest_version.bump_minor()
 993        else:
 994            new_version = latest_version.bump_patch()
 995
 996    # Create the new tag
 997    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
 998
 999    return new_version_str
1000
1001
1002def publish(
1003    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1004):
1005    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1006
1007    # Validate config
1008    validate(phen_dir)
1009    phen_path = Path(phen_dir)
1010
1011    # load git repo and set the branch
1012    repo = git.Repo(phen_path)
1013    if DEFAULT_GIT_BRANCH in repo.branches:
1014        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1015        main_branch.checkout()
1016    else:
1017        raise AttributeError(
1018            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1019        )
1020
1021    # check if any changes to publish
1022    if not repo.is_dirty() and not repo.untracked_files:
1023        if remote_url is not None and "origin" not in repo.remotes:
1024            _logger.info(f"First publish to remote url {remote_url}")
1025        else:
1026            _logger.info("Nothing to publish, no changes to the repo")
1027            return
1028
1029    # get next version
1030    new_version_str = _generate_version_tag(repo, increment)
1031    _logger.info(f"New version: {new_version_str}")
1032
1033    # Write version in configuration file
1034    config_path = phen_path / CONFIG_FILE
1035    with config_path.open("r") as file:
1036        config = yaml.safe_load(file)
1037
1038    config["phenotype"]["version"] = new_version_str
1039    with open(config_path, "w") as file:
1040        yaml.dump(
1041            config,
1042            file,
1043            Dumper=util.QuotedDumper,
1044            default_flow_style=False,
1045            sort_keys=False,
1046            default_style='"',
1047        )
1048
1049    # Add and commit changes to repo including version updates
 1050    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
1051    repo.git.add("--all")
1052    repo.index.commit(commit_message)
1053
1054    # Add tag to the repo
1055    repo.create_tag(new_version_str)
1056
1057    # push to origin if a remote repo
1058    if remote_url is not None and "origin" not in repo.remotes:
1059        git_url = _construct_git_url(remote_url)
1060        repo.create_remote("origin", git_url)
1061
1062    try:
1063        if "origin" in repo.remotes:
1064            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1065            origin = repo.remotes.origin
1066            _logger.info(f"Pushing main branch to remote repo")
1067            repo.git.push("--set-upstream", "origin", "main")
1068            _logger.info(f"Pushing version tags to remote git repo")
1069            origin.push(tags=True)
1070            _logger.debug("Changes pushed to 'origin'")
1071        else:
1072            _logger.debug("Remote 'origin' is not set")
1073    except Exception as e:
1074        tag_ref = repo.tags[new_version_str]
1075        repo.delete_tag(tag_ref)
1076        repo.git.reset("--soft", "HEAD~1")
1077        raise e
1078
1079    _logger.info(f"Phenotype published successfully")
1080
1081
1082def export(phen_dir: str, version: str):
1083    """Exports a phen repo at a specific tagged version into a target directory"""
1084    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1085
1086    # validate configuration
1087    validate(phen_dir)
1088    phen_path = Path(phen_dir)
1089
1090    # load configuration
1091    config_path = phen_path / CONFIG_FILE
1092    with config_path.open("r") as file:
1093        config = yaml.safe_load(file)
1094
1095    map_path = phen_path / MAP_DIR
1096    if not map_path.exists():
1097        _logger.warning(f"Map path does not exist '{map_path}'")
1098
1099    export_path = phen_path / OMOP_PATH
1100    # check export directory exists and if not create it
1101    if not export_path.exists():
1102        export_path.mkdir(parents=True)
1103        _logger.debug(f"OMOP export directory '{export_path}' created.")
1104
1105    # omop export db
1106    export_db_path = omop.export(
1107        map_path,
1108        export_path,
1109        config["phenotype"]["version"],
1110        config["phenotype"]["omop"],
1111    )
1112
1113    # write to tables
1114    # export as csv
1115    _logger.info(f"Phenotype exported successfully")
1116
1117
1118def copy(phen_dir: str, target_dir: str, version: str):
1119    """Copys a phen repo at a specific tagged version into a target directory"""
1120
1121    # Validate
1122    validate(phen_dir)
1123    phen_path = Path(phen_dir)
1124
1125    # Check target directory exists
1126    target_path = Path(target_dir)
1127    if not target_path.exists():
1128        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1129
1130    # Set copy directory
1131    copy_path = target_path / version
1132    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1133
1134    if (
1135        copy_path.exists() and copy_path.is_dir()
1136    ):  # Check if it exists and is a directory
1137        copy = _check_delete_dir(
1138            copy_path,
1139            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1140        )
1141    else:
1142        copy = True
1143
1144    if not copy:
1145        _logger.info(f"Not copying the version {version}")
1146        return
1147
1148    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1149    repo = git.Repo.clone_from(phen_path, copy_path)
1150
1151    # Check out the latest commit or specified version
1152    if version:
1153        # Checkout a specific version (e.g., branch, tag, or commit hash)
1154        _logger.info(f"Checking out version {version}...")
1155        repo.git.checkout(version)
1156    else:
1157        # Checkout the latest commit (HEAD)
1158        _logger.info(f"Checking out the latest commit...")
1159        repo.git.checkout("HEAD")
1160
1161    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1162
1163    _logger.info(f"Phenotype copied successfully")
1164
1165
1166# Convert concept_sets list into dictionaries
1167def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1168    """Extracts concepts as {name: file_path} dictionary and a name set."""
1169    concepts_dict = {
1170        item["name"]: item["file"]["path"]
1171        for item in config_data["phenotype"]["concept_sets"]
1172    }
1173    name_set = set(concepts_dict.keys())
1174    return concepts_dict, name_set
1175
1176
1177def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1178    """
1179    Extracts clean keys from a DeepDiff dictionary.
1180
1181    :param diff: DeepDiff result dictionary
1182    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1183    :return: A set of clean key names
1184    """
1185    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1186
1187
1188def diff_config(old_config: dict, new_config: dict) -> str:
1189    report = f"\n# Changes to phenotype configuration\n"
1190    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1191
1192    old_concepts, old_names = extract_concepts(old_config)
1193    new_concepts, new_names = extract_concepts(new_config)
1194
1195    # Check added and removed names
1196    added_names = new_names - old_names  # Names that appear in new but not in old
1197    removed_names = old_names - new_names  # Names that were in old but not in new
1198
1199    # find file path changes for unchanged names
1200    unchanged_names = old_names & new_names  # Names that exist in both
1201    file_diff = DeepDiff(
1202        {name: old_concepts[name] for name in unchanged_names},
1203        {name: new_concepts[name] for name in unchanged_names},
1204    )
1205
1206    # Find renamed concepts (same file, different name)
1207    renamed_concepts = []
1208    for removed in removed_names:
1209        old_path = old_concepts[removed]
1210        for added in added_names:
1211            new_path = new_concepts[added]
1212            if old_path == new_path:
1213                renamed_concepts.append((removed, added))
1214
1215    # Remove renamed concepts from added and removed sets
1216    for old_name, new_name in renamed_concepts:
1217        added_names.discard(new_name)
1218        removed_names.discard(old_name)
1219
1220    # generate config report
1221    if added_names:
1222        report += "## Added Concepts\n"
1223        for name in added_names:
1224            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1225        report += "\n"
1226
1227    if removed_names:
1228        report += "## Removed Concepts\n"
1229        for name in removed_names:
1230            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1231        report += "\n"
1232
1233    if renamed_concepts:
1234        report += "## Renamed Concepts\n"
1235        for old_name, new_name in renamed_concepts:
1236            report += (
1237                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1238            )
1239        report += "\n"
1240
1241    if "values_changed" in file_diff:
1242        report += "## Updated File Paths\n"
1243        for name, change in file_diff["values_changed"].items():
1244            old_file = change["old_value"]
1245            new_file = change["new_value"]
1246            clean_name = name.split("root['")[1].split("']")[0]
1247            report += (
1248                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1249            )
1250        report += "\n"
1251
1252    if not (
1253        added_names
1254        or removed_names
1255        or renamed_concepts
1256        or file_diff.get("values_changed")
1257    ):
1258        report += "No changes in concept sets.\n"
1259
1260    return report
1261
1262
1263def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1264    old_output_files = [
1265        file.name
1266        for file in old_map_path.iterdir()
1267        if file.is_file() and not file.name.startswith(".")
1268    ]
1269    new_output_files = [
1270        file.name
1271        for file in new_map_path.iterdir()
1272        if file.is_file() and not file.name.startswith(".")
1273    ]
1274
1275    # Convert the lists to sets for easy comparison
1276    old_output_set = set(old_output_files)
1277    new_output_set = set(new_output_files)
1278
1279    # Outputs that are in old_output_set but not in new_output_set (removed files)
1280    removed_outputs = old_output_set - new_output_set
1281    # Outputs that are in new_output_set but not in old_output_set (added files)
1282    added_outputs = new_output_set - old_output_set
1283    # Outputs that are the intersection of old_output_set and new_output_set
1284    common_outputs = old_output_set & new_output_set
1285
1286    report = f"\n# Changes to available translations\n"
1287    report += f"This compares the coding translations files available.\n\n"
1288    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1289    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1290    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1291
 1292    # Compare common outputs between versions
1293    report += f"# Changes to concepts in translation files\n\n"
1294    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1295    for file in common_outputs:
1296        old_output = old_map_path / file
1297        new_output = new_map_path / file
1298
1299        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
1300        _logger.debug(f"New ouptput: {str(new_output.resolve())}")
1301
1302        df1 = pd.read_csv(old_output)
1303        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1304        df2 = pd.read_csv(new_output)
1305        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1306
1307        # Check for added and removed concepts
1308        report += f"- File {file}\n"
1309        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1310        report += f"- Removed concepts {sorted_list}\n"
1311        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1312        report += f"- Added concepts {sorted_list}\n"
1313
1314        # Check for changed concepts
1315        diff = df2 - df1  # diff in counts
1316        diff = diff[
1317            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1318        ]  # get non-zero counts
1319        s = "\n"
1320        if len(diff.index) > 0:
1321            for concept, row in diff.iterrows():
1322                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1323            report += f"- Changed concepts {s}\n\n"
1324        else:
1325            report += f"- Changed concepts []\n\n"
1326
1327    return report
1328
1329
1330def diff_phen(
1331    new_phen_path: Path,
1332    new_version: str,
1333    old_phen_path: Path,
1334    old_version: str,
1335    report_path: Path,
1336):
1337    """Compare the differences between two versions of a phenotype"""
1338
1339    # validate phenotypes
1340    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1341    validate(str(old_phen_path.resolve()))
1342    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1343    validate(str(new_phen_path.resolve()))
1344
1345    # get old and new config
1346    old_config_path = old_phen_path / CONFIG_FILE
1347    with old_config_path.open("r") as file:
1348        old_config = yaml.safe_load(file)
1349    new_config_path = new_phen_path / CONFIG_FILE
1350    with new_config_path.open("r") as file:
1351        new_config = yaml.safe_load(file)
1352
1353    # write report heading
1354    report = f"# Phenotype Comparison Report\n"
1355    report += f"## Original phenotype\n"
1356    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1357    report += f"  - {old_version}\n"
1358    report += f"  - {str(old_phen_path.resolve())}\n"
1359    report += f"## Changed phenotype:\n"
1360    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1361    report += f"  - {new_version}\n"
1362    report += f"  - {str(new_phen_path.resolve())}\n"
1363
1364    # Step 1: check differences configuration files
1365    # Convert list of dicts into a dict: {name: file}
1366    report += diff_config(old_config, new_config)
1367
1368    # Step 2: check differences between map files
1369    # List files from output directories
1370    old_map_path = old_phen_path / MAP_DIR
1371    new_map_path = new_phen_path / MAP_DIR
1372    report += diff_map_files(old_map_path, new_map_path)
1373
1374    # initialise report file
1375    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1376    report_file = open(report_path, "w")
1377    report_file.write(report)
1378    report_file.close()
1379
1380    _logger.info(f"Phenotypes diff'd successfully")
1381
1382
1383def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1384    # make tmp directory .acmc
1385    timestamp = time.strftime("%Y%m%d_%H%M%S")
1386    temp_dir = Path(f".acmc/diff_{timestamp}")
1387
1388    changed_phen_path = Path(phen_dir)
1389    if not changed_phen_path.exists():
1390        raise ValueError(
1391            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1392        )
1393
1394    old_phen_path = Path(old_phen_dir)
1395    if not old_phen_path.exists():
1396        raise ValueError(
1397            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1398        )
1399
1400    t_path = old_phen_path / "config.yml"
1401    with t_path.open("r") as file:
1402        c = yaml.safe_load(file)
1403
1404    try:
1405        # Create the directory
1406        temp_dir.mkdir(parents=True, exist_ok=True)
1407        _logger.debug(f"Temporary directory created: {temp_dir}")
1408
1409        # Create temporary directories
1410        changed_path = temp_dir / "changed"
1411        changed_path.mkdir(parents=True, exist_ok=True)
1412        old_path = temp_dir / "old"
1413        old_path.mkdir(parents=True, exist_ok=True)
1414
1415        # checkout changed
1416        if version == "latest":
1417            _logger.debug(
1418                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1419            )
1420            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1421        else:
1422            _logger.debug(
1423                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1424            )
1425            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1426            changed_repo.git.checkout(version)
1427
1428        # checkout old
1429        if old_version == "latest":
1430            _logger.debug(
1431                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1432            )
1433            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1434        else:
1435            _logger.debug(
1436                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1437            )
1438            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1439            old_repo.git.checkout(old_version)
1440
1441        report_filename = f"{version}_{old_version}_diff.md"
1442        report_path = changed_phen_path / report_filename
1443        # diff old with new
1444        diff_phen(changed_path, version, old_path, old_version, report_path)
1445
1446    finally:
1447        # clean up tmp directory
1448        if temp_dir.exists():
1449            shutil.rmtree(temp_dir)
1450            _logger.debug(f"Temporary directory removed: {temp_dir}")
PHEN_DIR = 'phen'

Default phenotype directory name

DEFAULT_PHEN_PATH = PosixPath('workspace/phen')

Default phenotype directory path

CONCEPTS_DIR = 'concepts'

Default concepts directory name

MAP_DIR = 'map'

Default map directory name

CONCEPT_SET_DIR = 'concept-sets'

Default concept set directory name

CSV_PATH = PosixPath('concept-sets/csv')

Default CSV concept set directory path

OMOP_PATH = PosixPath('concept-sets/omop')

Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = ['concepts', 'map', 'concept-sets']

List of default phenotype directories

CONFIG_FILE = 'config.yml'

Default configuration filename

VOCAB_VERSION_FILE = 'vocab_version.yml'

Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ['major', 'minor', 'patch']

List of semantic version increment types

DEFAULT_VERSION_INC = 'patch'

Default semantic version increment type

DEFAULT_GIT_BRANCH = 'main'

Default phenotype repo branch name

SPLIT_COL_ACTION = 'split_col'

Split column preprocessing action type

CODES_COL_ACTION = 'codes_col'

Codes column preprocessing action type

DIVIDE_COL_ACTION = 'divide_col'

Divide column preprocessing action type

COL_ACTIONS = ['split_col', 'codes_col', 'divide_col']

List of column preprocessing action types

CODE_FILE_TYPES = ['.xlsx', '.xls', '.csv']

List of supported source concept coding list file types

CONFIG_SCHEMA = {
    'phenotype': {
        'type': 'dict',
        'required': True,
        'schema': {
            'version': {
                'type': 'string',
                'required': True,
                'regex': '^\\d+\\.\\d+\\.\\d+$',
            },
            'omop': {
                'type': 'dict',
                'required': True,
                'schema': {
                    'vocabulary_id': {'type': 'string', 'required': True},
                    'vocabulary_name': {'type': 'string', 'required': True},
                    'vocabulary_reference': {
                        'type': 'string',
                        'required': True,
                        'regex': '^https?://.*',
                    },
                },
            },
            'map': {
                'type': 'list',
                'schema': {
                    'type': 'string',
                    'allowed': ['atc', 'opcs4', 'read3', 'read2', 'snomed', 'icd10'],
                },
            },
            'concept_sets': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'dict',
                    'schema': {
                        'name': {'type': 'string', 'required': True},
                        'file': {
                            'type': 'dict',
                            'required': False,
                            'schema': {
                                'path': {'type': 'string', 'required': True},
                                'columns': {'type': 'dict', 'required': True},
                                'category': {'type': 'string'},
                                'actions': {
                                    'type': 'dict',
                                    'schema': {'divide_col': {'type': 'string'}},
                                },
                            },
                        },
                    },
                },
            },
        },
    },
}

Phenotype config.yml schema definition
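
A minimal sketch of validating a candidate configuration against this schema with cerberus, the validator the module itself uses; the phenotype values shown are illustrative only:

    from cerberus import Validator
    from acmc import phen

    # an illustrative, minimal config satisfying the required fields
    candidate = {
        "phenotype": {
            "version": "0.0.1",
            "omop": {
                "vocabulary_id": "ACMC_EXAMPLE",
                "vocabulary_name": "ACMC example phenotype",
                "vocabulary_reference": "https://example.org/phen",
            },
            "concept_sets": [],
        }
    }

    validator = Validator(phen.CONFIG_SCHEMA)
    print(validator.validate(candidate))  # True
    print(validator.errors)               # {} when the structure is valid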

class PhenValidationException(builtins.Exception):
157class PhenValidationException(Exception):
158    """Custom exception class raised when validation errors in phenotype configuration file"""
159
160    def __init__(self, message, validation_errors=None):
161        super().__init__(message)
162        self.validation_errors = validation_errors

Custom exception raised when there are validation errors in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
160    def __init__(self, message, validation_errors=None):
161        super().__init__(message)
162        self.validation_errors = validation_errors
validation_errors
def init(phen_dir: str, remote_url: str):
231def init(phen_dir: str, remote_url: str):
232    """Initial phenotype directory as git repo with standard structure"""
233    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
234    phen_path = Path(phen_dir)
235
236    # check if directory already exists and ask user if they want to recreate it
237    if (
238        phen_path.exists() and phen_path.is_dir()
239    ):  # Check if it exists and is a directory
240        configure = _check_delete_dir(
241            phen_path,
242            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
243        )
244    else:
245        configure = True
246
247    if not configure:
248        _logger.info(f"Exiting, phenotype not initiatised")
249        return
250
251    # Initialise repo from local or remote
252    repo: Repo
253
254    # if remote then clone the repo otherwise init a local repo
255    if remote_url is not None:
256        # add PAT token to the URL
257        git_url = _construct_git_url(remote_url)
258
259        # clone the repo
260        git_cmd = git.cmd.Git()
261        git_cmd.clone(git_url, phen_path)
262
263        # open repo
264        repo = Repo(phen_path)
265        # check if there are any commits (new repo has no commits)
266        if (
267            len(repo.branches) == 0 or repo.head.is_detached
268        ):  # Handle detached HEAD (e.g., after init)
269            _logger.debug("The phen repository has no commits yet.")
270            commit_count = 0
271        else:
272            # Get the total number of commits in the default branch
273            commit_count = sum(1 for _ in repo.iter_commits())
274            _logger.debug(f"Repo has previous commits: {commit_count}")
275    else:
276        # local repo, create the directories and init
277        phen_path.mkdir(parents=True, exist_ok=True)
278        _logger.debug(f"Phen directory '{phen_path}' has been created.")
279        repo = git.Repo.init(phen_path)
280        commit_count = 0
281
282    phen_path = phen_path.resolve()
283    # initialise empty repos
284    if commit_count == 0:
285        # create initial commit
286        initial_file_path = phen_path / "README.md"
287        with open(initial_file_path, "w") as file:
288            file.write(
289                "# Initial commit\nThis is the first commit in the phen repository.\n"
290            )
291        repo.index.add([initial_file_path])
292        repo.index.commit("Initial commit")
293        commit_count = 1
294
295    # Checkout the phen's default branch, creating it if it does not exist
296    if DEFAULT_GIT_BRANCH in repo.branches:
297        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
298        main_branch.checkout()
299    else:
300        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
301        main_branch.checkout()
302
303    # if the phen path does not contain the config file then initialise the phenotype
304    config_path = phen_path / CONFIG_FILE
305    if config_path.exists():
306        _logger.debug(f"Phenotype configuration files already exist")
307        return
308
309    _logger.info("Creating phen directory structure and config files")
310    for d in DEFAULT_PHEN_DIR_LIST:
311        _create_empty_git_dir(phen_path / d)
312
313    # create empty phen config file
314    config = {
315        "phenotype": {
316            "version": "0.0.0",
317            "omop": {
318                "vocabulary_id": "",
319                "vocabulary_name": "",
320                "vocabulary_reference": "",
321            },
322            "translate": [],
323            "concept_sets": [],
324        }
325    }
326
327    with open(phen_path / CONFIG_FILE, "w") as file:
328        yaml.dump(
329            config,
330            file,
331            Dumper=util.QuotedDumper,
332            default_flow_style=False,
333            sort_keys=False,
334            default_style='"',
335        )
336
337    # add git ignore
338    ignore_content = """# Ignore SQLite database files
339*.db
340*.sqlite3
341 
342# Ignore SQLite journal and metadata files
343*.db-journal
344*.sqlite3-journal
345
346# python
347.ipynb_checkpoints
348 """
349    ignore_path = phen_path / ".gitignore"
350    with open(ignore_path, "w") as file:
351        file.write(ignore_content)
352
353    # add to git repo and commit
354    for d in DEFAULT_PHEN_DIR_LIST:
355        repo.git.add(phen_path / d)
356    repo.git.add(all=True)
357    repo.index.commit("initialised the phen git repo.")
358
359    _logger.info(f"Phenotype initialised successfully")

Initialises a phenotype directory as a git repo with standard structure
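
A minimal usage sketch; the path is the default workspace location and remote_url may be None for a purely local repo (note the function prompts before reinitialising an existing directory):

    from acmc import phen

    # initialise a local phenotype repo with the standard directory structure
    phen.init("./workspace/phen", remote_url=None)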

def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
362def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
363    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
364
365    Args:
366        phen_dir (str): local directory path where the upstream repo is to be cloned
367        upstream_url (str): url to the upstream repo
368        upstream_version (str): version in the upstream repo to clone
369        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
370
371    Raises:
372        ValueError: if the specified version is not in the upstream repo
373        ValueError: if the upstream repo is not a valid phenotype repo
374        ValueError: if there are any other problems with Git
375    """
376    _logger.info(
377        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
378    )
379
380    phen_path = Path(phen_dir)
381    # check if directory already exists and ask user if they want to recreate it
382    if (
383        phen_path.exists() and phen_path.is_dir()
384    ):  # Check if it exists and is a directory
385        configure = _check_delete_dir(
386            phen_path,
387            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
388        )
389    else:
390        configure = True
391
392    if not configure:
393        _logger.info(f"Exiting, phenotype not initiatised")
394        return
395
396    try:
397        # Clone repo
398        git_url = _construct_git_url(upstream_url)
399        repo = git.Repo.clone_from(git_url, phen_path)
400
401        # Fetch all branches and tags
402        repo.remotes.origin.fetch()
403
404        # Check if the version exists
405        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
406        if upstream_version not in available_refs:
407            raise ValueError(
408                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
409            )
410
411        # Checkout the specified version
412        repo.git.checkout(upstream_version)
413        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
414        main_branch.checkout()
415
416        # Check if 'config.yml' exists in the root directory
417        config_path = phen_path / "config.yml"
418        if not os.path.isfile(config_path):
419            raise ValueError(
420                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
421            )
422
423        # Validate the phenotype is compatible with the acmc tool
424        validate(str(phen_path.resolve()))
425
426        # Delete each tag locally
427        tags = repo.tags
428        for tag in tags:
429            repo.delete_tag(tag)
430            _logger.debug(f"Deleted tags from forked repo: {tag}")
431
432        # Add upstream remote
433        repo.create_remote("upstream", upstream_url)
434        remote = repo.remotes["origin"]
435        repo.delete_remote(remote)  # Remove existing origin
436
437        # Optionally set a new origin remote
438        if new_origin_url:
439            git_url = _construct_git_url(new_origin_url)
440            repo.create_remote("origin", git_url)
441            repo.git.push("--set-upstream", "origin", "main")
442
443        _logger.info(f"Repository forked successfully at {phen_path}")
444        _logger.info(f"Upstream set to {upstream_url}")
445        if new_origin_url:
446            _logger.info(f"Origin set to {new_origin_url}")
447
448    except Exception as e:
449        if phen_path.exists():
450            shutil.rmtree(phen_path)
451        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version into a local directory, and optionally sets a new remote origin.

Arguments:
  • phen_dir (str): local directory path where the upstream repo is to be cloned
  • upstream_url (str): url to the upstream repo
  • upstream_version (str): version in the upstream repo to clone
  • new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
  • ValueError: if the specified version is not in the upstream repo
  • ValueError: if the upstream repo is not a valid phenotype repo
  • ValueError: if there are any other problems with Git
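
A usage sketch; the upstream URL and version tag here are hypothetical:

    from acmc import phen

    # fork an upstream phenotype at a tagged version; pass a writable remote
    # URL as new_origin_url to push the fork to a new origin
    phen.fork(
        "./workspace/forked-phen",
        upstream_url="https://github.com/example/phen-repo.git",
        upstream_version="v1.0.0",
        new_origin_url=None,
    )
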
def validate(phen_dir: str):
454def validate(phen_dir: str):
455    """Validates the phenotype directory is a git repo with standard structure"""
456    _logger.info(f"Validating phenotype: {phen_dir}")
457    phen_path = Path(phen_dir)
458    if not phen_path.is_dir():
459        raise NotADirectoryError(
460            f"Error: '{str(phen_path.resolve())}' is not a directory"
461        )
462
463    config_path = phen_path / CONFIG_FILE
464    if not config_path.is_file():
465        raise FileNotFoundError(
466            f"Error: phen configuration file '{config_path}' does not exist."
467        )
468
469    concepts_path = phen_path / CONCEPTS_DIR
470    if not concepts_path.is_dir():
471        raise FileNotFoundError(
472            f"Error: source concepts directory {concepts_path} does not exist."
473        )
474
475    # Validate the directory is a git repo
476    try:
477        git.Repo(phen_path)
478    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
479        raise Exception(f"Phen directory {phen_path} is not a git repo")
480
481    # Load configuration File
482    if config_path.suffix == ".yml":
483        try:
484            with config_path.open("r") as file:
485                phenotype = yaml.safe_load(file)
486
487            validator = Validator(CONFIG_SCHEMA)
488            if validator.validate(phenotype):
489                _logger.debug("YAML structure is valid.")
490            else:
491                _logger.error(f"YAML structure validation failed: {validator.errors}")
492                raise Exception(f"YAML structure validation failed: {validator.errors}")
493        except yaml.YAMLError as e:
494            _logger.error(f"YAML syntax error: {e}")
495            raise e
496    else:
497        raise Exception(
498            f"Unsupported configuration filetype: {str(config_path.resolve())}"
499        )
500
501    # initialise
502    validation_errors = []
503    phenotype = phenotype["phenotype"]
504    code_types = parse.CodeTypeParser().code_types
505
506    # check the version number is of the format n.n.n
507    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
508    if not match:
509        validation_errors.append(
510            f"Invalid version format in configuration file: {phenotype['version']}"
511        )
512
513    # create a list of all the concept set names defined in the concept set configuration
514    concept_set_names = []
515    for item in phenotype["concept_sets"]:
516        if item["name"] in concept_set_names:
517            validation_errors.append(
518                f"Duplicate concept set defined in concept sets: {item['name']}"
519            )
520        else:
521            concept_set_names.append(item["name"])
522
523    # check codes definition
524    for item in phenotype["concept_sets"]:
525        # check concept code file exists
526        concept_code_file_path = concepts_path / item["file"]["path"]
527        if not concept_code_file_path.exists():
528            validation_errors.append(
529                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
530            )
531
532        # check concept code file is not empty
533        if concept_code_file_path.stat().st_size == 0:
534            validation_errors.append(
535                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
536            )
537
538        # check code file type is supported
539        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
540            raise ValueError(
541                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
542            )
543
544        # check columns specified are a supported medical coding type
545        for column in item["file"]["columns"]:
546            if column not in code_types:
547                validation_errors.append(
548                    f"Column type {column} for file {concept_code_file_path} is not supported"
549                )
550
551        # check the actions are supported
552        if "actions" in item["file"]:
553            for action in item["file"]["actions"]:
554                if action not in COL_ACTIONS:
555                    validation_errors.append(f"Action {action} is not supported")
556
557    if len(validation_errors) > 0:
558        _logger.error(validation_errors)
559        raise PhenValidationException(
560            f"Configuration file {str(config_path.resolve())} failed validation",
561            validation_errors,
562        )
563
564    _logger.info(f"Phenotype validated successfully")

Validates the phenotype directory is a git repo with standard structure
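
A usage sketch showing how validation failures surface through PhenValidationException, whose validation_errors attribute carries the individual messages:

    from acmc import phen

    try:
        phen.validate("./workspace/phen")
    except phen.PhenValidationException as e:
        print(e)
        for error in e.validation_errors or []:
            print(" -", error)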

def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str) -> pandas.core.frame.DataFrame:
658def translate_codes(
659    source_df: pd.DataFrame, target_code_type: str, concept_name: str
660) -> pd.DataFrame:
661    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
662
663    # accumulator for codes translated from each source code type
664    codes = pd.DataFrame(
665        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
666    )
667    # Convert codes to target type
668    _logger.info(f"Converting to target code type {target_code_type}")
669
670    for source_code_type in source_df.columns:
671        # if the target code type is the same as the source code type, no translation is needed; just append source as target
672        if source_code_type == target_code_type:
673            copy_df = pd.DataFrame(
674                {
675                    "SOURCE_CONCEPT": source_df[source_code_type],
676                    "SOURCE_CONCEPT_TYPE": source_code_type,
677                    "CONCEPT": source_df[source_code_type],
678                }
679            )
680            codes = pd.concat([codes, copy_df])
681            _logger.debug(
682                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
683            )
684        else:
685            # get the translation filename using source to target code types
686            filename = f"{source_code_type}_to_{target_code_type}.parquet"
687            map_path = trud.PROCESSED_PATH / filename
688
689            # do the mapping if it exists
690            if map_path.exists():
691                # get mapping
692                df_map = pd.read_parquet(map_path)
693
694                # do mapping
695                translated_df = pd.merge(
696                    source_df[source_code_type], df_map, how="left"
697                )
698
699                # normalise the output
700                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
701                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
702
703                # add to list of codes
704                codes = pd.concat([codes, translated_df])
705
706            else:
707                _logger.warning(
708                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
709                )
710
711    codes = codes.dropna()  # delete NaNs
712
713    # add the concept set name to the output if any codes were translated
714    if len(codes.index) > 0:
715        codes["CONCEPT_SET"] = concept_name
716    else:
717        _logger.debug(f"No codes converted with target code type {target_code_type}")
718
719    return codes

Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
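
A minimal sketch using the copy path, where source and target code types are identical, so no TRUD mapping files are needed; the read2 codes are illustrative:

    import pandas as pd
    from acmc import phen

    # one column per source code type
    source_df = pd.DataFrame({"read2": ["C10..", "C11.."]})

    # identical source and target types are copied rather than translated
    codes = phen.translate_codes(
        source_df, target_code_type="read2", concept_name="EXAMPLE_CONCEPT_SET"
    )
    print(codes[["SOURCE_CONCEPT", "CONCEPT", "CONCEPT_SET"]])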

def write_vocab_version(phen_path: pathlib.Path):
740def write_vocab_version(phen_path: Path):
741    # write the vocab version files
742
743    if not trud.VERSION_PATH.exists():
744        raise FileNotFoundError(
745            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
746        )
747
748    if not omop.VERSION_PATH.exists():
749        raise FileNotFoundError(
750            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
751        )
752
753    with trud.VERSION_PATH.open("r") as file:
754        trud_version = yaml.safe_load(file)
755
756    with omop.VERSION_PATH.open("r") as file:
757        omop_version = yaml.safe_load(file)
758
759    # Create the combined YAML structure
760    version_data = {
761        "versions": {
762            "acmc": acmc.__version__,
763            "trud": trud_version,
764            "omop": omop_version,
765        }
766    }
767
768    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
769        yaml.dump(
770            version_data,
771            file,
772            Dumper=util.QuotedDumper,
773            default_flow_style=False,
774            sort_keys=False,
775            default_style='"',
776        )
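
A usage sketch; this assumes the TRUD and OMOP vocabularies have already been downloaded and processed by acmc, since their version files are read here:

    from pathlib import Path
    from acmc import phen

    # writes vocab_version.yml into the phenotype directory
    phen.write_vocab_version(Path("./workspace/phen"))
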
def map(phen_dir: str, target_code_type: str):
779def map(phen_dir: str, target_code_type: str):
780    _logger.info(f"Processing phenotype: {phen_dir}")
781
782    # Validate configuration
783    validate(phen_dir)
784
785    # initialise paths
786    phen_path = Path(phen_dir)
787    config_path = phen_path / CONFIG_FILE
788
789    # load configuration
790    with config_path.open("r") as file:
791        config = yaml.safe_load(file)
792    phenotype = config["phenotype"]
793
794    if len(phenotype["map"]) == 0:
795        raise ValueError(f"No map codes defined in the phenotype configuration")
796
797    if target_code_type is not None and target_code_type not in phenotype["map"]:
798        raise ValueError(
799            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
800        )
801
802    if target_code_type is not None:
803        _map_target_code_type(phen_path, phenotype, target_code_type)
804    else:
805        for t in phenotype["map"]:
806            _map_target_code_type(phen_path, phenotype, t)
807
808    _logger.info(f"Phenotype processed successfully")
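
A usage sketch, assuming the configuration declares a non-empty map list; passing None for target_code_type maps to every code type listed under map in config.yml:

    from acmc import phen

    # map source codes to a single target type...
    phen.map("./workspace/phen", target_code_type="read2")

    # ...or to every target type declared in the configuration
    phen.map("./workspace/phen", target_code_type=None)
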
def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
1003def publish(
1004    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1005):
1006    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1007
1008    # Validate config
1009    validate(phen_dir)
1010    phen_path = Path(phen_dir)
1011
1012    # load git repo and set the branch
1013    repo = git.Repo(phen_path)
1014    if DEFAULT_GIT_BRANCH in repo.branches:
1015        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1016        main_branch.checkout()
1017    else:
1018        raise AttributeError(
1019            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1020        )
1021
1022    # check if any changes to publish
1023    if not repo.is_dirty() and not repo.untracked_files:
1024        if remote_url is not None and "origin" not in repo.remotes:
1025            _logger.info(f"First publish to remote url {remote_url}")
1026        else:
1027            _logger.info("Nothing to publish, no changes to the repo")
1028            return
1029
1030    # get next version
1031    new_version_str = _generate_version_tag(repo, increment)
1032    _logger.info(f"New version: {new_version_str}")
1033
1034    # Write version in configuration file
1035    config_path = phen_path / CONFIG_FILE
1036    with config_path.open("r") as file:
1037        config = yaml.safe_load(file)
1038
1039    config["phenotype"]["version"] = new_version_str
1040    with open(config_path, "w") as file:
1041        yaml.dump(
1042            config,
1043            file,
1044            Dumper=util.QuotedDumper,
1045            default_flow_style=False,
1046            sort_keys=False,
1047            default_style='"',
1048        )
1049
1050    # Add and commit changes to repo including version updates
1051    commit_message = f"Committing updates to phenotype {phen_path}"
1052    repo.git.add("--all")
1053    repo.index.commit(commit_message)
1054
1055    # Add tag to the repo
1056    repo.create_tag(new_version_str)
1057
1058    # push to origin if a remote repo
1059    if remote_url is not None and "origin" not in repo.remotes:
1060        git_url = _construct_git_url(remote_url)
1061        repo.create_remote("origin", git_url)
1062
1063    try:
1064        if "origin" in repo.remotes:
1065            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1066            origin = repo.remotes.origin
1067            _logger.info(f"Pushing main branch to remote repo")
1068            repo.git.push("--set-upstream", "origin", "main")
1069            _logger.info(f"Pushing version tags to remote git repo")
1070            origin.push(tags=True)
1071            _logger.debug("Changes pushed to 'origin'")
1072        else:
1073            _logger.debug("Remote 'origin' is not set")
1074    except Exception as e:
1075        tag_ref = repo.tags[new_version_str]
1076        repo.delete_tag(tag_ref)
1077        repo.git.reset("--soft", "HEAD~1")
1078        raise e
1079
1080    _logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory
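
A usage sketch with hypothetical arguments; increment selects which part of the semantic version to bump (note the commit message is currently generated internally rather than taken from msg):

    from acmc import phen

    # commit all changes, bump the patch version, tag, and push if a remote is set
    phen.publish(
        "./workspace/phen",
        msg="Updated concept sets",
        remote_url=None,  # or a hypothetical https://... repo URL for a first publish
        increment="patch",
    )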

def export(phen_dir: str, version: str):
1083def export(phen_dir: str, version: str):
1084    """Exports a phen repo at a specific tagged version into a target directory"""
1085    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1086
1087    # validate configuration
1088    validate(phen_dir)
1089    phen_path = Path(phen_dir)
1090
1091    # load configuration
1092    config_path = phen_path / CONFIG_FILE
1093    with config_path.open("r") as file:
1094        config = yaml.safe_load(file)
1095
1096    map_path = phen_path / MAP_DIR
1097    if not map_path.exists():
1098        _logger.warning(f"Map path does not exist '{map_path}'")
1099
1100    export_path = phen_path / OMOP_PATH
1101    # check export directory exists and if not create it
1102    if not export_path.exists():
1103        export_path.mkdir(parents=True)
1104        _logger.debug(f"OMOP export directory '{export_path}' created.")
1105
1106    # omop export db
1107    export_db_path = omop.export(
1108        map_path,
1109        export_path,
1110        config["phenotype"]["version"],
1111        config["phenotype"]["omop"],
1112    )
1113
1114    # write to tables
1115    # export as csv
1116    _logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory
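
A usage sketch; the version argument should match the phenotype version in config.yml:

    from acmc import phen

    # write the OMOP export database into concept-sets/omop
    phen.export("./workspace/phen", version="0.0.1")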

def copy(phen_dir: str, target_dir: str, version: str):
1119def copy(phen_dir: str, target_dir: str, version: str):
1120    """Copys a phen repo at a specific tagged version into a target directory"""
1121
1122    # Validate
1123    validate(phen_dir)
1124    phen_path = Path(phen_dir)
1125
1126    # Check target directory exists
1127    target_path = Path(target_dir)
1128    if not target_path.exists():
1129        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1130
1131    # Set copy directory
1132    copy_path = target_path / version
1133    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1134
1135    if (
1136        copy_path.exists() and copy_path.is_dir()
1137    ):  # Check if it exists and is a directory
1138        copy = _check_delete_dir(
1139            copy_path,
1140            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1141        )
1142    else:
1143        copy = True
1144
1145    if not copy:
1146        _logger.info(f"Not copying the version {version}")
1147        return
1148
1149    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1150    repo = git.Repo.clone_from(phen_path, copy_path)
1151
1152    # Check out the latest commit or specified version
1153    if version:
1154        # Checkout a specific version (e.g., branch, tag, or commit hash)
1155        _logger.info(f"Checking out version {version}...")
1156        repo.git.checkout(version)
1157    else:
1158        # Checkout the latest commit (HEAD)
1159        _logger.info(f"Checking out the latest commit...")
1160        repo.git.checkout("HEAD")
1161
1162    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1163
1164    _logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory
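
A usage sketch; the repo is cloned into a subdirectory of target_dir named after the version (the tag shown is hypothetical):

    from acmc import phen

    # clone the phenotype repo and check out the given tag under ./releases/v0.0.1
    phen.copy("./workspace/phen", "./releases", version="v0.0.1")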

def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1168def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1169    """Extracts concepts as {name: file_path} dictionary and a name set."""
1170    concepts_dict = {
1171        item["name"]: item["file"]["path"]
1172        for item in config_data["phenotype"]["concept_sets"]
1173    }
1174    name_set = set(concepts_dict.keys())
1175    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.
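
A self-contained sketch with an illustrative configuration dict:

    from acmc import phen

    config = {
        "phenotype": {
            "concept_sets": [
                {"name": "diabetes", "file": {"path": "diabetes.csv"}},
                {"name": "hypertension", "file": {"path": "htn.csv"}},
            ]
        }
    }

    concepts, names = phen.extract_concepts(config)
    # concepts == {'diabetes': 'diabetes.csv', 'hypertension': 'htn.csv'}
    # names == {'diabetes', 'hypertension'}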

def diff_config(old_config: dict, new_config: dict) -> str:
1189def diff_config(old_config: dict, new_config: dict) -> str:
1190    report = f"\n# Changes to phenotype configuration\n"
1191    report += f"This compares changes in the phenotype configuration, including added, removed and renamed concept sets, and changes to concept set source code file paths\n\n"
1192
1193    old_concepts, old_names = extract_concepts(old_config)
1194    new_concepts, new_names = extract_concepts(new_config)
1195
1196    # Check added and removed names
1197    added_names = new_names - old_names  # Names that appear in new but not in old
1198    removed_names = old_names - new_names  # Names that were in old but not in new
1199
1200    # find file path changes for unchanged names
1201    unchanged_names = old_names & new_names  # Names that exist in both
1202    file_diff = DeepDiff(
1203        {name: old_concepts[name] for name in unchanged_names},
1204        {name: new_concepts[name] for name in unchanged_names},
1205    )
1206
1207    # Find renamed concepts (same file, different name)
1208    renamed_concepts = []
1209    for removed in removed_names:
1210        old_path = old_concepts[removed]
1211        for added in added_names:
1212            new_path = new_concepts[added]
1213            if old_path == new_path:
1214                renamed_concepts.append((removed, added))
1215
1216    # Remove renamed concepts from added and removed sets
1217    for old_name, new_name in renamed_concepts:
1218        added_names.discard(new_name)
1219        removed_names.discard(old_name)
1220
1221    # generate config report
1222    if added_names:
1223        report += "## Added Concepts\n"
1224        for name in added_names:
1225            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1226        report += "\n"
1227
1228    if removed_names:
1229        report += "## Removed Concepts\n"
1230        for name in removed_names:
1231            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1232        report += "\n"
1233
1234    if renamed_concepts:
1235        report += "## Renamed Concepts\n"
1236        for old_name, new_name in renamed_concepts:
1237            report += (
1238                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1239            )
1240        report += "\n"
1241
1242    if "values_changed" in file_diff:
1243        report += "## Updated File Paths\n"
1244        for name, change in file_diff["values_changed"].items():
1245            old_file = change["old_value"]
1246            new_file = change["new_value"]
1247            clean_name = name.split("root['")[1].split("']")[0]
1248            report += (
1249                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1250            )
1251        report += "\n"
1252
1253    if not (
1254        added_names
1255        or removed_names
1256        or renamed_concepts
1257        or file_diff.get("values_changed")
1258    ):
1259        report += "No changes in concept sets.\n"
1260
1261    return report
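
A self-contained sketch diffing two illustrative configurations, where one concept set keeps its file but changes name, so it is reported as renamed:

    from acmc import phen

    old_config = {"phenotype": {"concept_sets": [
        {"name": "diabetes", "file": {"path": "diabetes.csv"}},
    ]}}
    new_config = {"phenotype": {"concept_sets": [
        {"name": "diabetes_type2", "file": {"path": "diabetes.csv"}},
    ]}}

    # reports diabetes -> diabetes_type2 under "Renamed Concepts"
    print(phen.diff_config(old_config, new_config))
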
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1264def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1265    old_output_files = [
1266        file.name
1267        for file in old_map_path.iterdir()
1268        if file.is_file() and not file.name.startswith(".")
1269    ]
1270    new_output_files = [
1271        file.name
1272        for file in new_map_path.iterdir()
1273        if file.is_file() and not file.name.startswith(".")
1274    ]
1275
1276    # Convert the lists to sets for easy comparison
1277    old_output_set = set(old_output_files)
1278    new_output_set = set(new_output_files)
1279
1280    # Outputs that are in old_output_set but not in new_output_set (removed files)
1281    removed_outputs = old_output_set - new_output_set
1282    # Outputs that are in new_output_set but not in old_output_set (added files)
1283    added_outputs = new_output_set - old_output_set
1284    # Outputs that are the intersection of old_output_set and new_output_set
1285    common_outputs = old_output_set & new_output_set
1286
1287    report = f"\n# Changes to available translations\n"
1288    report += f"This compares the coding translation files available.\n\n"
1289    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1290    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1291    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1292
1293    # Step N: Compare common outputs between versions
1294    report += f"# Changes to concepts in translation files\n\n"
1295    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might differ from config.yml if the translations have not been run for the current config.\n\n"
1296    for file in common_outputs:
1297        old_output = old_map_path / file
1298        new_output = new_map_path / file
1299
1300        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
1301        _logger.debug(f"New ouptput: {str(new_output.resolve())}")
1302
1303        df1 = pd.read_csv(old_output)
1304        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1305        df2 = pd.read_csv(new_output)
1306        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1307
1308        # Check for added and removed concepts
1309        report += f"- File {file}\n"
1310        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1311        report += f"- Removed concepts {sorted_list}\n"
1312        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1313        report += f"- Added concepts {sorted_list}\n"
1314
1315        # Check for changed concepts
1316        diff = df2 - df1  # diff in counts
1317        diff = diff[
1318            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1319        ]  # get non-zero counts
1320        s = "\n"
1321        if len(diff.index) > 0:
1322            for concept, row in diff.iterrows():
1323                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1324            report += f"- Changed concepts {s}\n\n"
1325        else:
1326            report += f"- Changed concepts []\n\n"
1327
1328    return report
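
A usage sketch with hypothetical directories, each holding the CSV outputs of a previous map run:

    from pathlib import Path
    from acmc import phen

    report = phen.diff_map_files(Path("./old_phen/map"), Path("./new_phen/map"))
    print(report)
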
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path):
1331def diff_phen(
1332    new_phen_path: Path,
1333    new_version: str,
1334    old_phen_path: Path,
1335    old_version: str,
1336    report_path: Path,
1337):
1338    """Compare the differences between two versions of a phenotype"""
1339
1340    # validate phenotypes
1341    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1342    validate(str(old_phen_path.resolve()))
1343    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1344    validate(str(new_phen_path.resolve()))
1345
1346    # get old and new config
1347    old_config_path = old_phen_path / CONFIG_FILE
1348    with old_config_path.open("r") as file:
1349        old_config = yaml.safe_load(file)
1350    new_config_path = new_phen_path / CONFIG_FILE
1351    with new_config_path.open("r") as file:
1352        new_config = yaml.safe_load(file)
1353
1354    # write report heading
1355    report = f"# Phenotype Comparison Report\n"
1356    report += f"## Original phenotype\n"
1357    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1358    report += f"  - {old_version}\n"
1359    report += f"  - {str(old_phen_path.resolve())}\n"
1360    report += f"## Changed phenotype\n"
1361    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1362    report += f"  - {new_version}\n"
1363    report += f"  - {str(new_phen_path.resolve())}\n"
1364
1365    # Step 1: check differences configuration files
1366    # Convert list of dicts into a dict: {name: file}
1367    report += diff_config(old_config, new_config)
1368
1369    # Step 2: check differences between map files
1370    # List files from output directories
1371    old_map_path = old_phen_path / MAP_DIR
1372    new_map_path = new_phen_path / MAP_DIR
1373    report += diff_map_files(old_map_path, new_map_path)
1374
1375    # initialise report file
1376    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1377    report_file = open(report_path, "w")
1378    report_file.write(report)
1379    report_file.close()
1380
1381    _logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1384def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1385    # make tmp directory .acmc
1386    timestamp = time.strftime("%Y%m%d_%H%M%S")
1387    temp_dir = Path(f".acmc/diff_{timestamp}")
1388
1389    changed_phen_path = Path(phen_dir)
1390    if not changed_phen_path.exists():
1391        raise ValueError(
1392            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1393        )
1394
1395    old_phen_path = Path(old_phen_dir)
1396    if not old_phen_path.exists():
1397        raise ValueError(
1398            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1399        )
1400
1401    t_path = old_phen_path / "config.yml"
1402    with t_path.open("r") as file:
1403        c = yaml.safe_load(file)  # NOTE: loaded but currently unused
1404
1405    try:
1406        # Create the directory
1407        temp_dir.mkdir(parents=True, exist_ok=True)
1408        _logger.debug(f"Temporary directory created: {temp_dir}")
1409
1410        # Create temporary directories
1411        changed_path = temp_dir / "changed"
1412        changed_path.mkdir(parents=True, exist_ok=True)
1413        old_path = temp_dir / "old"
1414        old_path.mkdir(parents=True, exist_ok=True)
1415
1416        # checkout changed
1417        if version == "latest":
1418            _logger.debug(
1419                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1420            )
1421            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1422        else:
1423            _logger.debug(
1424                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1425            )
1426            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1427            changed_repo.git.checkout(version)
1428
1429        # checkout old
1430        if old_version == "latest":
1431            _logger.debug(
1432                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1433            )
1434            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1435        else:
1436            _logger.debug(
1437                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1438            )
1439            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1440            old_repo.git.checkout(old_version)
1441
1442        report_filename = f"{version}_{old_version}_diff.md"
1443        report_path = changed_phen_path / report_filename
1444        # diff old with new
1445        diff_phen(changed_path, version, old_path, old_version, report_path)
1446
1447    finally:
1448        # clean up tmp directory
1449        if temp_dir.exists():
1450            shutil.rmtree(temp_dir)
1451            _logger.debug(f"Temporary directory removed: {temp_dir}")
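
A usage sketch comparing the current working copy of a phenotype against an older one at a hypothetical tag; the report is written into the changed phenotype directory:

    from acmc import phen

    # writes latest_v0.0.1_diff.md into ./workspace/phen
    phen.diff("./workspace/phen", "latest", "./workspace/phen_old", "v0.0.1")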