acmc.phen

phenotype.py module

This module provides functionality for managing phenotypes: initialising phenotype repositories, validating configuration, mapping source concept codes to target code types, publishing versions, and comparing phenotype versions.
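
A typical workflow drives the module through its public functions: init() creates a phenotype repository, validate() checks its structure and configuration, map() translates source codes into target code types, and publish() commits and tags a new version. A minimal sketch (the workspace path and the "read2" target code type are illustrative):

from acmc import phen

# create a local phenotype repo with the standard directory structure
phen.init("./workspace/phen", remote_url=None)

# after adding coding list files under concepts/ and editing config.yml
phen.validate("./workspace/phen")
phen.map("./workspace/phen", target_code_type="read2")

# commit, bump the patch version, tag, and (if a remote is set) push
phen.publish("./workspace/phen", msg="Initial concept sets", remote_url=None, increment="patch")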

   1"""
   2phenotype.py module
   3
   4This module provides functionality for managing phenotypes.
   5"""

import argparse
import pandas as pd
import numpy as np
import json
import os
import sqlite3
import sys
import shutil
import time
import git
import re
import logging
import requests
import yaml
import semver
from git import Repo
from cerberus import Validator  # type: ignore
from deepdiff import DeepDiff
from pathlib import Path
from urllib.parse import urlparse, urlunparse
from typing import Tuple, Set, Any
import acmc
from acmc import trud, omop, parse, util, logging_config as lc

# set up logging
_logger = lc.setup_logger()

pd.set_option("mode.chained_assignment", None)

PHEN_DIR = "phen"
"""Default phenotype directory name"""

DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
"""Default phenotype directory path"""

CONCEPTS_DIR = "concepts"
"""Default concepts directory name"""

MAP_DIR = "map"
"""Default map directory name"""

CONCEPT_SET_DIR = "concept-sets"
"""Default concept set directory name"""

CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
"""Default CSV concept set directory path"""

OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
"""Default OMOP concept set directory path"""

DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
"""List of default phenotype directories"""

CONFIG_FILE = "config.yml"
"""Default configuration filename"""

VOCAB_VERSION_FILE = "vocab_version.yml"
"""Default vocabulary version filename"""

SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
"""List of semantic version increment types"""

DEFAULT_VERSION_INC = "patch"
"""Default semantic version increment type"""

DEFAULT_GIT_BRANCH = "main"
"""Default phenotype repo branch name"""

SPLIT_COL_ACTION = "split_col"
"""Split column preprocessing action type"""

CODES_COL_ACTION = "codes_col"
"""Codes column preprocessing action type"""

DIVIDE_COL_ACTION = "divide_col"
"""Divide column preprocessing action type"""

COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
"""List of column preprocessing action types"""

CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
"""List of supported source concept coding list file types"""

# config.yml schema
CONFIG_SCHEMA = {
    "phenotype": {
        "type": "dict",
        "required": True,
        "schema": {
            "version": {
                "type": "string",
                "required": True,
                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
            },
            "omop": {
                "type": "dict",
                "required": True,
                "schema": {
                    "vocabulary_id": {"type": "string", "required": True},
                    "vocabulary_name": {"type": "string", "required": True},
                    "vocabulary_reference": {
                        "type": "string",
                        "required": True,
                        "regex": r"^https?://.*",  # Ensures it's a URL
                    },
                },
            },
            "map": {
                "type": "list",
                "schema": {
                    "type": "string",
                    "allowed": list(
                        parse.SUPPORTED_CODE_TYPES
                    ),  # Ensure only predefined values are allowed
                },
            },
            "concept_sets": {
                "type": "list",
                "required": True,
                "schema": {
                    "type": "dict",
                    "schema": {
                        "name": {"type": "string", "required": True},
                        "file": {
                            "type": "dict",
                            "required": False,
                            "schema": {
                                "path": {"type": "string", "required": True},
                                "columns": {"type": "dict", "required": True},
                                "category": {
                                    "type": "string"
                                },  # Optional but must be string if present
                                "actions": {
                                    "type": "dict",
                                    "schema": {"divide_col": {"type": "string"}},
                                },
                            },
                        },
                    },
                },
            },
        },
    }
}
"""Phenotype config.yml schema definition"""

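# An illustrative config.yml that satisfies CONFIG_SCHEMA above (the names,
# paths and URL here are made up for the example):
#
#   phenotype:
#     version: "0.0.1"
#     omop:
#       vocabulary_id: "ACMC_EXAMPLE"
#       vocabulary_name: "ACMC example phenotype"
#       vocabulary_reference: "https://example.org/acmc-example"
#     map:
#       - "read2"
#     concept_sets:
#       - name: "EXAMPLE_CONDITION"
#         file:
#           path: "example_condition.csv"
#           columns:
#             read2: "code"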

class PhenValidationException(Exception):
    """Custom exception raised when there are validation errors in the phenotype configuration file"""

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors


def _construct_git_url(remote_url: str):
    """Constructs a git URL for GitHub or GitLab including a PAT from an environment variable"""
    # check the url
    parsed_url = urlparse(remote_url)

    # If the URL is on github.com use a GitHub PAT, otherwise assume GitLab.
    # Supporting other hosts such as Codeberg would require updating this
    # function if their URL scheme differs.
    if "github.com" in parsed_url.netloc:
        # get GitHub PAT from environment variable
        auth = os.getenv("ACMC_GITHUB_PAT")
        if not auth:
            raise ValueError(
                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
            )
    else:
        # get GitLab PAT from environment variable
        auth = os.getenv("ACMC_GITLAB_PAT")
        if not auth:
            raise ValueError(
                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
            )
        auth = f"oauth2:{auth}"

    # Construct the new URL with credentials
    new_netloc = f"{auth}@{parsed_url.netloc}"
    return urlunparse(
        (
            parsed_url.scheme,
            new_netloc,
            parsed_url.path,
            parsed_url.params,
            parsed_url.query,
            parsed_url.fragment,
        )
    )

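# Illustrative behaviour of _construct_git_url (token and URLs are examples):
# with the environment variable ACMC_GITHUB_PAT set to "token123",
#
#   _construct_git_url("https://github.com/acme/phen.git")
#
# returns "https://token123@github.com/acme/phen.git"; for GitLab hosts the
# credential is prefixed, giving "https://oauth2:<PAT>@gitlab.example.com/...".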

def _create_empty_git_dir(path: Path):
    """Creates a directory with a .gitkeep file so that it's tracked in git"""
    path.mkdir(exist_ok=True)
    keep_path = path / ".gitkeep"
    keep_path.touch(exist_ok=True)


def _check_delete_dir(path: Path, msg: str) -> bool:
    """Asks the user on the command line whether to delete a directory

    Args:
        path (Path): path of the directory to be deleted
        msg (str): message to be displayed to the user

    Returns:
        Boolean: True if deleted
    """
    deleted = False

    user_input = input(f"{msg}").strip().lower()
    if user_input in ["yes", "y"]:
        shutil.rmtree(path)
        deleted = True
    else:
        _logger.info("Directory was not deleted.")

    return deleted


def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with the standard structure"""
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initialised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phen's default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
        main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phen type
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug("Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "map": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
"""
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info("Phenotype initialised successfully")


def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version into a local directory, and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there's any other problem with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initialised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yml' exists in the root directory
        config_path = phen_path / "config.yml"
        if not os.path.isfile(config_path):
            raise ValueError(
                "The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", "main")

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}")

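# Example invocation (illustrative paths, URLs and version):
#
#   fork(
#       "./workspace/phen-fork",
#       upstream_url="https://github.com/acme/phen.git",
#       upstream_version="v1.0.0",
#       new_origin_url="https://github.com/me/phen-fork.git",
#   )
#
# This clones the upstream repo at tag v1.0.0, validates it, adds an
# "upstream" remote pointing at the source repo, replaces "origin" with the
# new URL, and pushes the fork to the new origin.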

def validate(phen_dir: str):
    """Validates that the phenotype directory is a git repo with the standard structure"""
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # Load configuration file
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n
    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
    if not match:
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for item in phenotype["concept_sets"]:
        # check concept code file exists
        concept_code_file_path = concepts_path / item["file"]["path"]
        if not concept_code_file_path.exists():
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
            )

        # check concept code file is not empty
        if concept_code_file_path.stat().st_size == 0:
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
            )

        # check code file type is supported
        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
            raise ValueError(
                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
            )

        # check columns specified are a supported medical coding type
        for column in item["file"]["columns"]:
            if column not in code_types:
                validation_errors.append(
                    f"Column type {column} for file {concept_code_file_path} is not supported"
                )

        # check the actions are supported
        if "actions" in item["file"]:
            for action in item["file"]["actions"]:
                if action not in COL_ACTIONS:
                    validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info("Phenotype validated successfully")


def _read_table_file(path: Path, excel_sheet: str = ""):
    """
    Load a code list file
    """

    path = path.resolve()
    if path.suffix == ".csv":
        df = pd.read_csv(path, dtype=str)
    elif path.suffix == ".xlsx" or path.suffix == ".xls":
        if excel_sheet != "":
            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
        else:
            df = pd.read_excel(path, dtype=str)
    elif path.suffix == ".dta":
        df = pd.read_stata(path)
    else:
        raise ValueError(
            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
        )

    return df


def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
    # Perform structural changes to the file before preprocessing
    _logger.debug("Processing file structural actions")
    if (
        "actions" in concept_set["file"]
        and "split_col" in concept_set["file"]["actions"]
        and "codes_col" in concept_set["file"]["actions"]
    ):
        split_col = concept_set["file"]["actions"]["split_col"]
        codes_col = concept_set["file"]["actions"]["codes_col"]
        _logger.debug(
            f"Action: Splitting {split_col} column into: {df[split_col].unique()}"
        )
        codes = df[codes_col]
        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
        oh[oh == False] = np.nan  # replace 0s with None
        df = pd.concat([df, oh], axis=1)  # merge in new columns

    return df

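# Illustrative effect of the split_col/codes_col actions: given a coding
# list in long format (values below are made up),
#
#   code    type
#   A01     icd10
#   X01..   read2
#
# with actions {"split_col": "type", "codes_col": "code"}, the "type" column
# is one-hot encoded and the True cells are filled with the codes, appending
#
#   icd10   read2
#   A01     NaN
#   NaN     X01..
#
# so that each code type can then be parsed from its own column.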

def _preprocess_source_concepts(
    df: pd.DataFrame, concept_set: dict, code_file_path: Path
) -> Tuple[pd.DataFrame, list]:
    """Perform QA checks on columns individually and append to df"""
    out = pd.DataFrame([])  # create output df to append to
    code_errors = []  # list of errors from processing

    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])

    # Preprocess codes
    code_types = parse.CodeTypeParser().code_types
    for code_type in concept_set["file"]["columns"]:
        parser = code_types[code_type]
        _logger.info(f"Processing {code_type} codes for {code_file_path}")

        # get codes by column name
        source_col_name = concept_set["file"]["columns"][code_type]
        codes = df[source_col_name].dropna()
        codes = codes.astype(str)  # convert to string
        codes = codes.str.strip()  # remove excess spaces

        # process codes, validating them using the parser and returning any errors
        codes, errors = parser.process(codes, code_file_path)
        if len(errors) > 0:
            code_errors.extend(errors)
            _logger.warning(f"Codes validation failed with {len(errors)} errors")

        # add processed codes to df
        new_col_name = f"{source_col_name}_SOURCE"
        df = df.rename(columns={source_col_name: new_col_name})
        process_codes = pd.DataFrame({code_type: codes}).join(df)
        out = pd.concat(
            [out, process_codes],
            ignore_index=True,
        )

    _logger.debug(out.head())

    return out, code_errors


# Translate a df with multiple code types into a single code type Series
def translate_codes(
    source_df: pd.DataFrame, target_code_type: str, concept_name: str
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if the target code type is the same as the source code type, no translation, just append source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
            )
        else:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            # do the mapping if it exists
            if map_path.exists():
                # get mapping
                df_map = pd.read_parquet(map_path)

                # do mapping
                translated_df = pd.merge(
                    source_df[source_code_type], df_map, how="left"
                )

                # normalise the output
                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

                # add to list of codes
                codes = pd.concat([codes, translated_df])

            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add the concept set name to the output if there were any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes

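# Illustrative use (assumes a read2->snomed mapping file exists under
# trud.PROCESSED_PATH, e.g. "read2_to_snomed.parquet"):
#
#   concept_df = translate_codes(
#       source_df[["read2"]],
#       target_code_type="snomed",
#       concept_name="EXAMPLE_CONDITION",
#   )
#
# The result has one row per translated code, with columns SOURCE_CONCEPT,
# SOURCE_CONCEPT_TYPE, CONCEPT and CONCEPT_SET.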

def _write_code_errors(code_errors: list, code_errors_path: Path):
    err_df = pd.DataFrame(
        [
            {
                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
                "VOCABULARY": err.code_type,
                "SOURCE": err.codes_file,
                "CAUSE": err.message,
            }
            for err in code_errors
        ]
    )

    err_df = err_df.drop_duplicates()  # remove duplicates from the error file
    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
    err_df.to_csv(code_errors_path, index=False, mode="w")


def write_vocab_version(phen_path: Path):
    # write the vocab version files

    if not trud.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    if not omop.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud.VERSION_PATH.open("r") as file:
        trud_version = yaml.safe_load(file)

    with omop.VERSION_PATH.open("r") as file:
        omop_version = yaml.safe_load(file)

    # Create the combined YAML structure
    version_data = {
        "versions": {
            "acmc": acmc.__version__,
            "trud": trud_version,
            "omop": omop_version,
        }
    }

    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
        yaml.dump(
            version_data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )


def map(phen_dir: str, target_code_type: str):
    _logger.info(f"Processing phenotype: {phen_dir}")

    # Validate configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
    phenotype = config["phenotype"]

    if len(phenotype["map"]) == 0:
        raise ValueError("No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in phenotype["map"]:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
        )

    if target_code_type is not None:
        _map_target_code_type(phen_path, phenotype, target_code_type)
    else:
        for t in phenotype["map"]:
            _map_target_code_type(phen_path, phenotype, t)

    _logger.info("Phenotype processed successfully")


def _map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
    _logger.debug(f"Target coding format: {target_code_type}")
    concepts_path = phen_path / CONCEPTS_DIR
    # Create output dataframe
    out = pd.DataFrame([])
    code_errors = []

    # Process each folder in the codes section
    for concept_set in phenotype["concept_sets"]:
        _logger.debug(f"--- {concept_set['file']} ---")

        # Load code file
        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
        df = _read_table_file(codes_file_path)

        # process structural actions
        df = _process_actions(df, concept_set)

        # preprocess and validate source concepts
        _logger.debug("Processing and validating source concept codes")
        df, errors = _preprocess_source_concepts(
            df,
            concept_set,
            codes_file_path,
        )

        # create df with just the source code columns
        source_column_names = list(concept_set["file"]["columns"].keys())
        source_df = df[source_column_names]

        _logger.debug(source_df.columns)
        _logger.debug(source_df.head())

        _logger.debug(
            f"Length of errors from _preprocess_source_concepts {len(errors)}"
        )
        if len(errors) > 0:
            code_errors.extend(errors)
        _logger.debug(f"Length of code_errors {len(code_errors)}")

        # Map source concept codes to target codes
        # if processing a source coding list with categorical data
        if (
            "actions" in concept_set["file"]
            and "divide_col" in concept_set["file"]["actions"]
            and len(df) > 0
        ):
            divide_col = concept_set["file"]["actions"]["divide_col"]
            _logger.debug(f"Action: Dividing Table by {divide_col}")
            _logger.debug(f"column into: {df[divide_col].unique()}")
            df_grp = df.groupby(divide_col)
            for cat, grp in df_grp:
                if cat == concept_set["file"]["category"]:
                    grp = grp.drop(columns=[divide_col])  # delete categorical column
                    source_df = grp[source_column_names]
                    trans_out = translate_codes(
                        source_df,
                        target_code_type=target_code_type,
                        concept_name=concept_set["name"],
                    )
                    out = pd.concat([out, trans_out])
        else:
            source_df = df[source_column_names]
            trans_out = translate_codes(
                source_df,
                target_code_type=target_code_type,
                concept_name=concept_set["name"],
            )
            out = pd.concat([out, trans_out])

    if len(code_errors) > 0:
        _logger.error(f"The map processing has {len(code_errors)} errors")
        error_path = phen_path / MAP_DIR / "errors"
        error_path.mkdir(parents=True, exist_ok=True)
        error_filename = f"{target_code_type}-code-errors.csv"
        _write_code_errors(code_errors, error_path / error_filename)

    # Check there is output from processing
    if len(out.index) == 0:
        _logger.error("No output after map processing")
        raise Exception(
            f"No output after map processing, check config {str(phen_path.resolve())}"
        )

    # final processing
    out = out.reset_index(drop=True)
    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])

    out_count = len(out.index)
    # add metadata
    # Loop over each source_concept_type and perform a left join on all columns apart from the source code columns
    result_list = []
    source_column_names = list(concept_set["file"]["columns"].keys())
    for source_concept_type in source_column_names:
        # Filter output based on the current source_concept_type
        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
        filtered_count = len(out_filtered_df.index)

        # Removing the source type columns except the current type leaves the metadata and the join column
        remove_types = [
            type for type in source_column_names if type != source_concept_type
        ]
        metadata_df = df.drop(columns=remove_types)
        metadata_df = metadata_df.rename(
            columns={source_concept_type: "SOURCE_CONCEPT"}
        )
        metadata_df_count = len(metadata_df.index)

        # Perform a left join on SOURCE_CONCEPT to add the metadata
        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
        result_count = len(result.index)

        _logger.debug(
            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
        )

        # Append the result to the result_list
        result_list.append(result)

    # Concatenate all the results into a single DataFrame
    final_out = pd.concat(result_list, ignore_index=True)
    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
    _logger.debug(
        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
    )

    # Save output to the map directory
    output_filename = target_code_type + ".csv"
    map_path = phen_path / MAP_DIR / output_filename
    final_out.to_csv(map_path, index=False)
    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")

    # save concept sets as separate files
    concept_set_path = phen_path / CSV_PATH / target_code_type

    # empty the concept-set directory except for hidden files, e.g. .git
    if concept_set_path.exists():
        for item in concept_set_path.iterdir():
            if not item.name.startswith("."):
                item.unlink()
    else:
        concept_set_path.mkdir(parents=True, exist_ok=True)

    # write each concept set as a separate file
    for name, concept in final_out.groupby("CONCEPT_SET"):
        concept = concept.sort_values(by="CONCEPT")  # sort rows
        concept = concept.dropna(how="all", axis=1)  # remove empty cols
        concept = concept.reindex(
            sorted(concept.columns), axis=1
        )  # sort cols alphabetically
        filename = f"{name}.csv"
        concept_path = concept_set_path / filename
        concept.to_csv(concept_path, index=False)

    write_vocab_version(phen_path)

    _logger.info(f"Phenotype processed target code type {target_code_type}")


def _generate_version_tag(
    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
) -> str:
    # Get all valid semantic version tags
    versions = []
    for tag in repo.tags:
        if tag.name.startswith("v"):
            tag_name = tag.name[1:]  # Remove the first character
        else:
            tag_name = tag.name
        if semver.Version.is_valid(tag_name):
            versions.append(semver.Version.parse(tag_name))

    _logger.debug(f"Versions: {versions}")
    # Determine the next version
    if not versions:
        new_version = semver.Version(0, 0, 1)
    else:
        latest_version = max(versions)
        if increment == "major":
            new_version = latest_version.bump_major()
        elif increment == "minor":
            new_version = latest_version.bump_minor()
        else:
            new_version = latest_version.bump_patch()

    # Create the new tag
    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)

    return new_version_str

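# Illustrative behaviour: if the repo's highest semantic version tag is
# v1.2.3, then
#
#   _generate_version_tag(repo, increment="patch")  # -> "1.2.4"
#   _generate_version_tag(repo, increment="minor")  # -> "1.3.0"
#   _generate_version_tag(repo, increment="major")  # -> "2.0.0"
#
# A repo with no valid semver tags yields "0.0.1"; pass use_v_prefix=True for
# a leading "v".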

def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory"""

    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates
    commit_message = f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # push to origin if a remote repo
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            repo.git.push("--set-upstream", "origin", "main")
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception as e:
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise e

    _logger.info("Phenotype published successfully")


def export(phen_dir: str, version: str):
    """Exports a phen repo at a specific tagged version into a target directory"""
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    export_path = phen_path / OMOP_PATH
    # check export directory exists and if not create it
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # omop export db
    export_db_path = omop.export(
        map_path,
        export_path,
        config["phenotype"]["version"],
        config["phenotype"]["omop"],
    )

    # write to tables
    # export as csv
    _logger.info("Phenotype exported successfully")


def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory"""

    # Validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # Check target directory exists
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # Set copy directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    if (
        copy_path.exists() and copy_path.is_dir()
    ):  # Check if it exists and is a directory
        copy = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )
    else:
        copy = True

    if not copy:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    # Check out the latest commit or specified version
    if version:
        # Checkout a specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        repo.git.checkout(version)
    else:
        # Checkout the latest commit (HEAD)
        _logger.info("Checking out the latest commit...")
        repo.git.checkout("HEAD")

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")

    _logger.info("Phenotype copied successfully")


# Convert the concept_sets list into dictionaries
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concepts as a {name: file_path} dictionary and a name set."""
    concepts_dict = {
        item["name"]: item["file"]["path"]
        for item in config_data["phenotype"]["concept_sets"]
    }
    name_set = set(concepts_dict.keys())
    return concepts_dict, name_set

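# Example (illustrative config):
#
#   config = {"phenotype": {"concept_sets": [
#       {"name": "CS1", "file": {"path": "cs1.csv"}},
#   ]}}
#   extract_concepts(config)  # -> ({"CS1": "cs1.csv"}, {"CS1"})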

def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
    """
    Extracts clean keys from a DeepDiff dictionary.

    :param diff: DeepDiff result dictionary
    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
    :return: A set of clean key names
    """
    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}

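# Example (illustrative): a DeepDiff result such as
#
#   {"dictionary_item_added": ["root['CS2']"]}
#
# yields _extract_clean_deepdiff_keys(diff, "dictionary_item_added") -> {"CS2"}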

def diff_config(old_config: dict, new_config: dict) -> str:
    report = "\n# Changes to phenotype configuration\n"
    report += "This compares changes in the phenotype configuration, including added, removed and renamed concept sets, and changes to concept set source code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # Check added and removed names
    added_names = new_names - old_names  # Names that appear in new but not in old
    removed_names = old_names - new_names  # Names that were in old but not in new

    # find file path changes for unchanged names
    unchanged_names = old_names & new_names  # Names that exist in both
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # Find renamed concepts (same file, different name)
    renamed_concepts = []
    for removed in removed_names:
        old_path = old_concepts[removed]
        for added in added_names:
            new_path = new_concepts[added]
            if old_path == new_path:
                renamed_concepts.append((removed, added))

    # Remove renamed concepts from the added and removed sets
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate config report
    if added_names:
        report += "## Added Concepts\n"
        for name in added_names:
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in removed_names:
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    if "values_changed" in file_diff:
        report += "## Updated File Paths\n"
        for name, change in file_diff["values_changed"].items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            clean_name = name.split("root['")[1].split("']")[0]
            report += (
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or file_diff.get("values_changed")
    ):
        report += "No changes in concept sets.\n"

    return report


def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
    old_output_files = [
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    new_output_files = [
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]

    # Convert the lists to sets for easy comparison
    old_output_set = set(old_output_files)
    new_output_set = set(new_output_files)

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = "\n# Changes to available translations\n"
    report += "This compares the coding translation files available.\n\n"
    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"

    # Compare common outputs between versions
    report += "# Changes to concepts in translation files\n\n"
    report += "This compares the added and removed concepts in each of the coding translation files. Note that this might differ from config.yml if the translations have not been run for the current config.\n\n"
    for file in common_outputs:
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old output: {str(old_output.resolve())}")
        _logger.debug(f"New output: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output)
        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
        report += f"- Added concepts {sorted_list}\n"

        # Check for changed concepts
        diff = df2 - df1  # diff in counts
        diff = diff[
            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
        ]  # get non-zero counts
        s = "\n"
        if len(diff.index) > 0:
            for concept, row in diff.iterrows():
                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
            report += f"- Changed concepts {s}\n\n"
        else:
            report += "- Changed concepts []\n\n"

    return report


def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
):
    """Compare the differences between two versions of a phenotype"""

    # validate phenotypes
    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
    validate(str(old_phen_path.resolve()))
    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
    validate(str(new_phen_path.resolve()))

    # get old and new config
    old_config_path = old_phen_path / CONFIG_FILE
    with old_config_path.open("r") as file:
        old_config = yaml.safe_load(file)
    new_config_path = new_phen_path / CONFIG_FILE
    with new_config_path.open("r") as file:
        new_config = yaml.safe_load(file)

    # write report heading
    report = "# Phenotype Comparison Report\n"
    report += "## Original phenotype\n"
    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f"  - {old_version}\n"
    report += f"  - {str(old_phen_path.resolve())}\n"
    report += "## Changed phenotype:\n"
    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f"  - {new_version}\n"
    report += f"  - {str(new_phen_path.resolve())}\n"

    # Step 1: check differences between the configuration files
    # Convert list of dicts into a dict: {name: file}
    report += diff_config(old_config, new_config)

    # Step 2: check differences between the map files
    # List files from the output directories
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(old_map_path, new_map_path)

    # write the report file
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with open(report_path, "w") as report_file:
        report_file.write(report)

    _logger.info("Phenotypes diff'd successfully")


1381def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1382    # make tmp directory .acmc
1383    timestamp = time.strftime("%Y%m%d_%H%M%S")
1384    temp_dir = Path(f".acmc/diff_{timestamp}")
1385
1386    changed_phen_path = Path(phen_dir)
1387    if not changed_phen_path.exists():
1388        raise ValueError(
1389            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1390        )
1391
1392    old_phen_path = Path(old_phen_dir)
1393    if not old_phen_path.exists():
1394        raise ValueError(
1395            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1396        )
1397
1398    t_path = old_phen_path / "config.yml"
1399    with t_path.open("r") as file:
1400        c = yaml.safe_load(file)
1401
1402    try:
1403        # Create the directory
1404        temp_dir.mkdir(parents=True, exist_ok=True)
1405        _logger.debug(f"Temporary directory created: {temp_dir}")
1406
1407        # Create temporary directories
1408        changed_path = temp_dir / "changed"
1409        changed_path.mkdir(parents=True, exist_ok=True)
1410        old_path = temp_dir / "old"
1411        old_path.mkdir(parents=True, exist_ok=True)
1412
1413        # checkout changed
1414        if version == "latest":
1415            _logger.debug(
1416                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1417            )
1418            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1419        else:
1420            _logger.debug(
1421                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1422            )
1423            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1424            changed_repo.git.checkout(version)
1425
1426        # checkout old
1427        if old_version == "latest":
1428            _logger.debug(
1429                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1430            )
1431            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1432        else:
1433            _logger.debug(
1434                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1435            )
1436            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1437            old_repo.git.checkout(old_version)
1438
1439        report_filename = f"{version}_{old_version}_diff.md"
1440        report_path = changed_phen_path / report_filename
1441        # diff old with new
1442        diff_phen(changed_path, version, old_path, old_version, report_path)
1443
1444    finally:
1445        # clean up tmp directory
1446        if temp_dir.exists():
1447            shutil.rmtree(temp_dir)
1448            _logger.debug(f"Temporary directory removed: {temp_dir}")
PHEN_DIR = 'phen'

Default phenotype directory name

DEFAULT_PHEN_PATH = PosixPath('workspace/phen')

Default phenotype directory path

CONCEPTS_DIR = 'concepts'

Default concepts directory name

MAP_DIR = 'map'

Default map directory name

CONCEPT_SET_DIR = 'concept-sets'

Default concept set directory name

CSV_PATH = PosixPath('concept-sets/csv')

Default CSV concept set directory path

OMOP_PATH = PosixPath('concept-sets/omop')

Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = ['concepts', 'map', 'concept-sets']

List of default phenotype directories

CONFIG_FILE = 'config.yml'

Default configuration filename

VOCAB_VERSION_FILE = 'vocab_version.yml'

Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ['major', 'minor', 'patch']

List of semantic version increment types

DEFAULT_VERSION_INC = 'patch'

Default semantic version increment type

DEFAULT_GIT_BRANCH = 'main'

Default phenotype repo branch name

SPLIT_COL_ACTION = 'split_col'

Split column preprocessing action type

CODES_COL_ACTION = 'codes_col'

Codes column preprocessing action type

DIVIDE_COL_ACTION = 'divide_col'

Divide column preprocessing action type

COL_ACTIONS = ['split_col', 'codes_col', 'divide_col']

List of column preprocessing action types

CODE_FILE_TYPES = ['.xlsx', '.xls', '.csv']

List of supported source concept coding list file types

CONFIG_SCHEMA = {
    'phenotype': {
        'type': 'dict',
        'required': True,
        'schema': {
            'version': {'type': 'string', 'required': True, 'regex': '^\\d+\\.\\d+\\.\\d+$'},
            'omop': {
                'type': 'dict',
                'required': True,
                'schema': {
                    'vocabulary_id': {'type': 'string', 'required': True},
                    'vocabulary_name': {'type': 'string', 'required': True},
                    'vocabulary_reference': {'type': 'string', 'required': True, 'regex': '^https?://.*'},
                },
            },
            'map': {
                'type': 'list',
                'schema': {'type': 'string', 'allowed': ['opcs4', 'icd10', 'atc', 'snomed', 'read2', 'read3']},
            },
            'concept_sets': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'dict',
                    'schema': {
                        'name': {'type': 'string', 'required': True},
                        'file': {
                            'type': 'dict',
                            'required': False,
                            'schema': {
                                'path': {'type': 'string', 'required': True},
                                'columns': {'type': 'dict', 'required': True},
                                'category': {'type': 'string'},
                                'actions': {'type': 'dict', 'schema': {'divide_col': {'type': 'string'}}},
                            },
                        },
                    },
                },
            },
        },
    },
}

Phenotype config.yml schema definition
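
For illustration, a minimal sketch of validating a configuration dictionary against this schema with Cerberus, mirroring the pattern used in validate below; the import path and config values are examples only:

    from cerberus import Validator
    from acmc.phen import CONFIG_SCHEMA

    validator = Validator(CONFIG_SCHEMA)
    config = {
        "phenotype": {
            "version": "0.0.1",
            "omop": {
                "vocabulary_id": "ACMC_EXAMPLE",
                "vocabulary_name": "Example vocabulary",
                "vocabulary_reference": "https://example.org/phenotype",
            },
            "concept_sets": [],
        }
    }
    if not validator.validate(config):
        # validator.errors maps each failing field to its error messages
        print(validator.errors)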

class PhenValidationException(builtins.Exception):
155class PhenValidationException(Exception):
156    """Custom exception class raised when validation errors in phenotype configuration file"""
157
158    def __init__(self, message, validation_errors=None):
159        super().__init__(message)
160        self.validation_errors = validation_errors

Custom exception raised when validation errors occur in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
158    def __init__(self, message, validation_errors=None):
159        super().__init__(message)
160        self.validation_errors = validation_errors
validation_errors
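
A sketch of how a caller might surface these errors; the directory path is hypothetical:

    from acmc.phen import PhenValidationException, validate

    try:
        validate("./workspace/phen")
    except PhenValidationException as e:
        print(e)  # summary message
        for err in e.validation_errors or []:
            print(f" - {err}")  # individual validation failures
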
def init(phen_dir: str, remote_url: str):
229def init(phen_dir: str, remote_url: str):
230    """Initial phenotype directory as git repo with standard structure"""
231    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
232    phen_path = Path(phen_dir)
233
234    # check if directory already exists and ask user if they want to recreate it
235    if (
236        phen_path.exists() and phen_path.is_dir()
237    ):  # Check if it exists and is a directory
238        configure = _check_delete_dir(
239            phen_path,
240            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
241        )
242    else:
243        configure = True
244
245    if not configure:
246        _logger.info(f"Exiting, phenotype not initialised")
247        return
248
249    # Initialise repo from local or remote
250    repo: Repo
251
252    # if remote then clone the repo otherwise init a local repo
253    if remote_url is not None:
254        # add PAT token to the URL
255        git_url = _construct_git_url(remote_url)
256
257        # clone the repo
258        git_cmd = git.cmd.Git()
259        git_cmd.clone(git_url, phen_path)
260
261        # open repo
262        repo = Repo(phen_path)
263        # check if there are any commits (new repo has no commits)
264        if (
265            len(repo.branches) == 0 or repo.head.is_detached
266        ):  # Handle detached HEAD (e.g., after init)
267            _logger.debug("The phen repository has no commits yet.")
268            commit_count = 0
269        else:
270            # Get the total number of commits in the default branch
271            commit_count = sum(1 for _ in repo.iter_commits())
272            _logger.debug(f"Repo has previous commits: {commit_count}")
273    else:
274        # local repo, create the directories and init
275        phen_path.mkdir(parents=True, exist_ok=True)
276        _logger.debug(f"Phen directory '{phen_path}' has been created.")
277        repo = git.Repo.init(phen_path)
278        commit_count = 0
279
280    phen_path = phen_path.resolve()
281    # initialise empty repos
282    if commit_count == 0:
283        # create initial commit
284        initial_file_path = phen_path / "README.md"
285        with open(initial_file_path, "w") as file:
286            file.write(
287                "# Initial commit\nThis is the first commit in the phen repository.\n"
288            )
289        repo.index.add([initial_file_path])
290        repo.index.commit("Initial commit")
291        commit_count = 1
292
293    # Checkout the phens default branch, creating it if it does not exist
294    if DEFAULT_GIT_BRANCH in repo.branches:
295        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
296        main_branch.checkout()
297    else:
298        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
299        main_branch.checkout()
300
301    # if the phen path does not contain the config file then initialise the phen type
302    config_path = phen_path / CONFIG_FILE
303    if config_path.exists():
304        _logger.debug(f"Phenotype configuration files already exist")
305        return
306
307    _logger.info("Creating phen directory structure and config files")
308    for d in DEFAULT_PHEN_DIR_LIST:
309        _create_empty_git_dir(phen_path / d)
310
311    # create empty phen config file
312    config = {
313        "phenotype": {
314            "version": "0.0.0",
315            "omop": {
316                "vocabulary_id": "",
317                "vocabulary_name": "",
318                "vocabulary_reference": "",
319            },
320            "translate": [],
321            "concept_sets": [],
322        }
323    }
324
325    with open(phen_path / CONFIG_FILE, "w") as file:
326        yaml.dump(
327            config,
328            file,
329            Dumper=util.QuotedDumper,
330            default_flow_style=False,
331            sort_keys=False,
332            default_style='"',
333        )
334
335    # add git ignore
336    ignore_content = """# Ignore SQLite database files
337*.db
338*.sqlite3
339 
340# Ignore SQLite journal and metadata files
341*.db-journal
342*.sqlite3-journal
343
344# python
345.ipynb_checkpoints
346 """
347    ignore_path = phen_path / ".gitignore"
348    with open(ignore_path, "w") as file:
349        file.write(ignore_content)
350
351    # add to git repo and commit
352    for d in DEFAULT_PHEN_DIR_LIST:
353        repo.git.add(phen_path / d)
354    repo.git.add(all=True)
355    repo.index.commit("initialised the phen git repo.")
356
357    _logger.info(f"Phenotype initialised successfully")

Initialise the phenotype directory as a git repo with the standard structure
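
A usage sketch, assuming the module is importable as acmc.phen; the path is an example:

    from acmc import phen

    # create a new local phenotype repo with the standard layout;
    # pass a remote URL instead of None to clone an existing repo
    phen.init("./workspace/phen", None)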

def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
360def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
361    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
362
363    Args:
364        phen_dir (str): local directory path where the upstream repo is to be cloned
365        upstream_url (str): url to the upstream repo
366        upstream_version (str): version in the upstream repo to clone
367        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
368
369    Raises:
370        ValueError: if the specified version is not in the upstream repo
371        ValueError: if the upstream repo is not a valid phenotype repo
372        ValueError: if there are any other problems with Git
373    """
374    _logger.info(
375        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
376    )
377
378    phen_path = Path(phen_dir)
379    # check if directory already exists and ask user if they want to recreate it
380    if (
381        phen_path.exists() and phen_path.is_dir()
382    ):  # Check if it exists and is a directory
383        configure = _check_delete_dir(
384            phen_path,
385            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
386        )
387    else:
388        configure = True
389
390    if not configure:
391        _logger.info(f"Exiting, phenotype not initialised")
392        return
393
394    try:
395        # Clone repo
396        git_url = _construct_git_url(upstream_url)
397        repo = git.Repo.clone_from(git_url, phen_path)
398
399        # Fetch all branches and tags
400        repo.remotes.origin.fetch()
401
402        # Check if the version exists
403        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
404        if upstream_version not in available_refs:
405            raise ValueError(
406                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
407            )
408
409        # Checkout the specified version
410        repo.git.checkout(upstream_version)
411        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
412        main_branch.checkout()
413
414        # Check if 'config.yml' exists in the root directory
415        config_path = phen_path / "config.yml"
416        if not os.path.isfile(config_path):
417            raise ValueError(
418                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
419            )
420
421        # Validate the phenotype is compatible with the acmc tool
422        validate(str(phen_path.resolve()))
423
424        # Delete each tag locally
425        tags = repo.tags
426        for tag in tags:
427            repo.delete_tag(tag)
428            _logger.debug(f"Deleted tags from forked repo: {tag}")
429
430        # Add upstream remote
431        repo.create_remote("upstream", upstream_url)
432        remote = repo.remotes["origin"]
433        repo.delete_remote(remote)  # Remove existing origin
434
435        # Optionally set a new origin remote
436        if new_origin_url:
437            git_url = _construct_git_url(new_origin_url)
438            repo.create_remote("origin", git_url)
439            repo.git.push("--set-upstream", "origin", "main")
440
441        _logger.info(f"Repository forked successfully at {phen_path}")
442        _logger.info(f"Upstream set to {upstream_url}")
443        if new_origin_url:
444            _logger.info(f"Origin set to {new_origin_url}")
445
446    except Exception as e:
447        if phen_path.exists():
448            shutil.rmtree(phen_path)
449        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version into a local directory, and optionally sets a new remote origin

Arguments:
  • phen_dir (str): local directory path where the upstream repo is to be cloned
  • upstream_url (str): url to the upstream repo
  • upstream_version (str): version in the upstream repo to clone
  • new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
  • ValueError: if the specified version is not in the upstream repo
  • ValueError: if the upstream repo is not a valid phenotype repo
  • ValueError: if there are any other problems with Git
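
A usage sketch; the URLs and version tag are placeholders:

    from acmc import phen

    phen.fork(
        "./workspace/my-phen",
        upstream_url="https://git.example.org/phenotypes/upstream.git",
        upstream_version="0.0.1",
        new_origin_url="https://git.example.org/me/my-phen.git",
    )
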
def validate(phen_dir: str):
452def validate(phen_dir: str):
453    """Validates the phenotype directory is a git repo with standard structure"""
454    _logger.info(f"Validating phenotype: {phen_dir}")
455    phen_path = Path(phen_dir)
456    if not phen_path.is_dir():
457        raise NotADirectoryError(
458            f"Error: '{str(phen_path.resolve())}' is not a directory"
459        )
460
461    config_path = phen_path / CONFIG_FILE
462    if not config_path.is_file():
463        raise FileNotFoundError(
464            f"Error: phen configuration file '{config_path}' does not exist."
465        )
466
467    concepts_path = phen_path / CONCEPTS_DIR
468    if not concepts_path.is_dir():
469        raise FileNotFoundError(
470            f"Error: source concepts directory {concepts_path} does not exist."
471        )
472
473    # Validate the directory is a git repo
474    try:
475        git.Repo(phen_path)
476    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
477        raise Exception(f"Phen directory {phen_path} is not a git repo")
478
479    # Load configuration File
480    if config_path.suffix == ".yml":
481        try:
482            with config_path.open("r") as file:
483                phenotype = yaml.safe_load(file)
484
485            validator = Validator(CONFIG_SCHEMA)
486            if validator.validate(phenotype):
487                _logger.debug("YAML structure is valid.")
488            else:
489                _logger.error(f"YAML structure validation failed: {validator.errors}")
490                raise Exception(f"YAML structure validation failed: {validator.errors}")
491        except yaml.YAMLError as e:
492            _logger.error(f"YAML syntax error: {e}")
493            raise e
494    else:
495        raise Exception(
496            f"Unsupported configuration filetype: {str(config_path.resolve())}"
497        )
498
499    # initialise
500    validation_errors = []
501    phenotype = phenotype["phenotype"]
502    code_types = parse.CodeTypeParser().code_types
503
504    # check the version number is of the format n.n.n
505    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
506    if not match:
507        validation_errors.append(
508            f"Invalid version format in configuration file: {phenotype['version']}"
509        )
510
511    # create a list of all the concept set names defined in the concept set configuration
512    concept_set_names = []
513    for item in phenotype["concept_sets"]:
514        if item["name"] in concept_set_names:
515            validation_errors.append(
516                f"Duplicate concept set defined in concept sets {item['name']}"
517            )
518        else:
519            concept_set_names.append(item["name"])
520
521    # check codes definition
522    for item in phenotype["concept_sets"]:
523        # check concept code file exists
524        concept_code_file_path = concepts_path / item["file"]["path"]
525        if not concept_code_file_path.exists():
526            validation_errors.append(
527                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
528            )
529
530        # check concept code file is not empty
531        if concept_code_file_path.stat().st_size == 0:
532            validation_errors.append(
533                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
534            )
535
536        # check code file type is supported
537        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
538            raise ValueError(
539                f"Unsupported filetype {concept_code_file_path.suffix}; only csv, xlsx and xls code files are supported"
540            )
541
542        # check columns specified are a supported medical coding type
543        for column in item["file"]["columns"]:
544            if column not in code_types:
545                validation_errors.append(
546                    f"Column type {column} for file {concept_code_file_path} is not supported"
547                )
548
549        # check the actions are supported
550        if "actions" in item["file"]:
551            for action in item["file"]["actions"]:
552                if action not in COL_ACTIONS:
553                    validation_errors.append(f"Action {action} is not supported")
554
555    if len(validation_errors) > 0:
556        _logger.error(validation_errors)
557        raise PhenValidationException(
558            f"Configuration file {str(config_path.resolve())} failed validation",
559            validation_errors,
560        )
561
562    _logger.info(f"Phenotype validated successfully")

Validates that the phenotype directory is a git repo with the standard structure

def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str) -> pandas.core.frame.DataFrame:
656def translate_codes(
657    source_df: pd.DataFrame, target_code_type: str, concept_name: str
658) -> pd.DataFrame:
659    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
660
661    # codes = pd.DataFrame([], dtype=str)
662    codes = pd.DataFrame(
663        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
664    )
665    # Convert codes to target type
666    _logger.info(f"Converting to target code type {target_code_type}")
667
668    for source_code_type in source_df.columns:
669        # if the target code type is the same as the source code type, no translation, just append source as target
670        if source_code_type == target_code_type:
671            copy_df = pd.DataFrame(
672                {
673                    "SOURCE_CONCEPT": source_df[source_code_type],
674                    "SOURCE_CONCEPT_TYPE": source_code_type,
675                    "CONCEPT": source_df[source_code_type],
676                }
677            )
678            codes = pd.concat([codes, copy_df])
679            _logger.debug(
680                f"Target code type {target_code_type} is the same as the source code type for {len(source_df)} codes, copying rather than translating"
681            )
682        else:
683            # get the translation filename using source to target code types
684            filename = f"{source_code_type}_to_{target_code_type}.parquet"
685            map_path = trud.PROCESSED_PATH / filename
686
687            # do the mapping if it exists
688            if map_path.exists():
689                # get mapping
690                df_map = pd.read_parquet(map_path)
691
692                # do mapping
693                translated_df = pd.merge(
694                    source_df[source_code_type], df_map, how="left"
695                )
696
697                # normalise the output
698                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
699                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
700
701                # add to list of codes
702                codes = pd.concat([codes, translated_df])
703
704            else:
705                _logger.warning(
706                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
707                )
708
709    codes = codes.dropna()  # delete NaNs
710
711    # add the concept set name to the output if there were any translations
712    if len(codes.index) > 0:
713        codes["CONCEPT_SET"] = concept_name
714    else:
715        _logger.debug(f"No codes converted with target code type {target_code_type}")
716
717    return codes

Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
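
A minimal sketch, assuming a read2-to-snomed mapping parquet has already been installed under trud.PROCESSED_PATH; the codes shown are illustrative:

    import pandas as pd
    from acmc import phen

    # one column per source code type; values are the codes themselves
    source_df = pd.DataFrame({"read2": ["C10..", "C10E."]})

    concept_set = phen.translate_codes(source_df, "snomed", "diabetes")
    # when translations occur the result has columns SOURCE_CONCEPT,
    # SOURCE_CONCEPT_TYPE, CONCEPT and CONCEPT_SET (the concept_name passed in)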

def write_vocab_version(phen_path: pathlib.Path):
738def write_vocab_version(phen_path: Path):
739    # write the vocab version files
740
741    if not trud.VERSION_PATH.exists():
742        raise FileNotFoundError(
743            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
744        )
745
746    if not omop.VERSION_PATH.exists():
747        raise FileNotFoundError(
748            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
749        )
750
751    with trud.VERSION_PATH.open("r") as file:
752        trud_version = yaml.safe_load(file)
753
754    with omop.VERSION_PATH.open("r") as file:
755        omop_version = yaml.safe_load(file)
756
757    # Create the combined YAML structure
758    version_data = {
759        "versions": {
760            "acmc": acmc.__version__,
761            "trud": trud_version,
762            "omop": omop_version,
763        }
764    }
765
766    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
767        yaml.dump(
768            version_data,
769            file,
770            Dumper=util.QuotedDumper,
771            default_flow_style=False,
772            sort_keys=False,
773            default_style='"',
774        )
def map(phen_dir: str, target_code_type: str):
777def map(phen_dir: str, target_code_type: str):
778    _logger.info(f"Processing phenotype: {phen_dir}")
779
780    # Validate configuration
781    validate(phen_dir)
782
783    # initialise paths
784    phen_path = Path(phen_dir)
785    config_path = phen_path / CONFIG_FILE
786
787    # load configuration
788    with config_path.open("r") as file:
789        config = yaml.safe_load(file)
790    phenotype = config["phenotype"]
791
792    if len(phenotype["map"]) == 0:
793        raise ValueError(f"No map codes defined in the phenotype configuration")
794
795    if target_code_type is not None and target_code_type not in phenotype["map"]:
796        raise ValueError(
797            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
798        )
799
800    if target_code_type is not None:
801        _map_target_code_type(phen_path, phenotype, target_code_type)
802    else:
803        for t in phenotype["map"]:
804            _map_target_code_type(phen_path, phenotype, t)
805
806    _logger.info(f"Phenotype processed successfully")
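
A usage sketch; the directory path is an example:

    from acmc import phen

    # map all concept sets to a single target code type...
    phen.map("./workspace/phen", "read2")

    # ...or pass None to map to every type listed under 'map' in config.yml
    phen.map("./workspace/phen", None)
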
def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
1001def publish(
1002    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1003):
1004    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1005
1006    # Validate config
1007    validate(phen_dir)
1008    phen_path = Path(phen_dir)
1009
1010    # load git repo and set the branch
1011    repo = git.Repo(phen_path)
1012    if DEFAULT_GIT_BRANCH in repo.branches:
1013        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1014        main_branch.checkout()
1015    else:
1016        raise AttributeError(
1017            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1018        )
1019
1020    # check if any changes to publish
1021    if not repo.is_dirty() and not repo.untracked_files:
1022        if remote_url is not None and "origin" not in repo.remotes:
1023            _logger.info(f"First publish to remote url {remote_url}")
1024        else:
1025            _logger.info("Nothing to publish, no changes to the repo")
1026            return
1027
1028    # get next version
1029    new_version_str = _generate_version_tag(repo, increment)
1030    _logger.info(f"New version: {new_version_str}")
1031
1032    # Write version in configuration file
1033    config_path = phen_path / CONFIG_FILE
1034    with config_path.open("r") as file:
1035        config = yaml.safe_load(file)
1036
1037    config["phenotype"]["version"] = new_version_str
1038    with open(config_path, "w") as file:
1039        yaml.dump(
1040            config,
1041            file,
1042            Dumper=util.QuotedDumper,
1043            default_flow_style=False,
1044            sort_keys=False,
1045            default_style='"',
1046        )
1047
1048    # Add and commit changes to repo including version updates
1049    commit_message = f"Committing updates to phenotype {phen_path}"
1050    repo.git.add("--all")
1051    repo.index.commit(commit_message)
1052
1053    # Add tag to the repo
1054    repo.create_tag(new_version_str)
1055
1056    # push to origin if a remote repo
1057    if remote_url is not None and "origin" not in repo.remotes:
1058        git_url = _construct_git_url(remote_url)
1059        repo.create_remote("origin", git_url)
1060
1061    try:
1062        if "origin" in repo.remotes:
1063            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1064            origin = repo.remotes.origin
1065            _logger.info(f"Pushing main branch to remote repo")
1066            repo.git.push("--set-upstream", "origin", "main")
1067            _logger.info(f"Pushing version tags to remote git repo")
1068            origin.push(tags=True)
1069            _logger.debug("Changes pushed to 'origin'")
1070        else:
1071            _logger.debug("Remote 'origin' is not set")
1072    except Exception as e:
1073        tag_ref = repo.tags[new_version_str]
1074        repo.delete_tag(tag_ref)
1075        repo.git.reset("--soft", "HEAD~1")
1076        raise e
1077
1078    _logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory
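
A usage sketch; the message and increment are examples, and remote_url may be None if 'origin' is already set:

    from acmc import phen

    phen.publish(
        "./workspace/phen",
        msg="Updated concept sets",
        remote_url=None,
        increment="minor",  # one of SEMANTIC_VERSION_TYPES
    )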

def export(phen_dir: str, version: str):
1081def export(phen_dir: str, version: str):
1082    """Exports a phen repo at a specific tagged version into a target directory"""
1083    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1084
1085    # validate configuration
1086    validate(phen_dir)
1087    phen_path = Path(phen_dir)
1088
1089    # load configuration
1090    config_path = phen_path / CONFIG_FILE
1091    with config_path.open("r") as file:
1092        config = yaml.safe_load(file)
1093
1094    map_path = phen_path / MAP_DIR
1095    if not map_path.exists():
1096        _logger.warning(f"Map path does not exist '{map_path}'")
1097
1098    export_path = phen_path / OMOP_PATH
1099    # check export directory exists and if not create it
1100    if not export_path.exists():
1101        export_path.mkdir(parents=True)
1102        _logger.debug(f"OMOP export directory '{export_path}' created.")
1103
1104    # omop export db
1105    export_db_path = omop.export(
1106        map_path,
1107        export_path,
1108        config["phenotype"]["version"],
1109        config["phenotype"]["omop"],
1110    )
1111
1112    # write to tables
1113    # export as csv
1114    _logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory
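
A usage sketch; the version string is illustrative:

    from acmc import phen

    # writes the OMOP export under concept-sets/omop in the phen directory
    phen.export("./workspace/phen", "0.0.1")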

def copy(phen_dir: str, target_dir: str, version: str):
1117def copy(phen_dir: str, target_dir: str, version: str):
1118    """Copys a phen repo at a specific tagged version into a target directory"""
1119
1120    # Validate
1121    validate(phen_dir)
1122    phen_path = Path(phen_dir)
1123
1124    # Check target directory exists
1125    target_path = Path(target_dir)
1126    if not target_path.exists():
1127        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1128
1129    # Set copy directory
1130    copy_path = target_path / version
1131    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1132
1133    if (
1134        copy_path.exists() and copy_path.is_dir()
1135    ):  # Check if it exists and is a directory
1136        copy = _check_delete_dir(
1137            copy_path,
1138            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1139        )
1140    else:
1141        copy = True
1142
1143    if not copy:
1144        _logger.info(f"Not copying the version {version}")
1145        return
1146
1147    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1148    repo = git.Repo.clone_from(phen_path, copy_path)
1149
1150    # Check out the latest commit or specified version
1151    if version:
1152        # Checkout a specific version (e.g., branch, tag, or commit hash)
1153        _logger.info(f"Checking out version {version}...")
1154        repo.git.checkout(version)
1155    else:
1156        # Checkout the latest commit (HEAD)
1157        _logger.info(f"Checking out the latest commit...")
1158        repo.git.checkout("HEAD")
1159
1160    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1161
1162    _logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory
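
A usage sketch; the paths and tag are illustrative:

    from acmc import phen

    # clones the repo and checks out tag 0.0.1 into ./releases/0.0.1
    phen.copy("./workspace/phen", "./releases", "0.0.1")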

def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1166def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1167    """Extracts concepts as {name: file_path} dictionary and a name set."""
1168    concepts_dict = {
1169        item["name"]: item["file"]["path"]
1170        for item in config_data["phenotype"]["concept_sets"]
1171    }
1172    name_set = set(concepts_dict.keys())
1173    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.
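
A minimal sketch with an inline config dictionary:

    from acmc import phen

    config = {
        "phenotype": {
            "concept_sets": [
                {"name": "diabetes", "file": {"path": "diabetes.csv"}},
                {"name": "asthma", "file": {"path": "asthma.csv"}},
            ]
        }
    }
    concepts, names = phen.extract_concepts(config)
    # concepts == {'diabetes': 'diabetes.csv', 'asthma': 'asthma.csv'}
    # names == {'diabetes', 'asthma'}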

def diff_config(old_config: dict, new_config: dict) -> str:
1187def diff_config(old_config: dict, new_config: dict) -> str:
1188    report = f"\n# Changes to phenotype configuration\n"
1189    report += f"This compares changes in the phenotype configuration, including added, removed and renamed concept sets, and changes to concept set source concept code file paths\n\n"
1190
1191    old_concepts, old_names = extract_concepts(old_config)
1192    new_concepts, new_names = extract_concepts(new_config)
1193
1194    # Check added and removed names
1195    added_names = new_names - old_names  # Names that appear in new but not in old
1196    removed_names = old_names - new_names  # Names that were in old but not in new
1197
1198    # find file path changes for unchanged names
1199    unchanged_names = old_names & new_names  # Names that exist in both
1200    file_diff = DeepDiff(
1201        {name: old_concepts[name] for name in unchanged_names},
1202        {name: new_concepts[name] for name in unchanged_names},
1203    )
1204
1205    # Find renamed concepts (same file, different name)
1206    renamed_concepts = []
1207    for removed in removed_names:
1208        old_path = old_concepts[removed]
1209        for added in added_names:
1210            new_path = new_concepts[added]
1211            if old_path == new_path:
1212                renamed_concepts.append((removed, added))
1213
1214    # Remove renamed concepts from added and removed sets
1215    for old_name, new_name in renamed_concepts:
1216        added_names.discard(new_name)
1217        removed_names.discard(old_name)
1218
1219    # generate config report
1220    if added_names:
1221        report += "## Added Concepts\n"
1222        for name in added_names:
1223            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1224        report += "\n"
1225
1226    if removed_names:
1227        report += "## Removed Concepts\n"
1228        for name in removed_names:
1229            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1230        report += "\n"
1231
1232    if renamed_concepts:
1233        report += "## Renamed Concepts\n"
1234        for old_name, new_name in renamed_concepts:
1235            report += (
1236                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1237            )
1238        report += "\n"
1239
1240    if "values_changed" in file_diff:
1241        report += "## Updated File Paths\n"
1242        for name, change in file_diff["values_changed"].items():
1243            old_file = change["old_value"]
1244            new_file = change["new_value"]
1245            clean_name = name.split("root['")[1].split("']")[0]
1246            report += (
1247                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1248            )
1249        report += "\n"
1250
1251    if not (
1252        added_names
1253        or removed_names
1254        or renamed_concepts
1255        or file_diff.get("values_changed")
1256    ):
1257        report += "No changes in concept sets.\n"
1258
1259    return report
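
A minimal sketch of the rename detection; both names point at the same file, so the report lists the pair under 'Renamed Concepts':

    from acmc import phen

    old = {"phenotype": {"concept_sets": [{"name": "dm", "file": {"path": "dm.csv"}}]}}
    new = {"phenotype": {"concept_sets": [{"name": "diabetes", "file": {"path": "dm.csv"}}]}}
    print(phen.diff_config(old, new))
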
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1262def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1263    old_output_files = [
1264        file.name
1265        for file in old_map_path.iterdir()
1266        if file.is_file() and not file.name.startswith(".")
1267    ]
1268    new_output_files = [
1269        file.name
1270        for file in new_map_path.iterdir()
1271        if file.is_file() and not file.name.startswith(".")
1272    ]
1273
1274    # Convert the lists to sets for easy comparison
1275    old_output_set = set(old_output_files)
1276    new_output_set = set(new_output_files)
1277
1278    # Outputs that are in old_output_set but not in new_output_set (removed files)
1279    removed_outputs = old_output_set - new_output_set
1280    # Outputs that are in new_output_set but not in old_output_set (added files)
1281    added_outputs = new_output_set - old_output_set
1282    # Outputs that are the intersection of old_output_set and new_output_set
1283    common_outputs = old_output_set & new_output_set
1284
1285    report = f"\n# Changes to available translations\n"
1286    report += f"This compares the coding translation files available.\n\n"
1287    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1288    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1289    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1290
1291    # Compare common outputs between versions
1292    report += f"# Changes to concepts in translation files\n\n"
1293    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different from config.yml if the translations have not been run for the current config.\n\n"
1294    for file in common_outputs:
1295        old_output = old_map_path / file
1296        new_output = new_map_path / file
1297
1298        _logger.debug(f"Old output: {str(old_output.resolve())}")
1299        _logger.debug(f"New output: {str(new_output.resolve())}")
1300
1301        df1 = pd.read_csv(old_output)
1302        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1303        df2 = pd.read_csv(new_output)
1304        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1305
1306        # Check for added and removed concepts
1307        report += f"- File {file}\n"
1308        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1309        report += f"- Removed concepts {sorted_list}\n"
1310        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1311        report += f"- Added concepts {sorted_list}\n"
1312
1313        # Check for changed concepts
1314        diff = df2 - df1  # diff in counts
1315        diff = diff[
1316            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1317        ]  # get non-zero counts
1318        s = "\n"
1319        if len(diff.index) > 0:
1320            for concept, row in diff.iterrows():
1321                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1322            report += f"- Changed concepts {s}\n\n"
1323        else:
1324            report += f"- Changed concepts []\n\n"
1325
1326    return report
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path):
1329def diff_phen(
1330    new_phen_path: Path,
1331    new_version: str,
1332    old_phen_path: Path,
1333    old_version: str,
1334    report_path: Path,
1335):
1336    """Compare the differences between two versions of a phenotype"""
1337
1338    # validate phenotypes
1339    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1340    validate(str(old_phen_path.resolve()))
1341    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1342    validate(str(new_phen_path.resolve()))
1343
1344    # get old and new config
1345    old_config_path = old_phen_path / CONFIG_FILE
1346    with old_config_path.open("r") as file:
1347        old_config = yaml.safe_load(file)
1348    new_config_path = new_phen_path / CONFIG_FILE
1349    with new_config_path.open("r") as file:
1350        new_config = yaml.safe_load(file)
1351
1352    # write report heading
1353    report = f"# Phenotype Comparison Report\n"
1354    report += f"## Original phenotype\n"
1355    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1356    report += f"  - {old_version}\n"
1357    report += f"  - {str(old_phen_path.resolve())}\n"
1358    report += f"## Changed phenotype\n"
1359    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1360    report += f"  - {new_version}\n"
1361    report += f"  - {str(new_phen_path.resolve())}\n"
1362
1363    # Step 1: check differences in the configuration files
1364    # Convert list of dicts into a dict: {name: file}
1365    report += diff_config(old_config, new_config)
1366
1367    # Step 2: check differences between map files
1368    # List files from output directories
1369    old_map_path = old_phen_path / MAP_DIR
1370    new_map_path = new_phen_path / MAP_DIR
1371    report += diff_map_files(old_map_path, new_map_path)
1372
1373    # initialise report file
1374    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1375    report_file = open(report_path, "w")
1376    report_file.write(report)
1377    report_file.close()
1378
1379    _logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1382def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1383    # make tmp directory .acmc
1384    timestamp = time.strftime("%Y%m%d_%H%M%S")
1385    temp_dir = Path(f".acmc/diff_{timestamp}")
1386
1387    changed_phen_path = Path(phen_dir)
1388    if not changed_phen_path.exists():
1389        raise ValueError(
1390            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1391        )
1392
1393    old_phen_path = Path(old_phen_dir)
1394    if not old_phen_path.exists():
1395        raise ValueError(
1396            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1397        )
1398
1399    t_path = old_phen_path / "config.yml"
1400    with t_path.open("r") as file:
1401        c = yaml.safe_load(file)
1402
1403    try:
1404        # Create the directory
1405        temp_dir.mkdir(parents=True, exist_ok=True)
1406        _logger.debug(f"Temporary directory created: {temp_dir}")
1407
1408        # Create temporary directories
1409        changed_path = temp_dir / "changed"
1410        changed_path.mkdir(parents=True, exist_ok=True)
1411        old_path = temp_dir / "old"
1412        old_path.mkdir(parents=True, exist_ok=True)
1413
1414        # checkout changed
1415        if version == "latest":
1416            _logger.debug(
1417                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1418            )
1419            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1420        else:
1421            _logger.debug(
1422                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1423            )
1424            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1425            changed_repo.git.checkout(version)
1426
1427        # checkout old
1428        if old_version == "latest":
1429            _logger.debug(
1430                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1431            )
1432            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1433        else:
1434            _logger.debug(
1435                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1436            )
1437            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1438            old_repo.git.checkout(old_version)
1439
1440        report_filename = f"{version}_{old_version}_diff.md"
1441        report_path = changed_phen_path / report_filename
1442        # diff old with new
1443        diff_phen(changed_path, version, old_path, old_version, report_path)
1444
1445    finally:
1446        # clean up tmp directory
1447        if temp_dir.exists():
1448            shutil.rmtree(temp_dir)
1449            _logger.debug(f"Temporary directory removed: {temp_dir}")
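
A usage sketch; the paths and version tag are illustrative:

    from acmc import phen

    # compare the working copy ('latest') with version 0.0.1 of an older
    # checkout; writes latest_0.0.1_diff.md into the changed phenotype dir
    phen.diff("./workspace/phen", "latest", "./workspace/phen-v0.0.1", "0.0.1")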