acmc.phen

phenotype.py module

This module provides functionality for managing phenotypes.
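
The example below sketches a typical workflow with this module: initialise a phenotype repo, validate it, map its source codes to a target code type, then publish a version. The workspace path and the "read2" target code type are illustrative assumptions; supported code types come from parse.SUPPORTED_CODE_TYPES.

    from acmc import phen

    # create a local phenotype git repo with the standard structure
    phen.init("./workspace/phen", remote_url=None)

    # check the directory structure and config.yml are valid
    phen.validate("./workspace/phen")

    # map source concept codes to a target code type listed in config.yml
    phen.map("./workspace/phen", target_code_type="read2", not_translate=False, no_metadata=False)

    # commit, tag and (if a remote is configured) push the new version
    phen.publish("./workspace/phen", msg="Updated concept sets", remote_url=None, increment="patch")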

   1"""
   2phenotype.py module
   3
   4This module provides functionality for managing phenotypes.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator  # type: ignore
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28import acmc
  29from acmc import trud, omop, parse, util, logging_config as lc
  30
  31# set up logging
  32_logger = lc.setup_logger()
  33
  34pd.set_option("mode.chained_assignment", None)
  35
  36PHEN_DIR = "phen"
  37"""Default phenotype directory name"""
  38
  39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
  40"""Default phenotype directory path"""
  41
  42CONCEPTS_DIR = "concepts"
  43"""Default concepts directory name"""
  44
  45MAP_DIR = "map"
  46"""Default map directory name"""
  47
  48CONCEPT_SET_DIR = "concept-sets"
  49"""Default concept set directory name"""
  50
  51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
  52"""Default CSV concept set directory path"""
  53
  54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
  55"""Default OMOP concept set directory path"""
  56
  57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
  58"""List of default phenotype directories"""
  59
  60CONFIG_FILE = "config.yml"
  61"""Default configuration filename"""
  62
  63VOCAB_VERSION_FILE = "vocab_version.yml"
  64"""Default vocabulary version filename"""
  65
  66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
  67"""List of semantic version increment types"""
  68
  69DEFAULT_VERSION_INC = "patch"
  70"""Default semantic version increment type"""
  71
  72DEFAULT_GIT_BRANCH = "main"
  73"""Default phenotype repo branch name"""
  74
  75SPLIT_COL_ACTION = "split_col"
  76"""Split column preprocessing action type"""
  77
  78CODES_COL_ACTION = "codes_col"
  79"""Codes column preprocessing action type"""
  80
  81DIVIDE_COL_ACTION = "divide_col"
  82"""Divide column preprocessing action type"""
  83
  84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
  85"""List of column preprocessing action types"""
  86
  87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
  88"""List of supported source concept coding list file types"""
  89
   90# config.yml schema
  91CONFIG_SCHEMA = {
  92    "phenotype": {
  93        "type": "dict",
  94        "required": True,
  95        "schema": {
  96            "version": {
  97                "type": "string",
  98                "required": True,
   99                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
 100            },
 101            "omop": {
 102                "type": "dict",
 103                "required": True,
 104                "schema": {
 105                    "vocabulary_id": {"type": "string", "required": True},
 106                    "vocabulary_name": {"type": "string", "required": True},
 107                    "vocabulary_reference": {
 108                        "type": "string",
 109                        "required": True,
 110                        "regex": r"^https?://.*",  # Ensures it's a URL
 111                    },
 112                },
 113            },
 114            "map": {
 115                "type": "list",
 116                "schema": {
 117                    "type": "string",
 118                    "allowed": list(
 119                        parse.SUPPORTED_CODE_TYPES
 120                    ),  # Ensure only predefined values are allowed
 121                },
 122            },
 123            "concept_sets": {
 124                "type": "list",
 125                "required": True,
 126                "schema": {
 127                    "type": "dict",
 128                    "schema": {
 129                        "name": {"type": "string", "required": True},
 130                        "files": {
 131                            "type": "list",
 132                            "required": True,
 133                            "schema": {
 134                                "type": "dict",
 135                                "schema": {
 136                                    "path": {"type": "string", "required": True},
 137                                    "columns": {"type": "dict", "required": True},
 138                                    "category": {
 139                                        "type": "string"
 140                                    },  # Optional but must be string if present
 141                                    "actions": {
 142                                        "type": "dict",
 143                                        "schema": {
 144                                            "divide_col": {"type": "string"},
 145                                            "split_col": {"type": "string"},
 146                                            "codes_col": {"type": "string"},
 147                                        },
 148                                    },
 149                                },
 150                            },
 151                        },
 152                        "metadata": {"type": "dict", "required": False},
 153                    },
 154                },
 155            },
 156        },
 157    }
 158}
 159"""Phenotype config.yml schema definition"""
 160
 161
 162class PhenValidationException(Exception):
 163    """Custom exception class raised when validation errors in phenotype configuration file"""
 164
 165    def __init__(self, message, validation_errors=None):
 166        super().__init__(message)
 167        self.validation_errors = validation_errors
 168
 169
 170def _construct_git_url(remote_url: str):
 171    """Constructs a git url for github or gitlab including a PAT token environment variable"""
 172    # check the url
 173    parsed_url = urlparse(remote_url)
 174
  175    # if the URL is GitHub use a GitHub PAT, otherwise assume GitLab; to support other hosts
  176    # such as Codeberg this function would need updating if the URL scheme differs.
 177    if "github.com" in parsed_url.netloc:
 178        # get GitHub PAT from environment variable
 179        auth = os.getenv("ACMC_GITHUB_PAT")
 180        if not auth:
 181            raise ValueError(
 182                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 183            )
 184    else:
 185        # get GitLab PAT from environment variable
 186        auth = os.getenv("ACMC_GITLAB_PAT")
 187        if not auth:
 188            raise ValueError(
 189                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 190            )
 191        auth = f"oauth2:{auth}"
 192
 193    # Construct the new URL with credentials
 194    new_netloc = f"{auth}@{parsed_url.netloc}"
 195    return urlunparse(
 196        (
 197            parsed_url.scheme,
 198            new_netloc,
 199            parsed_url.path,
 200            parsed_url.params,
 201            parsed_url.query,
 202            parsed_url.fragment,
 203        )
 204    )
 205
 206
 207def _create_empty_git_dir(path: Path):
 208    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 209    path.mkdir(exist_ok=True)
 210    keep_path = path / ".gitkeep"
 211    keep_path.touch(exist_ok=True)
 212
 213
 214def _check_delete_dir(path: Path, msg: str) -> bool:
 215    """Checks on the command line if a user wants to delete a directory
 216
 217    Args:
 218        path (Path): path of the directory to be deleted
 219        msg (str): message to be displayed to the user
 220
 221    Returns:
 222        Boolean: True if deleted
 223    """
 224    deleted = False
 225
 226    user_input = input(f"{msg}").strip().lower()
 227    if user_input in ["yes", "y"]:
 228        shutil.rmtree(path)
 229        deleted = True
 230    else:
 231        _logger.info("Directory was not deleted.")
 232
 233    return deleted
 234
 235
 236def init(phen_dir: str, remote_url: str):
 237    """Initial phenotype directory as git repo with standard structure"""
 238    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 239    phen_path = Path(phen_dir)
 240
 241    # check if directory already exists and ask user if they want to recreate it
 242    if (
 243        phen_path.exists() and phen_path.is_dir()
 244    ):  # Check if it exists and is a directory
 245        configure = _check_delete_dir(
 246            phen_path,
 247            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 248        )
 249    else:
 250        configure = True
 251
 252    if not configure:
 253        _logger.info(f"Exiting, phenotype not initiatised")
 254        return
 255
 256    # Initialise repo from local or remote
 257    repo: Repo
 258
 259    # if remote then clone the repo otherwise init a local repo
  260    if remote_url is not None:
 261        # add PAT token to the URL
 262        git_url = _construct_git_url(remote_url)
 263
 264        # clone the repo
 265        git_cmd = git.cmd.Git()
 266        git_cmd.clone(git_url, phen_path)
 267
 268        # open repo
 269        repo = Repo(phen_path)
 270        # check if there are any commits (new repo has no commits)
 271        if (
 272            len(repo.branches) == 0 or repo.head.is_detached
 273        ):  # Handle detached HEAD (e.g., after init)
 274            _logger.debug("The phen repository has no commits yet.")
 275            commit_count = 0
 276        else:
 277            # Get the total number of commits in the default branch
 278            commit_count = sum(1 for _ in repo.iter_commits())
 279            _logger.debug(f"Repo has previous commits: {commit_count}")
 280    else:
 281        # local repo, create the directories and init
 282        phen_path.mkdir(parents=True, exist_ok=True)
 283        _logger.debug(f"Phen directory '{phen_path}' has been created.")
 284        repo = git.Repo.init(phen_path)
 285        commit_count = 0
 286
 287    phen_path = phen_path.resolve()
 288    # initialise empty repos
 289    if commit_count == 0:
 290        # create initial commit
 291        initial_file_path = phen_path / "README.md"
 292        with open(initial_file_path, "w") as file:
 293            file.write(
 294                "# Initial commit\nThis is the first commit in the phen repository.\n"
 295            )
 296        repo.index.add([initial_file_path])
 297        repo.index.commit("Initial commit")
 298        commit_count = 1
 299
  300    # Checkout the phen's default branch, creating it if it does not exist
 301    if DEFAULT_GIT_BRANCH in repo.branches:
 302        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 303        main_branch.checkout()
 304    else:
 305        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 306        main_branch.checkout()
 307
  308    # if the phen path does not contain the config file then initialise the phenotype
 309    config_path = phen_path / CONFIG_FILE
 310    if config_path.exists():
 311        _logger.debug(f"Phenotype configuration files already exist")
 312        return
 313
 314    _logger.info("Creating phen directory structure and config files")
 315    for d in DEFAULT_PHEN_DIR_LIST:
 316        _create_empty_git_dir(phen_path / d)
 317
 318    # create empty phen config file
 319    config = {
 320        "phenotype": {
 321            "version": "0.0.0",
 322            "omop": {
 323                "vocabulary_id": "",
 324                "vocabulary_name": "",
 325                "vocabulary_reference": "",
 326            },
  327            "map": [],
 328            "concept_sets": [],
 329        }
 330    }
 331
 332    with open(phen_path / CONFIG_FILE, "w") as file:
 333        yaml.dump(
 334            config,
 335            file,
 336            Dumper=util.QuotedDumper,
 337            default_flow_style=False,
 338            sort_keys=False,
 339            default_style='"',
 340        )
 341
 342    # add git ignore
 343    ignore_content = """# Ignore SQLite database files
 344*.db
 345*.sqlite3
 346 
 347# Ignore SQLite journal and metadata files
 348*.db-journal
 349*.sqlite3-journal
 350
 351# python
 352.ipynb_checkpoints
 353 """
 354    ignore_path = phen_path / ".gitignore"
 355    with open(ignore_path, "w") as file:
 356        file.write(ignore_content)
 357
 358    # add to git repo and commit
 359    for d in DEFAULT_PHEN_DIR_LIST:
 360        repo.git.add(phen_path / d)
 361    repo.git.add(all=True)
 362    repo.index.commit("initialised the phen git repo.")
 363
 364    _logger.info(f"Phenotype initialised successfully")
 365
 366
 367def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
 368    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
 369
 370    Args:
 371        phen_dir (str): local directory path where the upstream repo is to be cloned
 372        upstream_url (str): url to the upstream repo
 373        upstream_version (str): version in the upstream repo to clone
 374        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 375
 376    Raises:
 377        ValueError: if the specified version is not in the upstream repo
 378        ValueError: if the upstream repo is not a valid phenotype repo
 379        ValueError: if there's any other problems with Git
 380    """
 381    _logger.info(
 382        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 383    )
 384
 385    phen_path = Path(phen_dir)
 386    # check if directory already exists and ask user if they want to recreate it
 387    if (
 388        phen_path.exists() and phen_path.is_dir()
 389    ):  # Check if it exists and is a directory
 390        configure = _check_delete_dir(
 391            phen_path,
 392            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 393        )
 394    else:
 395        configure = True
 396
 397    if not configure:
 398        _logger.info(f"Exiting, phenotype not initiatised")
 399        return
 400
 401    try:
 402        # Clone repo
 403        git_url = _construct_git_url(upstream_url)
 404        repo = git.Repo.clone_from(git_url, phen_path)
 405
 406        # Fetch all branches and tags
 407        repo.remotes.origin.fetch()
 408
 409        # Check if the version exists
 410        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 411        if upstream_version not in available_refs:
 412            raise ValueError(
 413                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 414            )
 415
 416        # Checkout the specified version
 417        repo.git.checkout(upstream_version)
 418        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 419        main_branch.checkout()
 420
 421        # Check if 'config.yml' exists in the root directory
 422        config_path = phen_path / "config.yml"
 423        if not os.path.isfile(config_path):
 424            raise ValueError(
 425                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
 426            )
 427
 428        # Validate the phenotype is compatible with the acmc tool
 429        validate(str(phen_path.resolve()))
 430
 431        # Delete each tag locally
 432        tags = repo.tags
 433        for tag in tags:
 434            repo.delete_tag(tag)
 435            _logger.debug(f"Deleted tags from forked repo: {tag}")
 436
 437        # Add upstream remote
 438        repo.create_remote("upstream", upstream_url)
 439        remote = repo.remotes["origin"]
 440        repo.delete_remote(remote)  # Remove existing origin
 441
 442        # Optionally set a new origin remote
 443        if new_origin_url:
 444            git_url = _construct_git_url(new_origin_url)
 445            repo.create_remote("origin", git_url)
 446            repo.git.push("--set-upstream", "origin", "main")
 447
 448        _logger.info(f"Repository forked successfully at {phen_path}")
 449        _logger.info(f"Upstream set to {upstream_url}")
 450        if new_origin_url:
 451            _logger.info(f"Origin set to {new_origin_url}")
 452
 453    except Exception as e:
 454        if phen_path.exists():
 455            shutil.rmtree(phen_path)
 456        raise ValueError(f"Error occurred during repository fork: {str(e)}")
 457
 458
 459def validate(phen_dir: str):
 460    """Validates the phenotype directory is a git repo with standard structure"""
 461    _logger.info(f"Validating phenotype: {phen_dir}")
 462    phen_path = Path(phen_dir)
 463    if not phen_path.is_dir():
 464        raise NotADirectoryError(
 465            f"Error: '{str(phen_path.resolve())}' is not a directory"
 466        )
 467
 468    config_path = phen_path / CONFIG_FILE
 469    if not config_path.is_file():
 470        raise FileNotFoundError(
 471            f"Error: phen configuration file '{config_path}' does not exist."
 472        )
 473
 474    concepts_path = phen_path / CONCEPTS_DIR
 475    if not concepts_path.is_dir():
 476        raise FileNotFoundError(
 477            f"Error: source concepts directory {concepts_path} does not exist."
 478        )
 479
  480    # Validate the directory is a git repo
 481    try:
 482        git.Repo(phen_path)
 483    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 484        raise Exception(f"Phen directory {phen_path} is not a git repo")
 485
 486    # Load configuration File
 487    if config_path.suffix == ".yml":
 488        try:
 489            with config_path.open("r") as file:
 490                phenotype = yaml.safe_load(file)
 491
 492            validator = Validator(CONFIG_SCHEMA)
 493            if validator.validate(phenotype):
 494                _logger.debug("YAML structure is valid.")
 495            else:
 496                _logger.error(f"YAML structure validation failed: {validator.errors}")
 497                raise Exception(f"YAML structure validation failed: {validator.errors}")
 498        except yaml.YAMLError as e:
 499            _logger.error(f"YAML syntax error: {e}")
 500            raise e
 501    else:
 502        raise Exception(
 503            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 504        )
 505
  506    # initialise
 507    validation_errors = []
 508    phenotype = phenotype["phenotype"]
 509    code_types = parse.CodeTypeParser().code_types
 510
  511    # check the version number is of the format n.n.n
 512    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 513    if not match:
 514        validation_errors.append(
 515            f"Invalid version format in configuration file: {phenotype['version']}"
 516        )
 517
 518    # create a list of all the concept set names defined in the concept set configuration
 519    concept_set_names = []
 520    for item in phenotype["concept_sets"]:
 521        if item["name"] in concept_set_names:
 522            validation_errors.append(
  523                f"Duplicate concept set defined in concept sets {item['name']}"
 524            )
 525        else:
 526            concept_set_names.append(item["name"])
 527
 528    # check codes definition
 529    for files in phenotype["concept_sets"]:
 530        for item in files["files"]:
  531            # check concept code file exists
 532            concept_code_file_path = concepts_path / item["path"]
 533            if not concept_code_file_path.exists():
 534                validation_errors.append(
 535                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 536                )
 537
  538            # check concept code file is not empty
  539            elif concept_code_file_path.stat().st_size == 0:
 540                validation_errors.append(
 541                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 542                )
 543
 544            # check code file type is supported
 545            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 546                raise ValueError(
  547                    f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
 548                )
 549
 550            # check columns specified are a supported medical coding type
 551            for column in item["columns"]:
 552                if column not in code_types:
 553                    validation_errors.append(
 554                        f"Column type {column} for file {concept_code_file_path} is not supported"
 555                    )
 556
 557            # check the actions are supported
 558            if "actions" in item:
 559                for action in item["actions"]:
 560                    if action not in COL_ACTIONS:
 561                        validation_errors.append(f"Action {action} is not supported")
 562
 563    if len(validation_errors) > 0:
 564        _logger.error(validation_errors)
 565        raise PhenValidationException(
 566            f"Configuration file {str(config_path.resolve())} failed validation",
 567            validation_errors,
 568        )
 569
 570    _logger.info(f"Phenotype validated successfully")
 571
 572
 573def _read_table_file(path: Path, excel_sheet: str = ""):
 574    """
 575    Load Code List File
 576    """
 577
 578    path = path.resolve()
 579    if path.suffix == ".csv":
 580        df = pd.read_csv(path, dtype=str)
 581    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 582        if excel_sheet != "":
 583            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 584        else:
 585            df = pd.read_excel(path, dtype=str)
 586    elif path.suffix == ".dta":
 587        df = pd.read_stata(path)
 588    else:
 589        raise ValueError(
  590            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
 591        )
 592
 593    return df
 594
 595
 596def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
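    """Applies structural actions from the concept set config (split_col with codes_col) to the dataframe before code preprocessing"""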
 597    # Perform Structural Changes to file before preprocessing
 598    _logger.debug("Processing file structural actions")
 599    if (
 600        "actions" in concept_set
 601        and "split_col" in concept_set["actions"]
 602        and "codes_col" in concept_set["actions"]
 603    ):
 604        split_col = concept_set["actions"]["split_col"]
 605        codes_col = concept_set["actions"]["codes_col"]
  606        _logger.debug(
  607            "Action: Splitting %s "
  608            "column into: %s",
  609            split_col,
  610            df[split_col].unique(),
  611        )
 612        codes = df[codes_col]
 613        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
 614        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
  615        oh[oh == False] = np.nan  # replace False with NaN
 616        df = pd.concat([df, oh], axis=1)  # merge in new columns
 617
 618    return df
 619
 620
 621def _preprocess_source_concepts(
 622    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 623) -> Tuple[pd.DataFrame, list]:
 624    """Perform QA Checks on columns individually and append to df"""
 625    out = pd.DataFrame([])  # create output df to append to
 626    code_errors = []  # list of errors from processing
 627
 628    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 629    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 630
 631    # Preprocess codes
 632    code_types = parse.CodeTypeParser().code_types
 633    for code_type in concept_set["columns"]:
 634        parser = code_types[code_type]
 635        _logger.info(f"Processing {code_type} codes for {code_file_path}")
 636
 637        # get codes by column name
 638        source_col_name = concept_set["columns"][code_type]
 639        codes = df[source_col_name].dropna()
 640        codes = codes.astype(str)  # convert to string
 641        codes = codes.str.strip()  # remove excess spaces
 642
 643        # process codes, validating them using parser and returning the errors
 644        codes, errors = parser.process(codes, code_file_path)
 645        if len(errors) > 0:
 646            code_errors.extend(errors)
 647            _logger.warning(f"Codes validation failed with {len(errors)} errors")
 648
 649        # add processed codes to df
 650        new_col_name = f"{source_col_name}_SOURCE"
 651        df = df.rename(columns={source_col_name: new_col_name})
 652        process_codes = pd.DataFrame({code_type: codes}).join(df)
 653        out = pd.concat(
 654            [out, process_codes],
 655            ignore_index=True,
 656        )
 657
 658    _logger.debug(out.head())
 659
 660    return out, code_errors
 661
 662
  663# Translate a df with multiple source code types into a single target code type df
 664def translate_codes(
 665    source_df: pd.DataFrame,
 666    target_code_type: str,
 667    concept_name: str,
 668    not_translate: bool,
 669) -> pd.DataFrame:
 670    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
 671
 672    # codes = pd.DataFrame([], dtype=str)
 673    codes = pd.DataFrame(
 674        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 675    )
 676    # Convert codes to target type
 677    _logger.info(f"Converting to target code type {target_code_type}")
 678
 679    for source_code_type in source_df.columns:
  680        # if the target code type is the same as the source code type, no translation is needed, just append source as target
 681        if source_code_type == target_code_type:
 682            copy_df = pd.DataFrame(
 683                {
 684                    "SOURCE_CONCEPT": source_df[source_code_type],
 685                    "SOURCE_CONCEPT_TYPE": source_code_type,
 686                    "CONCEPT": source_df[source_code_type],
 687                }
 688            )
 689            codes = pd.concat([codes, copy_df])
 690            _logger.debug(
  691                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
 692            )
 693        elif not not_translate:
 694            # get the translation filename using source to target code types
 695            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 696            map_path = trud.PROCESSED_PATH / filename
 697
 698            # do the mapping if it exists
 699            if map_path.exists():
 700                # get mapping
 701                df_map = pd.read_parquet(map_path)
 702
 703                # do mapping
 704                translated_df = pd.merge(
 705                    source_df[source_code_type], df_map, how="left"
 706                )
 707
 708                # normalise the output
 709                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
 710                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 711
 712                # add to list of codes
 713                codes = pd.concat([codes, translated_df])
 714
 715            else:
 716                _logger.warning(
 717                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 718                )
 719
 720    codes = codes.dropna()  # delete NaNs
 721
  722    # add the concept set name to the output if there were any translations
 723    if len(codes.index) > 0:
 724        codes["CONCEPT_SET"] = concept_name
 725    else:
 726        _logger.debug(f"No codes converted with target code type {target_code_type}")
 727
 728    return codes
 729
 730
 731def _write_code_errors(code_errors: list, code_errors_path: Path):
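    """Writes the source code validation errors to a CSV file at code_errors_path"""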
 732    err_df = pd.DataFrame(
 733        [
 734            {
 735                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 736                "VOCABULARY": err.code_type,
 737                "SOURCE": err.codes_file,
 738                "CAUSE": err.message,
 739            }
 740            for err in code_errors
 741            if err.mask is not None
 742        ]
 743    )
 744
 745    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 746    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 747    err_df.to_csv(code_errors_path, index=False, mode="w")
 748
 749
 750def write_vocab_version(phen_path: Path):
  751    """Writes the acmc, TRUD and OMOP vocabulary versions used by the phenotype to the vocab version file"""
 752
 753    if not trud.VERSION_PATH.exists():
 754        raise FileNotFoundError(
 755            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 756        )
 757
 758    if not omop.VERSION_PATH.exists():
 759        raise FileNotFoundError(
 760            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 761        )
 762
 763    with trud.VERSION_PATH.open("r") as file:
 764        trud_version = yaml.safe_load(file)
 765
 766    with omop.VERSION_PATH.open("r") as file:
 767        omop_version = yaml.safe_load(file)
 768
 769    # Create the combined YAML structure
 770    version_data = {
 771        "versions": {
 772            "acmc": acmc.__version__,
 773            "trud": trud_version,
 774            "omop": omop_version,
 775        }
 776    }
 777
 778    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 779        yaml.dump(
 780            version_data,
 781            file,
 782            Dumper=util.QuotedDumper,
 783            default_flow_style=False,
 784            sort_keys=False,
 785            default_style='"',
 786        )
 787
 788
 789def map(phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool):
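    """Maps the phenotype's source concept codes to the given target code type, or to every code type in the config map list when target_code_type is None"""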
 790    _logger.info(f"Processing phenotype: {phen_dir}")
 791
 792    # Validate configuration
 793    validate(phen_dir)
 794
 795    # initialise paths
 796    phen_path = Path(phen_dir)
 797    config_path = phen_path / CONFIG_FILE
 798
 799    # load configuration
 800    with config_path.open("r") as file:
 801        config = yaml.safe_load(file)
 802    phenotype = config["phenotype"]
 803
 804    if len(phenotype["map"]) == 0:
 805        raise ValueError(f"No map codes defined in the phenotype configuration")
 806
 807    if target_code_type is not None and target_code_type not in phenotype["map"]:
 808        raise ValueError(
 809            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 810        )
 811
 812    if target_code_type is not None:
 813        _map_target_code_type(
 814            phen_path, phenotype, target_code_type, not_translate, no_metadata
 815        )
 816    else:
 817        for t in phenotype["map"]:
 818            _map_target_code_type(phen_path, phenotype, t, not_translate, no_metadata)
 819
 820    _logger.info(f"Phenotype processed successfully")
 821
 822
 823def _map_target_code_type(
 824    phen_path: Path,
 825    phenotype: dict,
 826    target_code_type: str,
 827    not_translate: bool,
 828    no_metadata: bool,
 829):
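    """Processes each concept set file and writes the mapped concept set outputs for a single target code type"""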
 830    _logger.debug(f"Target coding format: {target_code_type}")
 831    concepts_path = phen_path / CONCEPTS_DIR
 832    # Create output dataframe
 833    out = pd.DataFrame([])
 834    code_errors = []
 835
  836    # Process each concept set in the concept sets section
 837    for files in phenotype["concept_sets"]:
 838        concept_set_name = files["name"]
 839        if "metadata" in files:
 840            concept_set_metadata = files["metadata"]
 841        else:
 842            concept_set_metadata = {}
 843        for concept_set in files["files"]:
 844            _logger.debug(f"--- {concept_set} ---")
 845
 846            # Load code file
 847            codes_file_path = Path(concepts_path / concept_set["path"])
 848            df = _read_table_file(codes_file_path)
 849
 850            # process structural actions
 851            df = _process_actions(df, concept_set)
 852
  853            # preprocess and validate source concepts
 854            _logger.debug("Processing and validating source concept codes")
 855            df, errors = _preprocess_source_concepts(
 856                df,
 857                concept_set,
 858                codes_file_path,
 859            )
 860
 861            # create df with just the source code columns
 862            source_column_names = list(concept_set["columns"].keys())
 863            source_df = df[source_column_names]
 864
 865            _logger.debug(source_df.columns)
 866            _logger.debug(source_df.head())
 867
 868            _logger.debug(
 869                f"Length of errors from _preprocess_source_concepts {len(errors)}"
 870            )
 871            if len(errors) > 0:
 872                code_errors.extend(errors)
 873            _logger.debug(f" Length of code_errors {len(code_errors)}")
 874
  875            # Map source concept codes to target codes
 876            # if processing a source coding list with categorical data
 877            if (
 878                "actions" in concept_set
 879                and "divide_col" in concept_set["actions"]
 880                and len(df) > 0
 881            ):
 882                divide_col = concept_set["actions"]["divide_col"]
 883                _logger.debug(f"Action: Dividing Table by {divide_col}")
 884                _logger.debug(f"column into: {df[divide_col].unique()}")
 885                df_grp = df.groupby(divide_col)
 886                for cat, grp in df_grp:
 887                    if cat == concept_set["category"]:
 888                        grp = grp.drop(
 889                            columns=[divide_col]
 890                        )  # delete categorical column
 891                        source_df = grp[source_column_names]
 892                        trans_out = translate_codes(
 893                            source_df,
 894                            target_code_type=target_code_type,
 895                            concept_name=concept_set_name,
 896                            not_translate=not_translate,
 897                        )
 898                        trans_out = add_metadata(
 899                            codes=trans_out,
 900                            metadata=concept_set_metadata,
 901                            no_metadata=no_metadata,
 902                        )
 903                        out = pd.concat([out, trans_out])
 904            else:
 905                source_df = df[source_column_names]
 906                trans_out = translate_codes(
 907                    source_df,
 908                    target_code_type=target_code_type,
 909                    concept_name=concept_set_name,
 910                    not_translate=not_translate,
 911                )
 912                trans_out = add_metadata(
 913                    codes=trans_out,
 914                    metadata=concept_set_metadata,
 915                    no_metadata=no_metadata,
 916                )
 917                out = pd.concat([out, trans_out])
 918
 919    if len(code_errors) > 0:
 920        _logger.error(f"The map processing has {len(code_errors)} errors")
 921        error_path = phen_path / MAP_DIR / "errors"
 922        error_path.mkdir(parents=True, exist_ok=True)
 923        error_filename = f"{target_code_type}-code-errors.csv"
 924        _write_code_errors(code_errors, error_path / error_filename)
 925
 926    # Check there is output from processing
 927    if len(out.index) == 0:
 928        _logger.error(f"No output after map processing")
 929        raise Exception(
 930            f"No output after map processing, check config {str(phen_path.resolve())}"
 931        )
 932
 933    # final processing
 934    out = out.reset_index(drop=True)
 935    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 936    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 937
 938    # out_count = len(out.index)
 939    # added metadata
 940    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
 941    # result_list = []
 942    # for files in phenotype["concept_sets"]:
 943    #     concept_set_name = files["name"]
 944    #     for concept_set in files["files"]:
 945    #         source_column_names = list(concept_set["columns"].keys())
 946    #         for source_concept_type in source_column_names:
 947    #             # Filter output based on the current source_concept_type
 948    #             out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
 949    #             filtered_count = len(out_filtered_df.index)
 950
 951    #             # Remove the source type columns except the current type will leave the metadata and the join
 952    #             remove_types = [
 953    #                 type for type in source_column_names if type != source_concept_type
 954    #             ]
 955    #             metadata_df = df.drop(columns=remove_types)
 956    #             metadata_df = metadata_df.rename(
 957    #                 columns={source_concept_type: "SOURCE_CONCEPT"}
 958    #             )
 959    #             metadata_df_count = len(metadata_df.index)
 960
 961    # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata
 962    # result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
 963    # result_count = len(result.index)
 964
 965    #             _logger.debug(
 966    #                 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
 967    #             )
 968
 969    #             # Append the result to the result_list
 970    #             result_list.append(result)
 971
 972    # Concatenate all the results into a single DataFrame
 973    # final_out = pd.concat(result_list, ignore_index=True)
 974    # final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 975    # _logger.debug(
 976    #     f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
 977    # )
 978
 979    # Save output to map directory
 980    output_filename = target_code_type + ".csv"
 981    map_path = phen_path / MAP_DIR / output_filename
 982    out.to_csv(map_path, index=False)
 983    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 984
 985    # save concept sets as separate files
 986    concept_set_path = phen_path / CSV_PATH / target_code_type
 987
  988    # empty the concept-set directory except for hidden files, e.g. .git
 989    if concept_set_path.exists():
 990        for item in concept_set_path.iterdir():
 991            if not item.name.startswith("."):
 992                item.unlink()
 993    else:
 994        concept_set_path.mkdir(parents=True, exist_ok=True)
 995
 996    # write each concept as a separate file
 997    for name, concept in out.groupby("CONCEPT_SET"):
 998        concept = concept.sort_values(by="CONCEPT")  # sort rows
 999        concept = concept.dropna(how="all", axis=1)  # remove empty cols
1000        concept = concept.reindex(
1001            sorted(concept.columns), axis=1
1002        )  # sort cols alphabetically
1003        filename = f"{name}.csv"
1004        concept_path = concept_set_path / filename
1005        concept.to_csv(concept_path, index=False)
1006
1007    write_vocab_version(phen_path)
1008
1009    _logger.info(f"Phenotype processed target code type {target_code_type}")
1010
1011
 1012# Add metadata dict to each row of the codes df
1013def add_metadata(
1014    codes: pd.DataFrame,
1015    metadata: dict,
1016    no_metadata: bool,
1017) -> pd.DataFrame:
1018    """Add concept set metadata, stored as a dictionary, to each concept row"""
1019
1020    if not no_metadata:
1021        for meta_name, meta_value in metadata.items():
1022            codes[meta_name] = meta_value
1023            _logger.debug(
1024                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
1025            )
1026
1027    return codes
1028
1029
1030def _generate_version_tag(
1031    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
1032) -> str:
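    """Returns the next semantic version tag for the repo, bumping the latest valid tag by the given increment (0.0.1 when no tags exist)"""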
1033    # Get all valid semantic version tags
1034    versions = []
1035    for tag in repo.tags:
1036        if tag.name.startswith("v"):
1037            tag_name = tag.name[1:]  # Remove the first character
1038        else:
1039            tag_name = tag.name
1040        if semver.Version.is_valid(tag_name):
1041            versions.append(semver.Version.parse(tag_name))
1042
1043    _logger.debug(f"Versions: {versions}")
1044    # Determine the next version
1045    if not versions:
1046        new_version = semver.Version(0, 0, 1)
1047    else:
1048        latest_version = max(versions)
1049        if increment == "major":
1050            new_version = latest_version.bump_major()
1051        elif increment == "minor":
1052            new_version = latest_version.bump_minor()
1053        else:
1054            new_version = latest_version.bump_patch()
1055
1056    # Create the new tag
1057    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
1058
1059    return new_version_str
1060
1061
1062def publish(
1063    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1064):
1065    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1066
1067    # Validate config
1068    validate(phen_dir)
1069    phen_path = Path(phen_dir)
1070
1071    # load git repo and set the branch
1072    repo = git.Repo(phen_path)
1073    if DEFAULT_GIT_BRANCH in repo.branches:
1074        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1075        main_branch.checkout()
1076    else:
1077        raise AttributeError(
1078            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1079        )
1080
1081    # check if any changes to publish
1082    if not repo.is_dirty() and not repo.untracked_files:
1083        if remote_url is not None and "origin" not in repo.remotes:
1084            _logger.info(f"First publish to remote url {remote_url}")
1085        else:
1086            _logger.info("Nothing to publish, no changes to the repo")
1087            return
1088
1089    # get next version
1090    new_version_str = _generate_version_tag(repo, increment)
1091    _logger.info(f"New version: {new_version_str}")
1092
1093    # Write version in configuration file
1094    config_path = phen_path / CONFIG_FILE
1095    with config_path.open("r") as file:
1096        config = yaml.safe_load(file)
1097
1098    config["phenotype"]["version"] = new_version_str
1099    with open(config_path, "w") as file:
1100        yaml.dump(
1101            config,
1102            file,
1103            Dumper=util.QuotedDumper,
1104            default_flow_style=False,
1105            sort_keys=False,
1106            default_style='"',
1107        )
1108
1109    # Add and commit changes to repo including version updates
1110    commit_message = f"Committing updates to phenotype {phen_path}"
1111    repo.git.add("--all")
1112    repo.index.commit(commit_message)
1113
1114    # Add tag to the repo
1115    repo.create_tag(new_version_str)
1116
1117    # push to origin if a remote repo
1118    if remote_url is not None and "origin" not in repo.remotes:
1119        git_url = _construct_git_url(remote_url)
1120        repo.create_remote("origin", git_url)
1121
1122    try:
1123        if "origin" in repo.remotes:
1124            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1125            origin = repo.remotes.origin
1126            _logger.info(f"Pushing main branch to remote repo")
1127            repo.git.push("--set-upstream", "origin", "main")
1128            _logger.info(f"Pushing version tags to remote git repo")
1129            origin.push(tags=True)
1130            _logger.debug("Changes pushed to 'origin'")
1131        else:
1132            _logger.debug("Remote 'origin' is not set")
1133    except Exception as e:
1134        tag_ref = repo.tags[new_version_str]
1135        repo.delete_tag(tag_ref)
1136        repo.git.reset("--soft", "HEAD~1")
1137        raise e
1138
1139    _logger.info(f"Phenotype published successfully")
1140
1141
1142def export(phen_dir: str, version: str):
1143    """Exports a phen repo at a specific tagged version into a target directory"""
1144    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1145
1146    # validate configuration
1147    validate(phen_dir)
1148    phen_path = Path(phen_dir)
1149
1150    # load configuration
1151    config_path = phen_path / CONFIG_FILE
1152    with config_path.open("r") as file:
1153        config = yaml.safe_load(file)
1154
1155    map_path = phen_path / MAP_DIR
1156    if not map_path.exists():
1157        _logger.warning(f"Map path does not exist '{map_path}'")
1158
1159    export_path = phen_path / OMOP_PATH
1160    # check export directory exists and if not create it
1161    if not export_path.exists():
1162        export_path.mkdir(parents=True)
1163        _logger.debug(f"OMOP export directory '{export_path}' created.")
1164
1165    # omop export db
1166    export_db_path = omop.export(
1167        map_path,
1168        export_path,
1169        config["phenotype"]["version"],
1170        config["phenotype"]["omop"],
1171    )
1172
1173    # write to tables
1174    # export as csv
1175    _logger.info(f"Phenotype exported successfully")
1176
1177
1178def copy(phen_dir: str, target_dir: str, version: str):
1179    """Copys a phen repo at a specific tagged version into a target directory"""
1180
1181    # Validate
1182    validate(phen_dir)
1183    phen_path = Path(phen_dir)
1184
1185    # Check target directory exists
1186    target_path = Path(target_dir)
1187    if not target_path.exists():
1188        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1189
1190    # Set copy directory
1191    copy_path = target_path / version
1192    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1193
1194    if (
1195        copy_path.exists() and copy_path.is_dir()
1196    ):  # Check if it exists and is a directory
1197        copy = _check_delete_dir(
1198            copy_path,
1199            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1200        )
1201    else:
1202        copy = True
1203
1204    if not copy:
1205        _logger.info(f"Not copying the version {version}")
1206        return
1207
1208    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1209    repo = git.Repo.clone_from(phen_path, copy_path)
1210
1211    # Check out the latest commit or specified version
1212    if version:
1213        # Checkout a specific version (e.g., branch, tag, or commit hash)
1214        _logger.info(f"Checking out version {version}...")
1215        repo.git.checkout(version)
1216    else:
1217        # Checkout the latest commit (HEAD)
1218        _logger.info(f"Checking out the latest commit...")
1219        repo.git.checkout("HEAD")
1220
1221    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1222
1223    _logger.info(f"Phenotype copied successfully")
1224
1225
1226# Convert concept_sets list into dictionaries
1227def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1228    """Extracts concepts as {name: file_path} dictionary and a name set."""
1229    concepts_dict = {
1230        item["name"]: [file["path"] for file in item["files"]]
1231        for item in config_data["phenotype"]["concept_sets"]
1232    }
1233    name_set = set(concepts_dict.keys())
1234    return concepts_dict, name_set
1235
1236
1237def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1238    """
1239    Extracts clean keys from a DeepDiff dictionary.
1240
1241    :param diff: DeepDiff result dictionary
1242    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1243    :return: A set of clean key names
1244    """
1245    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1246
1247
1248def diff_config(old_config: dict, new_config: dict) -> str:
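    """Compares two phenotype configurations and returns a markdown report of added, removed and renamed concept sets and changed file paths"""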
1249    report = f"\n# Changes to phenotype configuration\n"
1250    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1251
1252    old_concepts, old_names = extract_concepts(old_config)
1253    new_concepts, new_names = extract_concepts(new_config)
1254
1255    # Check added and removed concept set names
1256    added_names = new_names - old_names  # Names that appear in new but not in old
1257    removed_names = old_names - new_names  # Names that were in old but not in new
1258
1259    # find file path changes for unchanged names
1260    unchanged_names = old_names & new_names  # Names that exist in both
1261    file_diff = DeepDiff(
1262        {name: old_concepts[name] for name in unchanged_names},
1263        {name: new_concepts[name] for name in unchanged_names},
1264    )
1265
1266    # Find renamed concepts (same file, different name)
1267    renamed_concepts = []
1268    for removed in removed_names:
1269        old_path = old_concepts[removed]
1270        for added in added_names:
1271            new_path = new_concepts[added]
1272            if old_path == new_path:
1273                renamed_concepts.append((removed, added))
1274
1275    # Remove renamed concepts from added and removed sets
1276    for old_name, new_name in renamed_concepts:
1277        added_names.discard(new_name)
1278        removed_names.discard(old_name)
1279
1280    # generate config report
1281    if added_names:
1282        report += "## Added Concepts\n"
1283        for name in added_names:
1284            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1285        report += "\n"
1286
1287    if removed_names:
1288        report += "## Removed Concepts\n"
1289        for name in removed_names:
1290            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1291        report += "\n"
1292
1293    if renamed_concepts:
1294        report += "## Renamed Concepts\n"
1295        for old_name, new_name in renamed_concepts:
1296            report += (
1297                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1298            )
1299        report += "\n"
1300
1301    if "values_changed" in file_diff:
1302        report += "## Updated File Paths\n"
1303        for name, change in file_diff["values_changed"].items():
1304            old_file = change["old_value"]
1305            new_file = change["new_value"]
1306            clean_name = name.split("root['")[1].split("']")[0]
1307            report += (
1308                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1309            )
1310        report += "\n"
1311
1312    if not (
1313        added_names
1314        or removed_names
1315        or renamed_concepts
1316        or file_diff.get("values_changed")
1317    ):
1318        report += "No changes in concept sets.\n"
1319
1320    return report
1321
1322
1323def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
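    """Compares the map output files of two phenotype versions and returns a markdown report of added, removed and changed concepts"""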
1324    old_output_files = [
1325        file.name
1326        for file in old_map_path.iterdir()
1327        if file.is_file() and not file.name.startswith(".")
1328    ]
1329    new_output_files = [
1330        file.name
1331        for file in new_map_path.iterdir()
1332        if file.is_file() and not file.name.startswith(".")
1333    ]
1334
1335    # Convert the lists to sets for easy comparison
1336    old_output_set = set(old_output_files)
1337    new_output_set = set(new_output_files)
1338
1339    # Outputs that are in old_output_set but not in new_output_set (removed files)
1340    removed_outputs = old_output_set - new_output_set
1341    # Outputs that are in new_output_set but not in old_output_set (added files)
1342    added_outputs = new_output_set - old_output_set
1343    # Outputs that are the intersection of old_output_set and new_output_set
1344    common_outputs = old_output_set & new_output_set
1345
1346    report = f"\n# Changes to available translations\n"
1347    report += f"This compares the coding translations files available.\n\n"
1348    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1349    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1350    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1351
 1352    # Compare common outputs between versions
1353    report += f"# Changes to concepts in translation files\n\n"
1354    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1355    for file in common_outputs:
1356        old_output = old_map_path / file
1357        new_output = new_map_path / file
1358
1359        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
1360        _logger.debug(f"New ouptput: {str(new_output.resolve())}")
1361
1362        df1 = pd.read_csv(old_output)
1363        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1364        df2 = pd.read_csv(new_output)
1365        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1366
1367        # Check for added and removed concepts
1368        report += f"- File {file}\n"
1369        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1370        report += f"- Removed concepts {sorted_list}\n"
1371        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1372        report += f"- Added concepts {sorted_list}\n"
1373
1374        # Check for changed concepts
1375        diff = df2 - df1  # diff in counts
1376        diff = diff[
1377            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1378        ]  # get non-zero counts
1379        s = "\n"
1380        if len(diff.index) > 0:
1381            for concept, row in diff.iterrows():
1382                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1383            report += f"- Changed concepts {s}\n\n"
1384        else:
1385            report += f"- Changed concepts []\n\n"
1386
1387    return report
1388
1389
1390def diff_phen(
1391    new_phen_path: Path,
1392    new_version: str,
1393    old_phen_path: Path,
1394    old_version: str,
1395    report_path: Path,
1396    not_check_config: bool,
1397):
1398    """Compare the differences between two versions of a phenotype"""
1399
1400    # write report heading
1401    report = f"# Phenotype Comparison Report\n"
1402
1403    # Step 1: check differences configuration files
1404    if not not_check_config:
1405        # validate phenotypes
1406        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1407        validate(str(old_phen_path.resolve()))
1408        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1409        validate(str(new_phen_path.resolve()))
1410
1411        # get old and new config
1412        old_config_path = old_phen_path / CONFIG_FILE
1413        with old_config_path.open("r") as file:
1414            old_config = yaml.safe_load(file)
1415        new_config_path = new_phen_path / CONFIG_FILE
1416        with new_config_path.open("r") as file:
1417            new_config = yaml.safe_load(file)
1418
1419        # write report
1420        report += f"## Original phenotype\n"
1421        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1422        report += f"  - {old_version}\n"
1423        report += f"  - {str(old_phen_path.resolve())}\n"
1424        report += f"## Changed phenotype:\n"
1425        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1426        report += f"  - {new_version}\n"
1427        report += f"  - {str(new_phen_path.resolve())}\n"
1428
1429        # Convert list of dicts into a dict: {name: file}
1430        report += diff_config(old_config, new_config)
1431
1432    # Step 2: check differences between map files
1433    # List files from output directories
1434    old_map_path = old_phen_path / MAP_DIR
1435    new_map_path = new_phen_path / MAP_DIR
1436    report += diff_map_files(old_map_path, new_map_path)
1437
1438    # initialise report file
1439    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1440    report_file = open(report_path, "w")
1441    report_file.write(report)
1442    report_file.close()
1443
1444    _logger.info(f"Phenotypes diff'd successfully")
1445
1446
1447def diff(
1448    phen_dir: str,
1449    version: str,
1450    old_phen_dir: str,
1451    old_version: str,
1452    not_check_config: bool,
1453):
1454    # make tmp directory .acmc
1455    timestamp = time.strftime("%Y%m%d_%H%M%S")
1456    temp_dir = Path(f".acmc/diff_{timestamp}")
1457
1458    changed_phen_path = Path(phen_dir)
1459    if not changed_phen_path.exists():
1460        raise ValueError(
1461            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1462        )
1463
1464    old_phen_path = Path(old_phen_dir)
1465    if not old_phen_path.exists():
1466        raise ValueError(
1467            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1468        )
1469
1470    #    t_path = old_phen_path / "config.yml"
1471    #    with t_path.open("r") as file:
1472    #        c = yaml.safe_load(file)
1473
1474    try:
1475        # Create the directory
1476        temp_dir.mkdir(parents=True, exist_ok=True)
1477        _logger.debug(f"Temporary directory created: {temp_dir}")
1478
1479        # Create temporary directories
1480        changed_path = temp_dir / "changed"
1481        changed_path.mkdir(parents=True, exist_ok=True)
1482        old_path = temp_dir / "old"
1483        old_path.mkdir(parents=True, exist_ok=True)
1484
1485        # checkout changed
1486        if version == "latest":
1487            _logger.debug(
1488                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1489            )
1490            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1491        else:
1492            _logger.debug(
1493                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1494            )
1495            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1496            changed_repo.git.checkout(version)
1497
1498        # checkout old
1499        if old_version == "latest":
1500            _logger.debug(
1501                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1502            )
1503            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1504        else:
1505            _logger.debug(
1506                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1507            )
1508            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1509            old_repo.git.checkout(old_version)
1510
1511        report_filename = f"{version}_{old_version}_diff.md"
1512        report_path = changed_phen_path / report_filename
1513        # diff old with new
1514        diff_phen(
1515            changed_path, version, old_path, old_version, report_path, not_check_config
1516        )
1517
1518    finally:
1519        # clean up tmp directory
1520        if temp_dir.exists():
1521            shutil.rmtree(temp_dir)
PHEN_DIR = 'phen'

Default phenotype directory name

DEFAULT_PHEN_PATH = PosixPath('workspace/phen')

Default phenotype directory path

CONCEPTS_DIR = 'concepts'

Default concepts directory name

MAP_DIR = 'map'

Default map directory name

CONCEPT_SET_DIR = 'concept-sets'

Default concept set directory name

CSV_PATH = PosixPath('concept-sets/csv')

Default CSV concept set directory path

OMOP_PATH = PosixPath('concept-sets/omop')

Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = ['concepts', 'map', 'concept-sets']

List of default phenotype directories

CONFIG_FILE = 'config.yml'

Default configuration filename

VOCAB_VERSION_FILE = 'vocab_version.yml'

Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ['major', 'minor', 'patch']

List of semantic version increment types

DEFAULT_VERSION_INC = 'patch'

Default semantic version increment type

DEFAULT_GIT_BRANCH = 'main'

Default phenotype repo branch name

SPLIT_COL_ACTION = 'split_col'

Split column preprocessing action type

CODES_COL_ACTION = 'codes_col'

Codes column preprocessing action type

DIVIDE_COL_ACTION = 'divide_col'

Divide column preprocessing action type

COL_ACTIONS = ['split_col', 'codes_col', 'divide_col']

List of column preprocessing action types

CODE_FILE_TYPES = ['.xlsx', '.xls', '.csv']

List of supported source concept coding list file types

CONFIG_SCHEMA = {'phenotype': {'type': 'dict', 'required': True, 'schema': {'version': {'type': 'string', 'required': True, 'regex': '^\\d+\\.\\d+\\.\\d+$'}, 'omop': {'type': 'dict', 'required': True, 'schema': {'vocabulary_id': {'type': 'string', 'required': True}, 'vocabulary_name': {'type': 'string', 'required': True}, 'vocabulary_reference': {'type': 'string', 'required': True, 'regex': '^https?://.*'}}}, 'map': {'type': 'list', 'schema': {'type': 'string', 'allowed': ['read3', 'read2', 'snomed', 'opcs4', 'icd10', 'atc']}}, 'concept_sets': {'type': 'list', 'required': True, 'schema': {'type': 'dict', 'schema': {'name': {'type': 'string', 'required': True}, 'files': {'type': 'list', 'required': True, 'schema': {'type': 'dict', 'schema': {'path': {'type': 'string', 'required': True}, 'columns': {'type': 'dict', 'required': True}, 'category': {'type': 'string'}, 'actions': {'type': 'dict', 'schema': {'divide_col': {'type': 'string'}, 'split_col': {'type': 'string'}, 'codes_col': {'type': 'string'}}}}}}, 'metadata': {'type': 'dict', 'required': False}}}}}}}

Phenotype config.yml schema definition
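
A minimal sketch of checking a configuration dictionary against this schema with Cerberus, mirroring what validate() does internally (the configuration values below are hypothetical):

    from cerberus import Validator

    from acmc.phen import CONFIG_SCHEMA

    # hypothetical minimal config.yml content, already parsed by yaml.safe_load
    config = {
        "phenotype": {
            "version": "0.0.1",
            "omop": {
                "vocabulary_id": "ACMC_EXAMPLE",
                "vocabulary_name": "ACMC example phenotype",
                "vocabulary_reference": "https://example.org/acmc-example",
            },
            "map": ["read2"],
            "concept_sets": [
                {
                    "name": "EXAMPLE_CONCEPT_SET",
                    "files": [{"path": "example.csv", "columns": {"read2": "code"}}],
                }
            ],
        }
    }

    validator = Validator(CONFIG_SCHEMA)
    if not validator.validate(config):
        print(validator.errors)  # per-field validation errors; empty if valid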

class PhenValidationException(builtins.Exception):
163class PhenValidationException(Exception):
164    """Custom exception class raised when validation errors in phenotype configuration file"""
165
166    def __init__(self, message, validation_errors=None):
167        super().__init__(message)
168        self.validation_errors = validation_errors

Custom exception raised when there are validation errors in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
166    def __init__(self, message, validation_errors=None):
167        super().__init__(message)
168        self.validation_errors = validation_errors
validation_errors
def init(phen_dir: str, remote_url: str):
237def init(phen_dir: str, remote_url: str):
238    """Initial phenotype directory as git repo with standard structure"""
239    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
240    phen_path = Path(phen_dir)
241
242    # check if directory already exists and ask user if they want to recreate it
243    if (
244        phen_path.exists() and phen_path.is_dir()
245    ):  # Check if it exists and is a directory
246        configure = _check_delete_dir(
247            phen_path,
248            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
249        )
250    else:
251        configure = True
252
253    if not configure:
254        _logger.info(f"Exiting, phenotype not initiatised")
255        return
256
257    # Initialise repo from local or remote
258    repo: Repo
259
260    # if remote then clone the repo otherwise init a local repo
261    if remote_url is not None:
262        # add PAT token to the URL
263        git_url = _construct_git_url(remote_url)
264
265        # clone the repo
266        git_cmd = git.cmd.Git()
267        git_cmd.clone(git_url, phen_path)
268
269        # open repo
270        repo = Repo(phen_path)
271        # check if there are any commits (new repo has no commits)
272        if (
273            len(repo.branches) == 0 or repo.head.is_detached
274        ):  # Handle detached HEAD (e.g., after init)
275            _logger.debug("The phen repository has no commits yet.")
276            commit_count = 0
277        else:
278            # Get the total number of commits in the default branch
279            commit_count = sum(1 for _ in repo.iter_commits())
280            _logger.debug(f"Repo has previous commits: {commit_count}")
281    else:
282        # local repo, create the directories and init
283        phen_path.mkdir(parents=True, exist_ok=True)
284        _logger.debug(f"Phen directory '{phen_path}' has been created.")
285        repo = git.Repo.init(phen_path)
286        commit_count = 0
287
288    phen_path = phen_path.resolve()
289    # initialise empty repos
290    if commit_count == 0:
291        # create initial commit
292        initial_file_path = phen_path / "README.md"
293        with open(initial_file_path, "w") as file:
294            file.write(
295                "# Initial commit\nThis is the first commit in the phen repository.\n"
296            )
297        repo.index.add([initial_file_path])
298        repo.index.commit("Initial commit")
299        commit_count = 1
300
301    # Checkout the phen's default branch, creating it if it does not exist
302    if DEFAULT_GIT_BRANCH in repo.branches:
303        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
304        main_branch.checkout()
305    else:
306        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
307        main_branch.checkout()
308
309    # if the phen path does not contain the config file then initialise the phenotype
310    config_path = phen_path / CONFIG_FILE
311    if config_path.exists():
312        _logger.debug(f"Phenotype configuration files already exist")
313        return
314
315    _logger.info("Creating phen directory structure and config files")
316    for d in DEFAULT_PHEN_DIR_LIST:
317        _create_empty_git_dir(phen_path / d)
318
319    # create empty phen config file
320    config = {
321        "phenotype": {
322            "version": "0.0.0",
323            "omop": {
324                "vocabulary_id": "",
325                "vocabulary_name": "",
326                "vocabulary_reference": "",
327            },
328            "translate": [],
329            "concept_sets": [],
330        }
331    }
332
333    with open(phen_path / CONFIG_FILE, "w") as file:
334        yaml.dump(
335            config,
336            file,
337            Dumper=util.QuotedDumper,
338            default_flow_style=False,
339            sort_keys=False,
340            default_style='"',
341        )
342
343    # add git ignore
344    ignore_content = """# Ignore SQLite database files
345*.db
346*.sqlite3
347 
348# Ignore SQLite journal and metadata files
349*.db-journal
350*.sqlite3-journal
351
352# python
353.ipynb_checkpoints
354 """
355    ignore_path = phen_path / ".gitignore"
356    with open(ignore_path, "w") as file:
357        file.write(ignore_content)
358
359    # add to git repo and commit
360    for d in DEFAULT_PHEN_DIR_LIST:
361        repo.git.add(phen_path / d)
362    repo.git.add(all=True)
363    repo.index.commit("initialised the phen git repo.")
364
365    _logger.info(f"Phenotype initialised successfully")

Initialise a phenotype directory as a git repo with the standard structure
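
A minimal usage sketch (the directory path is hypothetical; pass a remote URL instead of None to clone an existing repo):

    from acmc import phen

    # initialise a new local phenotype repo with the standard directory structure
    phen.init(phen_dir="./workspace/phen", remote_url=None)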

def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
368def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
369    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
370
371    Args:
372        phen_dir (str): local directory path where the upstream repo is to be cloned
373        upstream_url (str): url to the upstream repo
374        upstream_version (str): version in the upstream repo to clone
375        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
376
377    Raises:
378        ValueError: if the specified version is not in the upstream repo
379        ValueError: if the upstream repo is not a valid phenotype repo
380        ValueError: if there are any other problems with Git
381    """
382    _logger.info(
383        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
384    )
385
386    phen_path = Path(phen_dir)
387    # check if directory already exists and ask user if they want to recreate it
388    if (
389        phen_path.exists() and phen_path.is_dir()
390    ):  # Check if it exists and is a directory
391        configure = _check_delete_dir(
392            phen_path,
393            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
394        )
395    else:
396        configure = True
397
398    if not configure:
399        _logger.info(f"Exiting, phenotype not initiatised")
400        return
401
402    try:
403        # Clone repo
404        git_url = _construct_git_url(upstream_url)
405        repo = git.Repo.clone_from(git_url, phen_path)
406
407        # Fetch all branches and tags
408        repo.remotes.origin.fetch()
409
410        # Check if the version exists
411        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
412        if upstream_version not in available_refs:
413            raise ValueError(
414                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
415            )
416
417        # Checkout the specified version
418        repo.git.checkout(upstream_version)
419        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
420        main_branch.checkout()
421
422        # Check if 'config.yml' exists in the root directory
423        config_path = phen_path / "config.yml"
424        if not os.path.isfile(config_path):
425            raise ValueError(
426                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
427            )
428
429        # Validate the phenotype is compatible with the acmc tool
430        validate(str(phen_path.resolve()))
431
432        # Delete each tag locally
433        tags = repo.tags
434        for tag in tags:
435            repo.delete_tag(tag)
436            _logger.debug(f"Deleted tags from forked repo: {tag}")
437
438        # Add upstream remote
439        repo.create_remote("upstream", upstream_url)
440        remote = repo.remotes["origin"]
441        repo.delete_remote(remote)  # Remove existing origin
442
443        # Optionally set a new origin remote
444        if new_origin_url:
445            git_url = _construct_git_url(new_origin_url)
446            repo.create_remote("origin", git_url)
447            repo.git.push("--set-upstream", "origin", "main")
448
449        _logger.info(f"Repository forked successfully at {phen_path}")
450        _logger.info(f"Upstream set to {upstream_url}")
451        if new_origin_url:
452            _logger.info(f"Origin set to {new_origin_url}")
453
454    except Exception as e:
455        if phen_path.exists():
456            shutil.rmtree(phen_path)
457        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

Arguments:
  • phen_dir (str): local directory path where the upstream repo is to be cloned
  • upstream_url (str): url to the upstream repo
  • upstream_version (str): version in the upstream repo to clone
  • new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
  • ValueError: if the specified version is not in the upstream repo
  • ValueError: if the upstream repo is not a valid phenotype repo
  • ValueError: if there are any other problems with Git
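
A minimal usage sketch (the URLs and version tag are hypothetical; new_origin_url may be None to leave the fork without an origin remote):

    from acmc import phen

    phen.fork(
        phen_dir="./workspace/phen-fork",
        upstream_url="https://github.com/example/phen-repo.git",
        upstream_version="0.0.1",
        new_origin_url=None,
    )
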
def validate(phen_dir: str):
460def validate(phen_dir: str):
461    """Validates the phenotype directory is a git repo with standard structure"""
462    _logger.info(f"Validating phenotype: {phen_dir}")
463    phen_path = Path(phen_dir)
464    if not phen_path.is_dir():
465        raise NotADirectoryError(
466            f"Error: '{str(phen_path.resolve())}' is not a directory"
467        )
468
469    config_path = phen_path / CONFIG_FILE
470    if not config_path.is_file():
471        raise FileNotFoundError(
472            f"Error: phen configuration file '{config_path}' does not exist."
473        )
474
475    concepts_path = phen_path / CONCEPTS_DIR
476    if not concepts_path.is_dir():
477        raise FileNotFoundError(
478            f"Error: source concepts directory {concepts_path} does not exist."
479        )
480
481    # Validate the directory is a git repo
482    try:
483        git.Repo(phen_path)
484    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
485        raise Exception(f"Phen directory {phen_path} is not a git repo")
486
487    # Load configuration File
488    if config_path.suffix == ".yml":
489        try:
490            with config_path.open("r") as file:
491                phenotype = yaml.safe_load(file)
492
493            validator = Validator(CONFIG_SCHEMA)
494            if validator.validate(phenotype):
495                _logger.debug("YAML structure is valid.")
496            else:
497                _logger.error(f"YAML structure validation failed: {validator.errors}")
498                raise Exception(f"YAML structure validation failed: {validator.errors}")
499        except yaml.YAMLError as e:
500            _logger.error(f"YAML syntax error: {e}")
501            raise e
502    else:
503        raise Exception(
504            f"Unsupported configuration filetype: {str(config_path.resolve())}"
505        )
506
507    # initialise
508    validation_errors = []
509    phenotype = phenotype["phenotype"]
510    code_types = parse.CodeTypeParser().code_types
511
512    # check the version number is of the format n.n.n
513    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
514    if not match:
515        validation_errors.append(
516            f"Invalid version format in configuration file: {phenotype['version']}"
517        )
518
519    # create a list of all the concept set names defined in the concept set configuration
520    concept_set_names = []
521    for item in phenotype["concept_sets"]:
522        if item["name"] in concept_set_names:
523            validation_errors.append(
524                f"Duplicate concept set defined in concept sets {item['name']}"
525            )
526        else:
527            concept_set_names.append(item["name"])
528
529    # check codes definition
530    for files in phenotype["concept_sets"]:
531        for item in files["files"]:
532            # check concept code file exists
533            concept_code_file_path = concepts_path / item["path"]
534            if not concept_code_file_path.exists():
535                validation_errors.append(
536                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
537                )
538
539            # check concept code file is not empty
540            if concept_code_file_path.stat().st_size == 0:
541                validation_errors.append(
542                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
543                )
544
545            # check code file type is supported
546            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
547                raise ValueError(
548                    f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
549                )
550
551            # check columns specified are a supported medical coding type
552            for column in item["columns"]:
553                if column not in code_types:
554                    validation_errors.append(
555                        f"Column type {column} for file {concept_code_file_path} is not supported"
556                    )
557
558            # check the actions are supported
559            if "actions" in item:
560                for action in item["actions"]:
561                    if action not in COL_ACTIONS:
562                        validation_errors.append(f"Action {action} is not supported")
563
564    if len(validation_errors) > 0:
565        _logger.error(validation_errors)
566        raise PhenValidationException(
567            f"Configuration file {str(config_path.resolve())} failed validation",
568            validation_errors,
569        )
570
571    _logger.info(f"Phenotype validated successfully")

Validates that the phenotype directory is a git repo with the standard structure
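
A minimal usage sketch; structural problems are collected on the raised PhenValidationException (the path is hypothetical):

    from acmc import phen

    try:
        phen.validate("./workspace/phen")
    except phen.PhenValidationException as e:
        # each entry describes one problem found in the configuration
        for error in e.validation_errors:
            print(error)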

def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str, not_translate: bool) -> pandas.core.frame.DataFrame:
665def translate_codes(
666    source_df: pd.DataFrame,
667    target_code_type: str,
668    concept_name: str,
669    not_translate: bool,
670) -> pd.DataFrame:
671    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
672
673    # codes = pd.DataFrame([], dtype=str)
674    codes = pd.DataFrame(
675        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
676    )
677    # Convert codes to target type
678    _logger.info(f"Converting to target code type {target_code_type}")
679
680    for source_code_type in source_df.columns:
681        # if the target code type is the same as the source code type, no translation; just append source as target
682        if source_code_type == target_code_type:
683            copy_df = pd.DataFrame(
684                {
685                    "SOURCE_CONCEPT": source_df[source_code_type],
686                    "SOURCE_CONCEPT_TYPE": source_code_type,
687                    "CONCEPT": source_df[source_code_type],
688                }
689            )
690            codes = pd.concat([codes, copy_df])
691            _logger.debug(
692                f"Target code type {target_code_type} is the same as the source code type ({len(source_df)} rows), copying codes rather than translating"
693            )
694        elif not not_translate:
695            # get the translation filename using source to target code types
696            filename = f"{source_code_type}_to_{target_code_type}.parquet"
697            map_path = trud.PROCESSED_PATH / filename
698
699            # do the mapping if it exists
700            if map_path.exists():
701                # get mapping
702                df_map = pd.read_parquet(map_path)
703
704                # do mapping
705                translated_df = pd.merge(
706                    source_df[source_code_type], df_map, how="left"
707                )
708
709                # normalise the output
710                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
711                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
712
713                # add to list of codes
714                codes = pd.concat([codes, translated_df])
715
716            else:
717                _logger.warning(
718                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
719                )
720
721    codes = codes.dropna()  # delete NaNs
722
723    # add the concept set name to the output if there were any translations
724    if len(codes.index) > 0:
725        codes["CONCEPT_SET"] = concept_name
726    else:
727        _logger.debug(f"No codes converted with target code type {target_code_type}")
728
729    return codes

Translates each source code type in the source coding list into a target code type and returns all conversions as a concept set
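
A minimal sketch using hypothetical Read v2 codes; because the target type matches the source column, the codes are copied rather than translated, so no TRUD mapping files are needed:

    import pandas as pd

    from acmc import phen

    # one column per source code type
    source_df = pd.DataFrame({"read2": ["C10..", "C108."]})

    codes = phen.translate_codes(
        source_df,
        target_code_type="read2",
        concept_name="DIABETES_EXAMPLE",
        not_translate=True,
    )
    print(codes[["CONCEPT_SET", "SOURCE_CONCEPT_TYPE", "CONCEPT"]])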

def write_vocab_version(phen_path: pathlib.Path):
751def write_vocab_version(phen_path: Path):
752    # write the vocab version files
753
754    if not trud.VERSION_PATH.exists():
755        raise FileNotFoundError(
756            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
757        )
758
759    if not omop.VERSION_PATH.exists():
760        raise FileNotFoundError(
761            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
762        )
763
764    with trud.VERSION_PATH.open("r") as file:
765        trud_version = yaml.safe_load(file)
766
767    with omop.VERSION_PATH.open("r") as file:
768        omop_version = yaml.safe_load(file)
769
770    # Create the combined YAML structure
771    version_data = {
772        "versions": {
773            "acmc": acmc.__version__,
774            "trud": trud_version,
775            "omop": omop_version,
776        }
777    }
778
779    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
780        yaml.dump(
781            version_data,
782            file,
783            Dumper=util.QuotedDumper,
784            default_flow_style=False,
785            sort_keys=False,
786            default_style='"',
787        )
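
A minimal usage sketch (hypothetical path; both the TRUD and OMOP resources must be installed so their version files exist):

    from pathlib import Path

    from acmc import phen

    # records the acmc, TRUD and OMOP versions in vocab_version.yml
    phen.write_vocab_version(Path("./workspace/phen"))
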
def map( phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool):
790def map(phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool):
791    _logger.info(f"Processing phenotype: {phen_dir}")
792
793    # Validate configuration
794    validate(phen_dir)
795
796    # initialise paths
797    phen_path = Path(phen_dir)
798    config_path = phen_path / CONFIG_FILE
799
800    # load configuration
801    with config_path.open("r") as file:
802        config = yaml.safe_load(file)
803    phenotype = config["phenotype"]
804
805    if len(phenotype["map"]) == 0:
806        raise ValueError(f"No map codes defined in the phenotype configuration")
807
808    if target_code_type is not None and target_code_type not in phenotype["map"]:
809        raise ValueError(
810            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
811        )
812
813    if target_code_type is not None:
814        _map_target_code_type(
815            phen_path, phenotype, target_code_type, not_translate, no_metadata
816        )
817    else:
818        for t in phenotype["map"]:
819            _map_target_code_type(phen_path, phenotype, t, not_translate, no_metadata)
820
821    _logger.info(f"Phenotype processed successfully")
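
A minimal usage sketch (hypothetical path; the target code type must be listed under map in config.yml, and TRUD mappings are needed for any source types that differ from it):

    from acmc import phen

    phen.map(
        phen_dir="./workspace/phen",
        target_code_type="read2",
        not_translate=False,
        no_metadata=False,
    )
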
def add_metadata( codes: pandas.core.frame.DataFrame, metadata: dict, no_metadata: bool) -> pandas.core.frame.DataFrame:
1014def add_metadata(
1015    codes: pd.DataFrame,
1016    metadata: dict,
1017    no_metadata: bool,
1018) -> pd.DataFrame:
1019    """Add concept set metadata, stored as a dictionary, to each concept row"""
1020
1021    if not no_metadata:
1022        for meta_name, meta_value in metadata.items():
1023            codes[meta_name] = meta_value
1024            _logger.debug(
1025                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
1026            )
1027
1028    return codes

Add concept set metadata, stored as a dictionary, to each concept row
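
A minimal, self-contained sketch (the metadata keys and values are hypothetical):

    import pandas as pd

    from acmc import phen

    codes = pd.DataFrame({"CONCEPT": ["C10..", "C108."]})
    metadata = {"author": "example", "review_date": "2024-01-01"}

    codes = phen.add_metadata(codes, metadata, no_metadata=False)
    print(codes.columns.tolist())  # ['CONCEPT', 'author', 'review_date']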

def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
1063def publish(
1064    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1065):
1066    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1067
1068    # Validate config
1069    validate(phen_dir)
1070    phen_path = Path(phen_dir)
1071
1072    # load git repo and set the branch
1073    repo = git.Repo(phen_path)
1074    if DEFAULT_GIT_BRANCH in repo.branches:
1075        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1076        main_branch.checkout()
1077    else:
1078        raise AttributeError(
1079            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1080        )
1081
1082    # check if any changes to publish
1083    if not repo.is_dirty() and not repo.untracked_files:
1084        if remote_url is not None and "origin" not in repo.remotes:
1085            _logger.info(f"First publish to remote url {remote_url}")
1086        else:
1087            _logger.info("Nothing to publish, no changes to the repo")
1088            return
1089
1090    # get next version
1091    new_version_str = _generate_version_tag(repo, increment)
1092    _logger.info(f"New version: {new_version_str}")
1093
1094    # Write version in configuration file
1095    config_path = phen_path / CONFIG_FILE
1096    with config_path.open("r") as file:
1097        config = yaml.safe_load(file)
1098
1099    config["phenotype"]["version"] = new_version_str
1100    with open(config_path, "w") as file:
1101        yaml.dump(
1102            config,
1103            file,
1104            Dumper=util.QuotedDumper,
1105            default_flow_style=False,
1106            sort_keys=False,
1107            default_style='"',
1108        )
1109
1110    # Add and commit changes to repo including version updates
1111    commit_message = f"Committing updates to phenotype {phen_path}"
1112    repo.git.add("--all")
1113    repo.index.commit(commit_message)
1114
1115    # Add tag to the repo
1116    repo.create_tag(new_version_str)
1117
1118    # push to origin if a remote repo
1119    if remote_url is not None and "origin" not in repo.remotes:
1120        git_url = _construct_git_url(remote_url)
1121        repo.create_remote("origin", git_url)
1122
1123    try:
1124        if "origin" in repo.remotes:
1125            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1126            origin = repo.remotes.origin
1127            _logger.info(f"Pushing main branch to remote repo")
1128            repo.git.push("--set-upstream", "origin", "main")
1129            _logger.info(f"Pushing version tags to remote git repo")
1130            origin.push(tags=True)
1131            _logger.debug("Changes pushed to 'origin'")
1132        else:
1133            _logger.debug("Remote 'origin' is not set")
1134    except Exception as e:
1135        tag_ref = repo.tags[new_version_str]
1136        repo.delete_tag(tag_ref)
1137        repo.git.reset("--soft", "HEAD~1")
1138        raise e
1139
1140    _logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory
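
A minimal usage sketch (hypothetical path; remote_url may be None for a local-only repo, and increment must be one of SEMANTIC_VERSION_TYPES):

    from acmc import phen

    phen.publish(
        phen_dir="./workspace/phen",
        msg="Updated concept sets",
        remote_url=None,
        increment="minor",
    )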

def export(phen_dir: str, version: str):
1143def export(phen_dir: str, version: str):
1144    """Exports a phen repo at a specific tagged version into a target directory"""
1145    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1146
1147    # validate configuration
1148    validate(phen_dir)
1149    phen_path = Path(phen_dir)
1150
1151    # load configuration
1152    config_path = phen_path / CONFIG_FILE
1153    with config_path.open("r") as file:
1154        config = yaml.safe_load(file)
1155
1156    map_path = phen_path / MAP_DIR
1157    if not map_path.exists():
1158        _logger.warning(f"Map path does not exist '{map_path}'")
1159
1160    export_path = phen_path / OMOP_PATH
1161    # check export directory exists and if not create it
1162    if not export_path.exists():
1163        export_path.mkdir(parents=True)
1164        _logger.debug(f"OMOP export directory '{export_path}' created.")
1165
1166    # omop export db
1167    export_db_path = omop.export(
1168        map_path,
1169        export_path,
1170        config["phenotype"]["version"],
1171        config["phenotype"]["omop"],
1172    )
1173
1174    # write to tables
1175    # export as csv
1176    _logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory
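
A minimal usage sketch (hypothetical path and version; the OMOP vocabularies must be installed for the export database to be built):

    from acmc import phen

    # writes the OMOP export database under concept-sets/omop
    phen.export(phen_dir="./workspace/phen", version="0.0.1")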

def copy(phen_dir: str, target_dir: str, version: str):
1179def copy(phen_dir: str, target_dir: str, version: str):
1180    """Copys a phen repo at a specific tagged version into a target directory"""
1181
1182    # Validate
1183    validate(phen_dir)
1184    phen_path = Path(phen_dir)
1185
1186    # Check target directory exists
1187    target_path = Path(target_dir)
1188    if not target_path.exists():
1189        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1190
1191    # Set copy directory
1192    copy_path = target_path / version
1193    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1194
1195    if (
1196        copy_path.exists() and copy_path.is_dir()
1197    ):  # Check if it exists and is a directory
1198        copy = _check_delete_dir(
1199            copy_path,
1200            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1201        )
1202    else:
1203        copy = True
1204
1205    if not copy:
1206        _logger.info(f"Not copying the version {version}")
1207        return
1208
1209    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1210    repo = git.Repo.clone_from(phen_path, copy_path)
1211
1212    # Check out the latest commit or specified version
1213    if version:
1214        # Checkout a specific version (e.g., branch, tag, or commit hash)
1215        _logger.info(f"Checking out version {version}...")
1216        repo.git.checkout(version)
1217    else:
1218        # Checkout the latest commit (HEAD)
1219        _logger.info(f"Checking out the latest commit...")
1220        repo.git.checkout("HEAD")
1221
1222    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1223
1224    _logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory
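
A minimal usage sketch (hypothetical paths and version; the version must be a branch, tag, or commit in the repo):

    from acmc import phen

    # clones the repo into ./releases/0.0.1 and checks out that version
    phen.copy(phen_dir="./workspace/phen", target_dir="./releases", version="0.0.1")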

def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1228def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1229    """Extracts concepts as {name: file_path} dictionary and a name set."""
1230    concepts_dict = {
1231        item["name"]: [file["path"] for file in item["files"]]
1232        for item in config_data["phenotype"]["concept_sets"]
1233    }
1234    name_set = set(concepts_dict.keys())
1235    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.
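
A minimal, self-contained sketch with a hypothetical configuration:

    from acmc import phen

    config = {
        "phenotype": {
            "concept_sets": [
                {"name": "CS_A", "files": [{"path": "a.csv"}]},
                {"name": "CS_B", "files": [{"path": "b1.csv"}, {"path": "b2.csv"}]},
            ]
        }
    }

    concepts, names = phen.extract_concepts(config)
    print(concepts)  # {'CS_A': ['a.csv'], 'CS_B': ['b1.csv', 'b2.csv']}
    print(names)     # {'CS_A', 'CS_B'} (set, order may vary)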

def diff_config(old_config: dict, new_config: dict) -> str:
1249def diff_config(old_config: dict, new_config: dict) -> str:
1250    report = f"\n# Changes to phenotype configuration\n"
1251    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1252
1253    old_concepts, old_names = extract_concepts(old_config)
1254    new_concepts, new_names = extract_concepts(new_config)
1255
1256    # Check added and removed concept set names
1257    added_names = new_names - old_names  # Names that appear in new but not in old
1258    removed_names = old_names - new_names  # Names that were in old but not in new
1259
1260    # find file path changes for unchanged names
1261    unchanged_names = old_names & new_names  # Names that exist in both
1262    file_diff = DeepDiff(
1263        {name: old_concepts[name] for name in unchanged_names},
1264        {name: new_concepts[name] for name in unchanged_names},
1265    )
1266
1267    # Find renamed concepts (same file, different name)
1268    renamed_concepts = []
1269    for removed in removed_names:
1270        old_path = old_concepts[removed]
1271        for added in added_names:
1272            new_path = new_concepts[added]
1273            if old_path == new_path:
1274                renamed_concepts.append((removed, added))
1275
1276    # Remove renamed concepts from added and removed sets
1277    for old_name, new_name in renamed_concepts:
1278        added_names.discard(new_name)
1279        removed_names.discard(old_name)
1280
1281    # generate config report
1282    if added_names:
1283        report += "## Added Concepts\n"
1284        for name in added_names:
1285            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1286        report += "\n"
1287
1288    if removed_names:
1289        report += "## Removed Concepts\n"
1290        for name in removed_names:
1291            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1292        report += "\n"
1293
1294    if renamed_concepts:
1295        report += "## Renamed Concepts\n"
1296        for old_name, new_name in renamed_concepts:
1297            report += (
1298                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1299            )
1300        report += "\n"
1301
1302    if "values_changed" in file_diff:
1303        report += "## Updated File Paths\n"
1304        for name, change in file_diff["values_changed"].items():
1305            old_file = change["old_value"]
1306            new_file = change["new_value"]
1307            clean_name = name.split("root['")[1].split("']")[0]
1308            report += (
1309                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1310            )
1311        report += "\n"
1312
1313    if not (
1314        added_names
1315        or removed_names
1316        or renamed_concepts
1317        or file_diff.get("values_changed")
1318    ):
1319        report += "No changes in concept sets.\n"
1320
1321    return report
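
A minimal, self-contained sketch; because the two hypothetical concept sets share the same file, the change is reported as a rename rather than an add and a remove:

    from acmc import phen

    old_config = {
        "phenotype": {
            "concept_sets": [{"name": "CS_OLD", "files": [{"path": "a.csv"}]}]
        }
    }
    new_config = {
        "phenotype": {
            "concept_sets": [{"name": "CS_NEW", "files": [{"path": "a.csv"}]}]
        }
    }

    print(phen.diff_config(old_config, new_config))
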
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1324def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1325    old_output_files = [
1326        file.name
1327        for file in old_map_path.iterdir()
1328        if file.is_file() and not file.name.startswith(".")
1329    ]
1330    new_output_files = [
1331        file.name
1332        for file in new_map_path.iterdir()
1333        if file.is_file() and not file.name.startswith(".")
1334    ]
1335
1336    # Convert the lists to sets for easy comparison
1337    old_output_set = set(old_output_files)
1338    new_output_set = set(new_output_files)
1339
1340    # Outputs that are in old_output_set but not in new_output_set (removed files)
1341    removed_outputs = old_output_set - new_output_set
1342    # Outputs that are in new_output_set but not in old_output_set (added files)
1343    added_outputs = new_output_set - old_output_set
1344    # Outputs that are the intersection of old_output_set and new_output_set
1345    common_outputs = old_output_set & new_output_set
1346
1347    report = f"\n# Changes to available translations\n"
1348    report += f"This compares the coding translations files available.\n\n"
1349    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1350    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1351    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1352
1353    # Compare the common output files between versions
1354    report += f"# Changes to concepts in translation files\n\n"
1355    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1356    for file in common_outputs:
1357        old_output = old_map_path / file
1358        new_output = new_map_path / file
1359
1360        _logger.debug(f"Old output: {str(old_output.resolve())}")
1361        _logger.debug(f"New output: {str(new_output.resolve())}")
1362
1363        df1 = pd.read_csv(old_output)
1364        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1365        df2 = pd.read_csv(new_output)
1366        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1367
1368        # Check for added and removed concepts
1369        report += f"- File {file}\n"
1370        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1371        report += f"- Removed concepts {sorted_list}\n"
1372        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1373        report += f"- Added concepts {sorted_list}\n"
1374
1375        # Check for changed concepts
1376        diff = df2 - df1  # diff in counts
1377        diff = diff[
1378            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1379        ]  # get non-zero counts
1380        s = "\n"
1381        if len(diff.index) > 0:
1382            for concept, row in diff.iterrows():
1383                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1384            report += f"- Changed concepts {s}\n\n"
1385        else:
1386            report += f"- Changed concepts []\n\n"
1387
1388    return report
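
A minimal, self-contained sketch that builds two hypothetical map directories and compares them; the count for CS_A drops from two concepts to one, so it appears under changed concepts:

    import pandas as pd
    from pathlib import Path

    from acmc import phen

    old_map, new_map = Path("old/map"), Path("new/map")
    for p in (old_map, new_map):
        p.mkdir(parents=True, exist_ok=True)

    pd.DataFrame(
        {"CONCEPT": ["111", "222"], "CONCEPT_SET": ["CS_A", "CS_A"]}
    ).to_csv(old_map / "read2.csv", index=False)
    pd.DataFrame(
        {"CONCEPT": ["111"], "CONCEPT_SET": ["CS_A"]}
    ).to_csv(new_map / "read2.csv", index=False)

    print(phen.diff_map_files(old_map, new_map))
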
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path, not_check_config: bool):
1391def diff_phen(
1392    new_phen_path: Path,
1393    new_version: str,
1394    old_phen_path: Path,
1395    old_version: str,
1396    report_path: Path,
1397    not_check_config: bool,
1398):
1399    """Compare the differences between two versions of a phenotype"""
1400
1401    # write report heading
1402    report = f"# Phenotype Comparison Report\n"
1403
1404    # Step 1: check differences in the configuration files
1405    if not not_check_config:
1406        # validate phenotypes
1407        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1408        validate(str(old_phen_path.resolve()))
1409        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1410        validate(str(new_phen_path.resolve()))
1411
1412        # get old and new config
1413        old_config_path = old_phen_path / CONFIG_FILE
1414        with old_config_path.open("r") as file:
1415            old_config = yaml.safe_load(file)
1416        new_config_path = new_phen_path / CONFIG_FILE
1417        with new_config_path.open("r") as file:
1418            new_config = yaml.safe_load(file)
1419
1420        # write report
1421        report += f"## Original phenotype\n"
1422        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1423        report += f"  - {old_version}\n"
1424        report += f"  - {str(old_phen_path.resolve())}\n"
1425        report += f"## Changed phenotype\n"
1426        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1427        report += f"  - {new_version}\n"
1428        report += f"  - {str(new_phen_path.resolve())}\n"
1429
1430        # compare the concept set configurations
1431        report += diff_config(old_config, new_config)
1432
1433    # Step 2: check differences between map files
1434    # List files from output directories
1435    old_map_path = old_phen_path / MAP_DIR
1436    new_map_path = new_phen_path / MAP_DIR
1437    report += diff_map_files(old_map_path, new_map_path)
1438
1439    # initialise report file
1440    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1441    report_file = open(report_path, "w")
1442    report_file.write(report)
1443    report_file.close()
1444
1445    _logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff( phen_dir: str, version: str, old_phen_dir: str, old_version: str, not_check_config: bool):
1448def diff(
1449    phen_dir: str,
1450    version: str,
1451    old_phen_dir: str,
1452    old_version: str,
1453    not_check_config: bool,
1454):
1455    # make tmp directory .acmc
1456    timestamp = time.strftime("%Y%m%d_%H%M%S")
1457    temp_dir = Path(f".acmc/diff_{timestamp}")
1458
1459    changed_phen_path = Path(phen_dir)
1460    if not changed_phen_path.exists():
1461        raise ValueError(
1462            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1463        )
1464
1465    old_phen_path = Path(old_phen_dir)
1466    if not old_phen_path.exists():
1467        raise ValueError(
1468            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1469        )
1470
1471    #    t_path = old_phen_path / "config.yml"
1472    #    with t_path.open("r") as file:
1473    #        c = yaml.safe_load(file)
1474
1475    try:
1476        # Create the directory
1477        temp_dir.mkdir(parents=True, exist_ok=True)
1478        _logger.debug(f"Temporary directory created: {temp_dir}")
1479
1480        # Create temporary directories
1481        changed_path = temp_dir / "changed"
1482        changed_path.mkdir(parents=True, exist_ok=True)
1483        old_path = temp_dir / "old"
1484        old_path.mkdir(parents=True, exist_ok=True)
1485
1486        # checkout changed
1487        if version == "latest":
1488            _logger.debug(
1489                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1490            )
1491            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1492        else:
1493            _logger.debug(
1494                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1495            )
1496            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1497            changed_repo.git.checkout(version)
1498
1499        # checkout old
1500        if old_version == "latest":
1501            _logger.debug(
1502                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1503            )
1504            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1505        else:
1506            _logger.debug(
1507                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1508            )
1509            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1510            old_repo.git.checkout(old_version)
1511
1512        report_filename = f"{version}_{old_version}_diff.md"
1513        report_path = changed_phen_path / report_filename
1514        # diff old with new
1515        diff_phen(
1516            changed_path, version, old_path, old_version, report_path, not_check_config
1517        )
1518
1519    finally:
1520        # clean up tmp directory
1521        if temp_dir.exists():
1522            shutil.rmtree(temp_dir)
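
A minimal usage sketch (hypothetical paths and versions; "latest" copies the working tree, while any other value is checked out from the repo). The report is written to <version>_<old_version>_diff.md in the changed phenotype directory:

    from acmc import phen

    phen.diff(
        phen_dir="./workspace/phen",
        version="latest",
        old_phen_dir="./releases/0.0.1",
        old_version="0.0.1",
        not_check_config=False,
    )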