acmc.phen

Phenotype Module

This module provides functionality for managing phenotypes: initialising phenotype repositories, validating their configuration, mapping source concept codes to target coding systems, and versioning, exporting, copying and comparing phenotypes.
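
A typical workflow, sketched below with hypothetical paths and an illustrative target code type, is to initialise a phenotype repository, validate its configuration, map its source codes to a target coding system, and publish a new version. All functions shown are defined in this module; the directory and code type are examples only.

    from acmc import phen

    phen.init("./workspace/phen", remote_url=None)  # create the repo and config skeleton
    # edit config.yaml to add concept_sets and map targets, then:
    phen.validate("./workspace/phen")
    phen.map("./workspace/phen", target_code_type="read2")
    phen.publish("./workspace/phen", msg="Updated concept sets", remote_url=None)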

   1"""
   2Phenotype Module
   3================
   4This module provides functionality for managing phenotypes: initialising phenotype repositories, validating their configuration, mapping source concept codes to target coding systems, and versioning, exporting, copying and comparing phenotypes.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28
  29import acmc
  30from acmc import trud, omop, parse, util
  31
  32# setup logging
  33import acmc.logging_config as lc
  34
  35logger = lc.setup_logger()
  36
  37pd.set_option("mode.chained_assignment", None)
  38
  39PHEN_DIR = "phen"
  40DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
  41
  42CONCEPTS_DIR = "concepts"
  43MAP_DIR = "map"
  44CONCEPT_SET_DIR = "concept-sets"
  45CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
  46OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
  47DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
  48CONFIG_FILE = "config.yaml"
  49VOCAB_VERSION_FILE = "vocab_version.yaml"
  50SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
  51DEFAULT_VERSION_INC = "patch"
  52
  53DEFAULT_GIT_BRANCH = "main"
  54
  55SPLIT_COL_ACTION = "split_col"
  56CODES_COL_ACTION = "codes_col"
  57DIVIDE_COL_ACTION = "divide_col"
  58COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
  59
  60CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
  61SOURCE_COL_SUFFIX = "_acmc_source"
  62TARGET_COL_SUFFIX = "_acmc_target"
  63
  64# config.yaml schema
  65CONFIG_SCHEMA = {
  66    "phenotype": {
  67        "type": "dict",
  68        "required": True,
  69        "schema": {
  70            "version": {
  71                "type": "string",
  72                "required": True,
   73                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
  74            },
  75            "omop": {
  76                "type": "dict",
  77                "required": True,
  78                "schema": {
  79                    "vocabulary_id": {"type": "string", "required": True},
  80                    "vocabulary_name": {"type": "string", "required": True},
  81                    "vocabulary_reference": {
  82                        "type": "string",
  83                        "required": True,
  84                        "regex": r"^https?://.*",  # Ensures it's a URL
  85                    },
  86                },
  87            },
  88            "map": {
  89                "type": "list",
  90                "schema": {
  91                    "type": "string",
  92                    "allowed": list(
  93                        parse.SUPPORTED_CODE_TYPES
  94                    ),  # Ensure only predefined values are allowed
  95                },
  96            },
  97            "concept_sets": {
  98                "type": "list",
  99                "required": True,
 100                "schema": {
 101                    "type": "dict",
 102                    "schema": {
 103                        "name": {"type": "string", "required": True},
 104                        "file": {
 105                            "type": "dict",
 106                            "required": False,
 107                            "schema": {
 108                                "path": {"type": "string", "required": True},
 109                                "columns": {"type": "dict", "required": True},
 110                                "category": {
 111                                    "type": "string"
 112                                },  # Optional but must be string if present
 113                                "actions": {
 114                                    "type": "dict",
 115                                    "schema": {"divide_col": {"type": "string"}},
 116                                },
 117                            },
 118                        },
 119                        "metadata": {"type": "dict", "required": True},
 120                    },
 121                },
 122            },
 123        },
 124    }
 125}
 126
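# A minimal, illustrative config.yaml that satisfies CONFIG_SCHEMA above (all
# names, paths and values are hypothetical examples, not shipped defaults):
#
#   phenotype:
#     version: "0.0.1"
#     omop:
#       vocabulary_id: "ACMC_EXAMPLE"
#       vocabulary_name: "Example phenotype vocabulary"
#       vocabulary_reference: "https://example.org/example-phenotype"
#     map:
#       - "read2"
#     concept_sets:
#       - name: "EXAMPLE_CONDITION"
#         file:
#           path: "example_codes.csv"
#           columns:
#             read2: "code"
#         metadata: {}
#
# Validator(CONFIG_SCHEMA).validate(yaml.safe_load(...)) returns True for this document.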
 127
 128class PhenValidationException(Exception):
 129    """Custom exception class raised when validation errors in phenotype configuration file"""
 130
 131    def __init__(self, message, validation_errors=None):
 132        super().__init__(message)
 133        self.validation_errors = validation_errors
 134
 135
 136def construct_git_url(remote_url: str):
 137    """Constructs a git url for github or gitlab including a PAT token environment variable"""
 138    # check the url
 139    parsed_url = urlparse(remote_url)
 140
  141    # if the URL is on github.com use a GitHub PAT, otherwise assume it's gitlab; to support
  142    # other hosts such as codeberg this function would need updating if the URL scheme differs.
 143    if "github.com" in parsed_url.netloc:
 144        # get GitHub PAT from environment variable
 145        auth = os.getenv("ACMC_GITHUB_PAT")
 146        if not auth:
 147            raise ValueError(
 148                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 149            )
 150    else:
 151        # get GitLab PAT from environment variable
 152        auth = os.getenv("ACMC_GITLAB_PAT")
 153        if not auth:
 154            raise ValueError(
 155                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 156            )
 157        auth = f"oauth2:{auth}"
 158
 159    # Construct the new URL with credentials
 160    new_netloc = f"{auth}@{parsed_url.netloc}"
 161    return urlunparse(
 162        (
 163            parsed_url.scheme,
 164            new_netloc,
 165            parsed_url.path,
 166            parsed_url.params,
 167            parsed_url.query,
 168            parsed_url.fragment,
 169        )
 170    )
 171
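# Illustrative use (repository URL and token value are hypothetical):
#   os.environ["ACMC_GITHUB_PAT"] = "ghp_example_token"
#   construct_git_url("https://github.com/example-org/example-phen.git")
#   returns "https://ghp_example_token@github.com/example-org/example-phen.git"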
 172
 173def create_empty_git_dir(path: Path):
 174    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 175    path.mkdir(exist_ok=True)
 176    keep_path = path / ".gitkeep"
 177    keep_path.touch(exist_ok=True)
 178
 179
 180def check_delete_dir(path: Path, msg: str) -> bool:
 181    """Checks on the command line if a user wants to delete a directory
 182
 183    Args:
 184        path (Path): path of the directory to be deleted
 185        msg (str): message to be displayed to the user
 186
 187    Returns:
 188        Boolean: True if deleted
 189    """
 190    deleted = False
 191
 192    user_input = input(f"{msg}").strip().lower()
 193    if user_input in ["yes", "y"]:
 194        shutil.rmtree(path)
 195        deleted = True
 196    else:
 197        logger.info("Directory was not deleted.")
 198
 199    return deleted
 200
 201
 202def fork(
 203    phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str = None
 204):
  205    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
 206
 207    Args:
 208        phen_dir (str): local directory path where the upstream repo is to be cloned
 209        upstream_url (str): url to the upstream repo
 210        upstream_version (str): version in the upstream repo to clone
 211        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 212
 213    Raises:
 214        ValueError: if the specified version is not in the upstream repo
 215        ValueError: if the upstream repo is not a valid phenotype repo
 216        ValueError: if there's any other problems with Git
 217    """
 218    logger.info(
 219        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 220    )
 221
 222    phen_path = Path(phen_dir)
 223    # check if directory already exists and ask user if they want to recreate it
 224    if (
 225        phen_path.exists() and phen_path.is_dir()
 226    ):  # Check if it exists and is a directory
 227        configure = check_delete_dir(
 228            phen_path,
 229            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 230        )
 231    else:
 232        configure = True
 233
 234    if not configure:
  235        logger.info(f"Exiting, phenotype not initialised")
 236        return
 237
 238    try:
 239        # Clone repo
 240        git_url = construct_git_url(upstream_url)
 241        repo = git.Repo.clone_from(git_url, phen_path)
 242
 243        # Fetch all branches and tags
 244        repo.remotes.origin.fetch()
 245
 246        # Check if the version exists
 247        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 248        if upstream_version not in available_refs:
 249            raise ValueError(
 250                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 251            )
 252
 253        # Checkout the specified version
 254        repo.git.checkout(upstream_version)
 255        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 256        main_branch.checkout()
 257
 258        # Check if 'config.yaml' exists in the root directory
 259        config_path = phen_path / "config.yaml"
 260        if not os.path.isfile(config_path):
 261            raise ValueError(
 262                f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory."
 263            )
 264
 265        # Validate the phenotype is compatible with the acmc tool
 266        validate(str(phen_path.resolve()))
 267
 268        # Delete each tag locally
 269        tags = repo.tags
 270        for tag in tags:
 271            repo.delete_tag(tag)
 272            logger.debug(f"Deleted tags from forked repo: {tag}")
 273
 274        # Add upstream remote
 275        repo.create_remote("upstream", upstream_url)
 276        remote = repo.remotes["origin"]
 277        repo.delete_remote(remote)  # Remove existing origin
 278
 279        # Optionally set a new origin remote
 280        if new_origin_url:
 281            git_url = construct_git_url(new_origin_url)
 282            repo.create_remote("origin", git_url)
 283            repo.git.push("--set-upstream", "origin", "main")
 284
 285        logger.info(f"Repository forked successfully at {phen_path}")
 286        logger.info(f"Upstream set to {upstream_url}")
 287        if new_origin_url:
 288            logger.info(f"Origin set to {new_origin_url}")
 289
 290    except Exception as e:
 291        if phen_path.exists():
 292            shutil.rmtree(phen_path)
 293        raise ValueError(f"Error occurred during repository fork: {str(e)}")
 294
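# Illustrative fork call (URLs and version are hypothetical; the PAT environment
# variable for the remote host must be set):
#   fork(
#       "./workspace/phen-fork",
#       upstream_url="https://github.com/example-org/upstream-phen.git",
#       upstream_version="0.0.2",
#       new_origin_url="https://github.com/example-org/my-phen.git",
#   )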
 295
 296def init(phen_dir: str, remote_url: str):
  297    """Initialises a phenotype directory as a git repo with a standard structure"""
 298    logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 299    phen_path = Path(phen_dir)
 300
 301    # check if directory already exists and ask user if they want to recreate it
 302    if (
 303        phen_path.exists() and phen_path.is_dir()
 304    ):  # Check if it exists and is a directory
 305        configure = check_delete_dir(
 306            phen_path,
 307            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 308        )
 309    else:
 310        configure = True
 311
 312    if not configure:
  313        logger.info(f"Exiting, phenotype not initialised")
 314        return
 315
 316    # Initialise repo from local or remote
 317    repo: Repo
 318    # if remote then clone the repo otherwise init a local repo
  319    if remote_url is not None:
 320        # add PAT token to the URL
 321        git_url = construct_git_url(remote_url)
 322
 323        # clone the repo
 324        repo = git.cmd.Git()
 325        repo.clone(git_url, phen_path)
 326
 327        # open repo
 328        repo = Repo(phen_path)
 329        # check if there are any commits (new repo has no commits)
 330        if (
 331            len(repo.branches) == 0 or repo.head.is_detached
 332        ):  # Handle detached HEAD (e.g., after init)
 333            logger.debug("The phen repository has no commits yet.")
 334            commit_count = 0
 335        else:
 336            # Get the total number of commits in the default branch
 337            commit_count = sum(1 for _ in repo.iter_commits())
 338            logger.debug(f"Repo has previous commits: {commit_count}")
 339    else:
 340        # local repo, create the directories and init
 341        phen_path.mkdir(parents=True, exist_ok=True)
 342        logger.debug(f"Phen directory '{phen_path}' has been created.")
 343        repo = git.Repo.init(phen_path)
 344        commit_count = 0
 345
 346    phen_path = phen_path.resolve()
 347    # initialise empty repos
 348    if commit_count == 0:
 349        # create initial commit
 350        initial_file_path = phen_path / "README.md"
 351        with open(initial_file_path, "w") as file:
 352            file.write(
 353                "# Initial commit\nThis is the first commit in the phen repository.\n"
 354            )
 355        repo.index.add([initial_file_path])
 356        repo.index.commit("Initial commit")
 357        commit_count = 1
 358
  359    # Checkout the phen's default branch, creating it if it does not exist
 360    if DEFAULT_GIT_BRANCH in repo.branches:
 361        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 362        main_branch.checkout()
 363    else:
 364        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 365        main_branch.checkout()
 366
  367    # if the phen path does not contain the config file then initialise the phenotype
 368    config_path = phen_path / CONFIG_FILE
 369    if config_path.exists():
 370        logger.debug(f"Phenotype configuration files already exist")
 371        return
 372
 373    logger.info("Creating phen directory structure and config files")
 374    for d in DEFAULT_PHEN_DIR_LIST:
 375        create_empty_git_dir(phen_path / d)
 376
 377    # create empty phen config file
 378    config = {
 379        "phenotype": {
 380            "version": "0.0.0",
 381            "omop": {
 382                "vocabulary_id": "",
 383                "vocabulary_name": "",
 384                "vocabulary_reference": "",
 385            },
  386            "map": [],
 387            "concept_sets": [],
 388        }
 389    }
 390
 391    with open(phen_path / CONFIG_FILE, "w") as file:
 392        yaml.dump(
 393            config,
 394            file,
 395            Dumper=util.QuotedDumper,
 396            default_flow_style=False,
 397            sort_keys=False,
 398            default_style='"',
 399        )
 400
 401    # add git ignore
 402    ignore_content = """# Ignore SQLite database files
 403*.db
 404*.sqlite3
 405 
 406# Ignore SQLite journal and metadata files
 407*.db-journal
 408*.sqlite3-journal
 409
 410# python
 411.ipynb_checkpoints
 412 """
 413    ignore_path = phen_path / ".gitignore"
 414    with open(ignore_path, "w") as file:
 415        file.write(ignore_content)
 416
 417    # add to git repo and commit
 418    for d in DEFAULT_PHEN_DIR_LIST:
 419        repo.git.add(phen_path / d)
 420    repo.git.add(all=True)
 421    repo.index.commit("initialised the phen git repo.")
 422
 423    logger.info(f"Phenotype initialised successfully")
 424
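# Illustrative calls (paths and URL are hypothetical): init("./workspace/phen", remote_url=None)
# creates a local-only phenotype repo, while init("./workspace/phen",
# "https://github.com/example-org/phen.git") clones the remote first and only adds the
# standard structure if config.yaml is not already present.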
 425
 426def validate(phen_dir: str):
 427    """Validates the phenotype directory is a git repo with standard structure"""
 428    logger.info(f"Validating phenotype: {phen_dir}")
 429    phen_path = Path(phen_dir)
 430    if not phen_path.is_dir():
 431        raise NotADirectoryError(
 432            f"Error: '{str(phen_path.resolve())}' is not a directory"
 433        )
 434
 435    config_path = phen_path / CONFIG_FILE
 436    if not config_path.is_file():
 437        raise FileNotFoundError(
 438            f"Error: phen configuration file '{config_path}' does not exist."
 439        )
 440
 441    concepts_path = phen_path / CONCEPTS_DIR
 442    if not concepts_path.is_dir():
 443        raise FileNotFoundError(
 444            f"Error: source concepts directory {concepts_path} does not exist."
 445        )
 446
  447    # Validate the directory is a git repo
 448    try:
 449        git.Repo(phen_path)
 450    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 451        raise Exception(f"Phen directory {phen_path} is not a git repo")
 452
 453    # Load configuration File
 454    if config_path.suffix == ".yaml":
 455        try:
 456            with config_path.open("r") as file:
 457                phenotype = yaml.safe_load(file)
 458
 459            validator = Validator(CONFIG_SCHEMA)
 460            if validator.validate(phenotype):
 461                logger.debug("YAML structure is valid.")
 462            else:
 463                logger.error(f"YAML structure validation failed: {validator.errors}")
 464                raise Exception(f"YAML structure validation failed: {validator.errors}")
 465        except yaml.YAMLError as e:
 466            logger.error(f"YAML syntax error: {e}")
 467            raise e
 468    else:
 469        raise Exception(
 470            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 471        )
 472
  473    # initialise
 474    validation_errors = []
 475    phenotype = phenotype["phenotype"]
 476    code_types = parse.CodeTypeParser().code_types
 477
  478    # check the version number is of the format n.n.n
 479    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 480    if not match:
 481        validation_errors.append(
 482            f"Invalid version format in configuration file: {phenotype['version']}"
 483        )
 484
 485    # create a list of all the concept set names defined in the concept set configuration
 486    concept_set_names = []
 487    for item in phenotype["concept_sets"]:
 488        if item["name"] in concept_set_names:
 489            validation_errors.append(
  490                f"Duplicate concept set defined in concept sets {item['name']}"
 491            )
 492        else:
 493            concept_set_names.append(item["name"])
 494
 495    # check codes definition
 496    for item in phenotype["concept_sets"]:
  497        # check concept code file exists
 498        concept_code_file_path = concepts_path / item["file"]["path"]
 499        if not concept_code_file_path.exists():
 500            validation_errors.append(
 501                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 502            )
 503
  504        # check concept code file is not empty
 505        if concept_code_file_path.stat().st_size == 0:
 506            validation_errors.append(
 507                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 508            )
 509
 510        # check code file type is supported
 511        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 512            raise ValueError(
  513                f"Unsupported filetype {concept_code_file_path.suffix}, only supports csv, xlsx and xls code file types"
 514            )
 515
 516        # check columns specified are a supported medical coding type
 517        for column in item["file"]["columns"]:
 518            if column not in code_types:
 519                validation_errors.append(
 520                    f"Column type {column} for file {concept_code_file_path} is not supported"
 521                )
 522
 523        # check the actions are supported
 524        if "actions" in item["file"]:
 525            for action in item["file"]["actions"]:
 526                if action not in COL_ACTIONS:
 527                    validation_errors.append(f"Action {action} is not supported")
 528
 529    if len(validation_errors) > 0:
 530        logger.error(validation_errors)
 531        raise PhenValidationException(
 532            f"Configuration file {str(config_path.resolve())} failed validation",
 533            validation_errors,
 534        )
 535
 536    logger.info(f"Phenotype validated successfully")
 537
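# Illustrative error handling (path is hypothetical); individual failures are
# available on the raised exception:
#   try:
#       validate("./workspace/phen")
#   except PhenValidationException as e:
#       for error in e.validation_errors:
#           logger.error(error)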
 538
 539def read_table_file(path: Path, excel_sheet: str = ""):
 540    """
 541    Load Code List File
 542    """
 543
 544    path = path.resolve()
 545    if path.suffix == ".csv":
 546        df = pd.read_csv(path, dtype=str)
 547    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 548        if excel_sheet != "":
 549            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 550        else:
 551            df = pd.read_excel(path, dtype=str)
 552    elif path.suffix == ".dta":
 553        df = pd.read_stata(path)
 554    else:
 555        raise ValueError(
  556            f"Unsupported filetype {path.suffix}, only supports {CODE_FILE_TYPES} code file types"
 557        )
 558
 559    return df
 560
 561
 562def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
 563    # Perform Structural Changes to file before preprocessing
 564    logger.debug("Processing file structural actions")
 565    if (
 566        "actions" in concept_set["file"]
 567        and "split_col" in concept_set["file"]["actions"]
 568        and "codes_col" in concept_set["file"]["actions"]
 569    ):
 570        split_col = concept_set["file"]["actions"]["split_col"]
 571        codes_col = concept_set["file"]["actions"]["codes_col"]
  572        logger.debug(
  573            "Action: Splitting %s column into: %s",
  574            split_col,
  575            df[split_col].unique(),
  576        )
 578        codes = df[codes_col]
 579        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
  580        oh = oh.where((oh != True), codes, axis=0)  # fill True cells with codes
  581        oh[oh == False] = np.nan  # replace False cells with NaN
 582        df = pd.concat([df, oh], axis=1)  # merge in new columns
 583
 584    return df
 585
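# Worked example of the split_col/codes_col action on hypothetical data, with
# actions {"split_col": "type", "codes_col": "code"}:
#
#   code   type         ->   code   type   icd10   read2
#   C10..  read2             C10..  read2  NaN     C10..
#   E11    icd10             E11    icd10  E11     NaN
#
# i.e. the codes column is pivoted into one new column per value of the split
# column, so each code type can then be parsed by its own column parser.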
 586
 587# Perform QA Checks on columns individually and append to df
 588def preprocess_source_concepts(
 589    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 590) -> Tuple[pd.DataFrame, list]:
 591    """Parses each column individually - Order and length will not be preserved!"""
 592    out = pd.DataFrame([])  # create output df to append to
 593    code_errors = []  # list of errors from processing
 594
 595    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 596    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 597
 598    # Preprocess codes
 599    code_types = parse.CodeTypeParser().code_types
 600    for code_type in concept_set["file"]["columns"]:
 601        parser = code_types[code_type]
 602        logger.info(f"Processing {code_type} codes for {code_file_path}")
 603
 604        # get codes by column name
 605        source_col_name = concept_set["file"]["columns"][code_type]
 606        codes = df[source_col_name].dropna()
 607        codes = codes.astype(str)  # convert to string
 608        codes = codes.str.strip()  # remove excess spaces
 609
 610        # process codes, validating them using parser and returning the errors
 611        codes, errors = parser.process(codes, code_file_path)
 612        if len(errors) > 0:
 613            code_errors.extend(errors)
 614            logger.warning(f"Codes validation failed with {len(errors)} errors")
 615
 616        # add processed codes to df
 617        new_col_name = f"{source_col_name}_SOURCE"
 618        df = df.rename(columns={source_col_name: new_col_name})
 619        process_codes = pd.DataFrame({code_type: codes}).join(df)
 620        out = pd.concat(
 621            [out, process_codes],
 622            ignore_index=True,
 623        )
 624
 625    logger.debug(out.head())
 626
 627    return out, code_errors
 628
 629
 630def get_code_type_from_col_name(col_name: str):
 631    return col_name.split("_")[0]
 632
 633
  634# Translate df with multiple code types into a single target code type DataFrame
 635def translate_codes(
 636    source_df: pd.DataFrame, target_code_type: str, concept_name: str
 637) -> pd.DataFrame:
  638    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""
 639
 640    # codes = pd.DataFrame([], dtype=str)
 641    codes = pd.DataFrame(
 642        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 643    )
 644    # Convert codes to target type
 645    logger.info(f"Converting to target code type {target_code_type}")
 646
 647    for source_code_type in source_df.columns:
 648
  649        # if the target code type is the same as the source code type, no translation is needed, just append source as target
 650        if source_code_type == target_code_type:
 651            copy_df = pd.DataFrame(
 652                {
 653                    "SOURCE_CONCEPT": source_df[source_code_type],
 654                    "SOURCE_CONCEPT_TYPE": source_code_type,
 655                    "CONCEPT": source_df[source_code_type],
 656                }
 657            )
 658            codes = pd.concat([codes, copy_df])
 659            logger.debug(
  660                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
 661            )
 662        else:
 663            # get the translation filename using source to target code types
 664            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 665            map_path = trud.PROCESSED_PATH / filename
 666
 667            # do the mapping if it exists
 668            if map_path.exists():
 669                # get mapping
 670                df_map = pd.read_parquet(map_path)
 671
 672                # do mapping
 673                translated_df = pd.merge(
 674                    source_df[source_code_type], df_map, how="left"
 675                )
 676
 677                # normalise the output
 678                translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
 679                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 680
 681                # add to list of codes
 682                codes = pd.concat([codes, translated_df])
 683
 684            else:
 685                logger.warning(
 686                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 687                )
 688
 689    codes = codes.dropna()  # delete NaNs
 690
  691    # add the concept set name to the output if there were any translations
 692    if len(codes.index) > 0:
 693        codes["CONCEPT_SET"] = concept_name
 694    else:
 695        logger.debug(f"No codes converted with target code type {target_code_type}")
 696
 697    return codes
 698
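# Example mapping lookup (code types are hypothetical): translating read2 to snomed
# reads trud.PROCESSED_PATH / "read2_to_snomed.parquet" and left-joins it onto the
# source column; source codes without a mapping row are removed by the final dropna().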
 699
 700def sql_row_exist(
 701    conn: sqlite3.Connection, table: str, column: str, value: str
 702) -> bool:
 703    # Execute and check if a result exists
 704    cur = conn.cursor()
 705    query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;"
 706    cur.execute(query, (value,))
 707    exists = cur.fetchone() is not None
 708
 709    return exists
 710
 711
 712def write_code_errors(code_errors: list, code_errors_path: Path):
 713    err_df = pd.DataFrame(
 714        [
 715            {
 716                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 717                "VOCABULARY": err.code_type,
 718                "SOURCE": err.codes_file,
 719                "CAUSE": err.message,
 720            }
 721            for err in code_errors
 722        ]
 723    )
 724
 725    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 726    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 727    err_df.to_csv(code_errors_path, index=False, mode="w")
 728
 729
 730def write_vocab_version(phen_path: Path):
 731    # write the vocab version files
 732
 733    if not trud.VERSION_PATH.exists():
 734        raise FileNotFoundError(
 735            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 736        )
 737
 738    if not omop.VERSION_PATH.exists():
 739        raise FileNotFoundError(
 740            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 741        )
 742
 743    with trud.VERSION_PATH.open("r") as file:
 744        trud_version = yaml.safe_load(file)
 745
 746    with omop.VERSION_PATH.open("r") as file:
 747        omop_version = yaml.safe_load(file)
 748
 749    # Create the combined YAML structure
 750    version_data = {
 751        "versions": {
 752            "acmc": acmc.__version__,
 753            "trud": trud_version,
 754            "omop": omop_version,
 755        }
 756    }
 757
 758    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 759        yaml.dump(
 760            version_data,
 761            file,
 762            Dumper=util.QuotedDumper,
 763            default_flow_style=False,
 764            sort_keys=False,
 765            default_style='"',
 766        )
 767
 768
 769def map(phen_dir: str, target_code_type: str):
 770    logger.info(f"Processing phenotype: {phen_dir}")
 771
 772    # Validate configuration
 773    validate(phen_dir)
 774
 775    # initialise paths
 776    phen_path = Path(phen_dir)
 777    config_path = phen_path / CONFIG_FILE
 778
 779    # load configuration
 780    with config_path.open("r") as file:
 781        config = yaml.safe_load(file)
 782    phenotype = config["phenotype"]
 783
 784    if len(phenotype["map"]) == 0:
 785        raise ValueError(f"No map codes defined in the phenotype configuration")
 786
 787    if target_code_type is not None and target_code_type not in phenotype["map"]:
 788        raise ValueError(
 789            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 790        )
 791
 792    if target_code_type is not None:
 793        map_target_code_type(phen_path, phenotype, target_code_type)
 794    else:
 795        for t in phenotype["map"]:
 796            map_target_code_type(phen_path, phenotype, t)
 797
 798    logger.info(f"Phenotype processed successfully")
 799
 800
 801def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
 802    logger.debug(f"Target coding format: {target_code_type}")
 803    concepts_path = phen_path / CONCEPTS_DIR
 804    # Create output dataframe
 805    out = pd.DataFrame([])
 806    code_errors = []
 807
 808    # Process each folder in codes section
 809    for concept_set in phenotype["concept_sets"]:
 810        logger.debug(f"--- {concept_set['file']} ---")
 811
 812        # Load code file
 813        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
 814        df = read_table_file(codes_file_path)
 815
 816        # process structural actions
 817        df = process_actions(df, concept_set)
 818
  819        # preprocess and validate source concepts
 820        logger.debug("Processing and validating source concept codes")
 821        df, errors = preprocess_source_concepts(
 822            df,
 823            concept_set,
 824            codes_file_path,
 825        )
 826
 827        # create df with just the source code columns
 828        source_column_names = list(concept_set["file"]["columns"].keys())
 829        source_df = df[source_column_names]
 830
 831        logger.debug(source_df.columns)
 832        logger.debug(source_df.head())
 833
 834        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
 835        if len(errors) > 0:
 836            code_errors.extend(errors)
 837        logger.debug(f" Length of code_errors {len(code_errors)}")
 838
  839        # Map source concept codes to target codes
 840        # if processing a source coding list with categorical data
 841        if (
 842            "actions" in concept_set["file"]
 843            and "divide_col" in concept_set["file"]["actions"]
 844            and len(df) > 0
 845        ):
 846            divide_col = concept_set["file"]["actions"]["divide_col"]
 847            logger.debug(f"Action: Dividing Table by {divide_col}")
 848            logger.debug(f"column into: {df[divide_col].unique()}")
 849            df_grp = df.groupby(divide_col)
 850            for cat, grp in df_grp:
 851                if cat == concept_set["file"]["category"]:
 852                    grp = grp.drop(columns=[divide_col])  # delete categorical column
 853                    source_df = grp[source_column_names]
 854                    trans_out = translate_codes(
 855                        source_df,
 856                        target_code_type=target_code_type,
 857                        concept_name=concept_set["name"],
 858                    )
 859                    out = pd.concat([out, trans_out])
 860        else:
 861            source_df = df[source_column_names]
 862            trans_out = translate_codes(
 863                source_df,
 864                target_code_type=target_code_type,
 865                concept_name=concept_set["name"],
 866            )
 867            out = pd.concat([out, trans_out])
 868
 869    if len(code_errors) > 0:
 870        logger.error(f"The map processing has {len(code_errors)} errors")
 871        error_path = phen_path / MAP_DIR / "errors"
 872        error_path.mkdir(parents=True, exist_ok=True)
 873        error_filename = f"{target_code_type}-code-errors.csv"
 874        write_code_errors(code_errors, error_path / error_filename)
 875
 876    # Check there is output from processing
 877    if len(out.index) == 0:
 878        logger.error(f"No output after map processing")
 879        raise Exception(
 880            f"No output after map processing, check config {str(phen_path.resolve())}"
 881        )
 882
 883    # final processing
 884    out = out.reset_index(drop=True)
 885    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 886    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 887
 888    out_count = len(out.index)
 889    # added metadata
 890    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
 891    result_list = []
 892    source_column_names = list(concept_set["file"]["columns"].keys())
 893    for source_concept_type in source_column_names:
 894
 895        # Filter output based on the current source_concept_type
 896        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
 897        filtered_count = len(out_filtered_df.index)
 898
  899        # Removing the source type columns, except the current type, leaves the metadata and the join column
 900        remove_types = [
 901            type for type in source_column_names if type != source_concept_type
 902        ]
 903        metadata_df = df.drop(columns=remove_types)
 904        metadata_df = metadata_df.rename(
 905            columns={source_concept_type: "SOURCE_CONCEPT"}
 906        )
 907        metadata_df_count = len(metadata_df.index)
 908
  909        # Perform the left join with metadata_df on SOURCE_CONCEPT to add the metadata
 910        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
 911        result_count = len(result.index)
 912
 913        logger.debug(
 914            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
 915        )
 916
 917        # Append the result to the result_list
 918        result_list.append(result)
 919
 920    # Concatenate all the results into a single DataFrame
 921    final_out = pd.concat(result_list, ignore_index=True)
 922    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 923    logger.debug(
 924        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
 925    )
 926
 927    # Save output to map directory
 928    output_filename = target_code_type + ".csv"
 929    map_path = phen_path / MAP_DIR / output_filename
 930    final_out.to_csv(map_path, index=False)
 931    logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
 932
 933    # save concept sets as separate files
 934    concept_set_path = phen_path / CSV_PATH / target_code_type
 935
  936    # empty the concept-set directory except for hidden files, e.g. .git
 937    if concept_set_path.exists():
 938        for item in concept_set_path.iterdir():
 939            if not item.name.startswith("."):
 940                item.unlink()
 941    else:
 942        concept_set_path.mkdir(parents=True, exist_ok=True)
 943
 944    # write each concept as a separate file
 945    for name, concept in final_out.groupby("CONCEPT_SET"):
 946        concept = concept.sort_values(by="CONCEPT")  # sort rows
 947        concept = concept.dropna(how="all", axis=1)  # remove empty cols
 948        concept = concept.reindex(
 949            sorted(concept.columns), axis=1
 950        )  # sort cols alphabetically
 951        filename = f"{name}.csv"
 952        concept_path = concept_set_path / filename
 953        concept.to_csv(concept_path, index=False)
 954
 955    write_vocab_version(phen_path)
 956
 957    logger.info(f"Phenotype processed target code type {target_code_type}")
 958
 959
 960def generate_version_tag(
 961    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
 962) -> str:
 963    # Get all valid semantic version tags
 964    versions = []
 965    for tag in repo.tags:
 966        tag_name = (
 967            tag.name.lstrip("v") if use_v_prefix else tag.name
 968        )  # Remove 'v' if needed
 969        if semver.Version.is_valid(tag_name):
 970            versions.append(semver.Version.parse(tag_name))
 971
 972    # Determine the next version
 973    if not versions:
 974        new_version = semver.Version(0, 0, 1)
 975    else:
 976        latest_version = max(versions)
 977        if increment == "major":
 978            new_version = latest_version.bump_major()
 979        elif increment == "minor":
 980            new_version = latest_version.bump_minor()
 981        else:
 982            new_version = latest_version.bump_patch()
 983
 984    # Create the new tag
 985    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
 986
 987    return new_version_str
 988
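# Illustrative version bumps (tags are hypothetical): with existing tags
# ["0.0.1", "0.1.0"], generate_version_tag(repo, "minor") returns "0.2.0" and
# generate_version_tag(repo, "patch") returns "0.1.1"; a repo with no valid
# semver tags starts at "0.0.1".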
 989
 990def publish(
 991    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
 992):
  993    """Publishes updates to the phenotype by committing all changes to the repo directory"""
 994
 995    # Validate config
 996    validate(phen_dir)
 997    phen_path = Path(phen_dir)
 998
 999    # load git repo and set the branch
1000    repo = git.Repo(phen_path)
1001    if DEFAULT_GIT_BRANCH in repo.branches:
1002        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1003        main_branch.checkout()
1004    else:
1005        raise AttributeError(
1006            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1007        )
1008
1009    # check if any changes to publish
1010    if not repo.is_dirty() and not repo.untracked_files:
1011        if remote_url is not None and "origin" not in repo.remotes:
1012            logger.info(f"First publish to remote url {remote_url}")
1013        else:
1014            logger.info("Nothing to publish, no changes to the repo")
1015            return
1016
1017    # get next version
1018    new_version_str = generate_version_tag(repo, increment)
1019    logger.info(f"New version: {new_version_str}")
1020
1021    # Write version in configuration file
1022    config_path = phen_path / CONFIG_FILE
1023    with config_path.open("r") as file:
1024        config = yaml.safe_load(file)
1025
1026    config["phenotype"]["version"] = new_version_str
1027    with open(config_path, "w") as file:
1028        yaml.dump(
1029            config,
1030            file,
1031            Dumper=util.QuotedDumper,
1032            default_flow_style=False,
1033            sort_keys=False,
1034            default_style='"',
1035        )
1036
1037    # Add and commit changes to repo including version updates
1038    commit_message = f"Committing updates to phenotype {phen_path}"
1039    repo.git.add("--all")
1040    repo.index.commit(commit_message)
1041
1042    # Add tag to the repo
1043    repo.create_tag(new_version_str)
1044
1045    # push to origin if a remote repo
1046    if remote_url is not None and "origin" not in repo.remotes:
1047        git_url = construct_git_url(remote_url)
1048        repo.create_remote("origin", git_url)
1049
1050    try:
1051        if "origin" in repo.remotes:
1052            logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1053            origin = repo.remotes.origin
1054            logger.info(f"Pushing main branch to remote repo")
1055            repo.git.push("--set-upstream", "origin", "main")
1056            logger.info(f"Pushing version tags to remote git repo")
1057            origin.push(tags=True)
1058            logger.debug("Changes pushed to 'origin'")
1059        else:
1060            logger.debug("Remote 'origin' is not set")
1061    except Exception as e:
1062        tag_ref = repo.tags[new_version_str]
1063        repo.delete_tag(tag_ref)
1064        repo.git.reset("--soft", "HEAD~1")
1065        raise e
1066
1067    logger.info(f"Phenotype published successfully")
1068
1069
1070def export(phen_dir: str, version: str):
1071    """Exports a phen repo at a specific tagged version into a target directory"""
1072    logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1073
1074    # validate configuration
1075    validate(phen_dir)
1076    phen_path = Path(phen_dir)
1077
1078    # load configuration
1079    config_path = phen_path / CONFIG_FILE
1080    with config_path.open("r") as file:
1081        config = yaml.safe_load(file)
1082
1083    map_path = phen_path / MAP_DIR
1084    if not map_path.exists():
1085        logger.warning(f"Map path does not exist '{map_path}'")
1086
1087    export_path = phen_path / OMOP_PATH
1088    # check export directory exists and if not create it
1089    if not export_path.exists():
1090        export_path.mkdir(parents=True)
1091        logger.debug(f"OMOP export directory '{export_path}' created.")
1092
1093    # omop export db
1094    export_db_path = omop.export(
1095        map_path,
1096        export_path,
1097        config["phenotype"]["version"],
1098        config["phenotype"]["omop"],
1099    )
1100
1101    # write to tables
1102    # export as csv
1103    logger.info(f"Phenotype exported successfully")
1104
1105
1106def copy(phen_dir: str, target_dir: str, version: str):
 1107    """Copies a phen repo at a specific tagged version into a target directory"""
1108
1109    # Validate
1110    validate(phen_dir)
1111    phen_path = Path(phen_dir)
1112
1113    # Check target directory exists
1114    target_path = Path(target_dir)
1115    if not target_path.exists():
1116        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1117
1118    # Set copy directory
1119    copy_path = target_path / version
1120    logger.info(f"Copying repo {phen_path} to {copy_path}")
1121
1122    if (
1123        copy_path.exists() and copy_path.is_dir()
1124    ):  # Check if it exists and is a directory
1125        copy = check_delete_dir(
1126            copy_path,
1127            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1128        )
1129    else:
1130        copy = True
1131
1132    if not copy:
1133        logger.info(f"Not copying the version {version}")
1134        return
1135
1136    logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1137    repo = git.Repo.clone_from(phen_path, copy_path)
1138
1139    # Check out the latest commit or specified version
1140    if version:
1141        # Checkout a specific version (e.g., branch, tag, or commit hash)
1142        logger.info(f"Checking out version {version}...")
1143        repo.git.checkout(version)
1144    else:
1145        # Checkout the latest commit (HEAD)
1146        logger.info(f"Checking out the latest commit...")
1147        repo.git.checkout("HEAD")
1148
1149    logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1150
1151    logger.info(f"Phenotype copied successfully")
1152
1153
1154# Convert concept_sets list into dictionaries
1155def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1156    """Extracts concepts as {name: file_path} dictionary and a name set."""
1157    concepts_dict = {
1158        item["name"]: item["file"]["path"]
1159        for item in config_data["phenotype"]["concept_sets"]
1160    }
1161    name_set = set(concepts_dict.keys())
1162    return concepts_dict, name_set
1163
1164
1165def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1166    """
1167    Extracts clean keys from a DeepDiff dictionary.
1168
1169    :param diff: DeepDiff result dictionary
1170    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1171    :return: A set of clean key names
1172    """
1173    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1174
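# Illustrative key extraction (key name is hypothetical): for a DeepDiff result
# {"dictionary_item_added": ["root['ANGINA']"]},
# extract_clean_deepdiff_keys(diff, "dictionary_item_added") returns {"ANGINA"}.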
1175
1176def diff_config(old_config: dict, new_config: dict) -> str:
1177    report = f"\n# Changes to phenotype configuration\n"
1178    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1179
1180    old_concepts, old_names = extract_concepts(old_config)
1181    new_concepts, new_names = extract_concepts(new_config)
1182
1183    # Check added and removed names
1184    added_names = new_names - old_names  # Names that appear in new but not in old
1185    removed_names = old_names - new_names  # Names that were in old but not in new
1186
1187    # find file path changes for unchanged names
1188    unchanged_names = old_names & new_names  # Names that exist in both
1189    file_diff = DeepDiff(
1190        {name: old_concepts[name] for name in unchanged_names},
1191        {name: new_concepts[name] for name in unchanged_names},
1192    )
1193
1194    # Find renamed concepts (same file, different name)
1195    renamed_concepts = []
1196    for removed in removed_names:
1197        old_path = old_concepts[removed]
1198        for added in added_names:
1199            new_path = new_concepts[added]
1200            if old_path == new_path:
1201                renamed_concepts.append((removed, added))
1202
1203    # Remove renamed concepts from added and removed sets
1204    for old_name, new_name in renamed_concepts:
1205        added_names.discard(new_name)
1206        removed_names.discard(old_name)
1207
1208    # generate config report
1209    if added_names:
1210        report += "## Added Concepts\n"
1211        for name in added_names:
1212            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1213        report += "\n"
1214
1215    if removed_names:
1216        report += "## Removed Concepts\n"
1217        for name in removed_names:
1218            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1219        report += "\n"
1220
1221    if renamed_concepts:
1222        report += "## Renamed Concepts\n"
1223        for old_name, new_name in renamed_concepts:
1224            report += (
1225                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1226            )
1227        report += "\n"
1228
1229    if "values_changed" in file_diff:
1230        report += "## Updated File Paths\n"
1231        for name, change in file_diff["values_changed"].items():
1232            old_file = change["old_value"]
1233            new_file = change["new_value"]
1234            clean_name = name.split("root['")[1].split("']")[0]
1235            report += (
1236                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1237            )
1238        report += "\n"
1239
1240    if not (
1241        added_names
1242        or removed_names
1243        or renamed_concepts
1244        or file_diff.get("values_changed")
1245    ):
1246        report += "No changes in concept sets.\n"
1247
1248    return report
1249
1250
1251def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1252    old_output_files = [
1253        file.name
1254        for file in old_map_path.iterdir()
1255        if file.is_file() and not file.name.startswith(".")
1256    ]
1257    new_output_files = [
1258        file.name
1259        for file in new_map_path.iterdir()
1260        if file.is_file() and not file.name.startswith(".")
1261    ]
1262
1263    # Convert the lists to sets for easy comparison
1264    old_output_set = set(old_output_files)
1265    new_output_set = set(new_output_files)
1266
1267    # Outputs that are in old_output_set but not in new_output_set (removed files)
1268    removed_outputs = old_output_set - new_output_set
1269    # Outputs that are in new_output_set but not in old_output_set (added files)
1270    added_outputs = new_output_set - old_output_set
1271    # Outputs that are the intersection of old_output_set and new_output_set
1272    common_outputs = old_output_set & new_output_set
1273
1274    report = f"\n# Changes to available translations\n"
 1275    report += f"This compares the coding translation files available.\n\n"
1276    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1277    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1278    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1279
 1280    # Compare common outputs between versions
1281    report += f"# Changes to concepts in translation files\n\n"
1282    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1283    for file in common_outputs:
1284        old_output = old_map_path / file
1285        new_output = new_map_path / file
1286
 1287        logger.debug(f"Old output: {str(old_output.resolve())}")
 1288        logger.debug(f"New output: {str(new_output.resolve())}")
1289
1290        df1 = pd.read_csv(old_output)
1291        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1292        df2 = pd.read_csv(new_output)
1293        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1294
1295        # Check for added and removed concepts
1296        report += f"- File {file}\n"
1297        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1298        report += f"- Removed concepts {sorted_list}\n"
1299        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1300        report += f"- Added concepts {sorted_list}\n"
1301
1302        # Check for changed concepts
1303        diff = df2 - df1  # diff in counts
1304        diff = diff[
1305            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1306        ]  # get non-zero counts
1307        s = "\n"
1308        if len(diff.index) > 0:
1309            for concept, row in diff.iterrows():
1310                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1311            report += f"- Changed concepts {s}\n\n"
1312        else:
1313            report += f"- Changed concepts []\n\n"
1314
1315    return report
1316
1317
1318def diff_phen(
1319    new_phen_path: Path,
1320    new_version: str,
1321    old_phen_path: Path,
1322    old_version: str,
1323    report_path: Path,
1324):
1325    """Compare the differences between two versions of a phenotype"""
1326
1327    # validate phenotypes
1328    logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1329    validate(str(old_phen_path.resolve()))
1330    logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1331    validate(str(new_phen_path.resolve()))
1332
1333    # get old and new config
1334    old_config_path = old_phen_path / CONFIG_FILE
1335    with old_config_path.open("r") as file:
1336        old_config = yaml.safe_load(file)
1337    new_config_path = new_phen_path / CONFIG_FILE
1338    with new_config_path.open("r") as file:
1339        new_config = yaml.safe_load(file)
1340
1341    # write report heading
1342    report = f"# Phenotype Comparison Report\n"
1343    report += f"## Original phenotype\n"
1344    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1345    report += f"  - {old_version}\n"
1346    report += f"  - {str(old_phen_path.resolve())}\n"
1347    report += f"## Changed phenotype:\n"
1348    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1349    report += f"  - {new_version}\n"
1350    report += f"  - {str(new_phen_path.resolve())}\n"
1351
1352    # Step 1: check differences configuration files
1353    # Convert list of dicts into a dict: {name: file}
1354    report += diff_config(old_config, new_config)
1355
1356    # Step 2: check differences between map files
1357    # List files from output directories
1358    old_map_path = old_phen_path / MAP_DIR
1359    new_map_path = new_phen_path / MAP_DIR
1360    report += diff_map_files(old_map_path, new_map_path)
1361
1362    # initialise report file
1363    logger.debug(f"Writing to report file {str(report_path.resolve())}")
1364    report_file = open(report_path, "w")
1365    report_file.write(report)
1366    report_file.close()
1367
1368    logger.info(f"Phenotypes diff'd successfully")
1369
1370
1371def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1372    # make tmp directory .acmc
1373    timestamp = time.strftime("%Y%m%d_%H%M%S")
1374    temp_dir = Path(f".acmc/diff_{timestamp}")
1375
1376    changed_phen_path = Path(phen_dir)
1377    if not changed_phen_path.exists():
1378        raise ValueError(
1379            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1380        )
1381
1382    old_phen_path = Path(old_phen_dir)
1383    if not old_phen_path.exists():
1384        raise ValueError(
1385            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1386        )
1387
1388    try:
1389        # Create the directory
1390        temp_dir.mkdir(parents=True, exist_ok=True)
1391        logger.debug(f"Temporary directory created: {temp_dir}")
1392
1393        # Create temporary directories
1394        changed_path = temp_dir / "changed"
1395        changed_path.mkdir(parents=True, exist_ok=True)
1396        old_path = temp_dir / "old"
1397        old_path.mkdir(parents=True, exist_ok=True)
1398
1399        # checkout changed
1400        if version == "latest":
1401            logger.debug(
1402                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1403            )
1404            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1405        else:
1406            logger.debug(
1407                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1408            )
1409            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1410            changed_repo.git.checkout(version)
1411
1412        # checkout old
1413        if old_version == "latest":
1414            logger.debug(
1415                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1416            )
1417            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1418        else:
1419            logger.debug(
1420                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1421            )
1422            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1423            old_repo.git.checkout(old_version)
1424
1425        report_filename = f"{version}_{old_version}_diff.md"
1426        report_path = changed_phen_path / report_filename
1427        # diff old with new
1428        diff_phen(changed_path, version, old_path, old_version, report_path)
1429
1430    finally:
1431        # clean up tmp directory
1432        if temp_dir.exists():
1433            shutil.rmtree(temp_dir)
 1434            logger.debug(f"Temporary directory removed: {temp_dir}")
class PhenValidationException(builtins.Exception):
129class PhenValidationException(Exception):
130    """Custom exception raised when there are validation errors in the phenotype configuration file"""
131
132    def __init__(self, message, validation_errors=None):
133        super().__init__(message)
134        self.validation_errors = validation_errors

Custom exception raised when there are validation errors in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
132    def __init__(self, message, validation_errors=None):
133        super().__init__(message)
134        self.validation_errors = validation_errors
validation_errors
def construct_git_url(remote_url: str):
137def construct_git_url(remote_url: str):
138    """Constructs a git url for github or gitlab including a PAT token environment variable"""
139    # check the url
140    parsed_url = urlparse(remote_url)
141
142    # if github.com is in the URL use a GitHub PAT, otherwise assume GitLab; to support other hosts
143    # such as Codeberg this function would need updating if the URL scheme is different.
144    if "github.com" in parsed_url.netloc:
145        # get GitHub PAT from environment variable
146        auth = os.getenv("ACMC_GITHUB_PAT")
147        if not auth:
148            raise ValueError(
149                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
150            )
151    else:
152        # get GitLab PAT from environment variable
153        auth = os.getenv("ACMC_GITLAB_PAT")
154        if not auth:
155            raise ValueError(
156                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
157            )
158        auth = f"oauth2:{auth}"
159
160    # Construct the new URL with credentials
161    new_netloc = f"{auth}@{parsed_url.netloc}"
162    return urlunparse(
163        (
164            parsed_url.scheme,
165            new_netloc,
166            parsed_url.path,
167            parsed_url.params,
168            parsed_url.query,
169            parsed_url.fragment,
170        )
171    )

Constructs a git url for github or gitlab including a PAT token environment variable

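A minimal usage sketch (the repository URL and token value below are hypothetical): the function reads the PAT from the environment and injects it into the netloc of the URL.

import os
from acmc import phen

# hypothetical token value; normally set outside the program
os.environ["ACMC_GITHUB_PAT"] = "ghp_example_token"

# returns "https://ghp_example_token@github.com/example-org/example-phen.git"
url = phen.construct_git_url("https://github.com/example-org/example-phen.git")
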
def create_empty_git_dir(path: pathlib.Path):
174def create_empty_git_dir(path: Path):
175    """Creates a directory with a .gitkeep file so that it's tracked in git"""
176    path.mkdir(exist_ok=True)
177    keep_path = path / ".gitkeep"
178    keep_path.touch(exist_ok=True)

Creates a directory with a .gitkeep file so that it's tracked in git

def check_delete_dir(path: pathlib.Path, msg: str) -> bool:
181def check_delete_dir(path: Path, msg: str) -> bool:
182    """Checks on the command line if a user wants to delete a directory
183
184    Args:
185        path (Path): path of the directory to be deleted
186        msg (str): message to be displayed to the user
187
188    Returns:
189        Boolean: True if deleted
190    """
191    deleted = False
192
193    user_input = input(f"{msg}").strip().lower()
194    if user_input in ["yes", "y"]:
195        shutil.rmtree(path)
196        deleted = True
197    else:
198        logger.info("Directory was not deleted.")
199
200    return deleted

Checks on the command line if a user wants to delete a directory

Args:
    path (Path): path of the directory to be deleted
    msg (str): message to be displayed to the user

Returns:
    Boolean: True if deleted

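A brief sketch of how the prompt is used elsewhere in this module (the path is the module's default workspace location):

from pathlib import Path
from acmc import phen

phen_path = Path("./workspace/phen")
if phen_path.exists() and phen_path.is_dir():
    # removes the directory and returns True if the user answers "yes" or "y"
    deleted = phen.check_delete_dir(
        phen_path, "The phen directory already exists. Delete it? (yes/no): "
    )
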
def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str = None):
203def fork(
204    phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str = None
205):
206    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
207
208    Args:
209        phen_dir (str): local directory path where the upstream repo is to be cloned
210        upstream_url (str): url to the upstream repo
211        upstream_version (str): version in the upstream repo to clone
212        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
213
214    Raises:
215        ValueError: if the specified version is not in the upstream repo
216        ValueError: if the upstream repo is not a valid phenotype repo
217        ValueError: if there's any other problem with Git
218    """
219    logger.info(
220        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
221    )
222
223    phen_path = Path(phen_dir)
224    # check if directory already exists and ask user if they want to recreate it
225    if (
226        phen_path.exists() and phen_path.is_dir()
227    ):  # Check if it exists and is a directory
228        configure = check_delete_dir(
229            phen_path,
230            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
231        )
232    else:
233        configure = True
234
235    if not configure:
236        logger.info("Exiting, phenotype not initialised")
237        return
238
239    try:
240        # Clone repo
241        git_url = construct_git_url(upstream_url)
242        repo = git.Repo.clone_from(git_url, phen_path)
243
244        # Fetch all branches and tags
245        repo.remotes.origin.fetch()
246
247        # Check if the version exists
248        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
249        if upstream_version not in available_refs:
250            raise ValueError(
251                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
252            )
253
254        # Checkout the specified version
255        repo.git.checkout(upstream_version)
256        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
257        main_branch.checkout()
258
259        # Check if 'config.yaml' exists in the root directory
260        config_path = phen_path / "config.yaml"
261        if not os.path.isfile(config_path):
262            raise ValueError(
263                f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory."
264            )
265
266        # Validate the phenotype is compatible with the acmc tool
267        validate(str(phen_path.resolve()))
268
269        # Delete each tag locally
270        tags = repo.tags
271        for tag in tags:
272            repo.delete_tag(tag)
273            logger.debug(f"Deleted tags from forked repo: {tag}")
274
275        # Add upstream remote
276        repo.create_remote("upstream", upstream_url)
277        remote = repo.remotes["origin"]
278        repo.delete_remote(remote)  # Remove existing origin
279
280        # Optionally set a new origin remote
281        if new_origin_url:
282            git_url = construct_git_url(new_origin_url)
283            repo.create_remote("origin", git_url)
284            repo.git.push("--set-upstream", "origin", "main")
285
286        logger.info(f"Repository forked successfully at {phen_path}")
287        logger.info(f"Upstream set to {upstream_url}")
288        if new_origin_url:
289            logger.info(f"Origin set to {new_origin_url}")
290
291    except Exception as e:
292        if phen_path.exists():
293            shutil.rmtree(phen_path)
294        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

Args:
    phen_dir (str): local directory path where the upstream repo is to be cloned
    upstream_url (str): url to the upstream repo
    upstream_version (str): version in the upstream repo to clone
    new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

Raises:
    ValueError: if the specified version is not in the upstream repo
    ValueError: if the upstream repo is not a valid phenotype repo
    ValueError: if there's any other problem with Git

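A minimal sketch of forking an upstream phenotype (the URLs and tag are hypothetical; the matching ACMC_GITHUB_PAT or ACMC_GITLAB_PAT environment variable must be set for construct_git_url):

from acmc import phen

phen.fork(
    phen_dir="./workspace/phen",
    upstream_url="https://github.com/example-org/upstream-phen.git",
    upstream_version="v1.0.0",
    new_origin_url="https://github.com/example-user/my-phen.git",
)
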
def init(phen_dir: str, remote_url: str):
297def init(phen_dir: str, remote_url: str):
298    """Initialise a phenotype directory as a git repo with the standard structure"""
299    logger.info(f"Initialising Phenotype in directory: {phen_dir}")
300    phen_path = Path(phen_dir)
301
302    # check if directory already exists and ask user if they want to recreate it
303    if (
304        phen_path.exists() and phen_path.is_dir()
305    ):  # Check if it exists and is a directory
306        configure = check_delete_dir(
307            phen_path,
308            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
309        )
310    else:
311        configure = True
312
313    if not configure:
314        logger.info("Exiting, phenotype not initialised")
315        return
316
317    # Initialise repo from local or remote
318    repo: Repo
319    # if remote then clone the repo otherwise init a local repo
320    if remote_url is not None:
321        # add PAT token to the URL
322        git_url = construct_git_url(remote_url)
323
324        # clone the repo
325        repo = git.cmd.Git()
326        repo.clone(git_url, phen_path)
327
328        # open repo
329        repo = Repo(phen_path)
330        # check if there are any commits (new repo has no commits)
331        if (
332            len(repo.branches) == 0 or repo.head.is_detached
333        ):  # Handle detached HEAD (e.g., after init)
334            logger.debug("The phen repository has no commits yet.")
335            commit_count = 0
336        else:
337            # Get the total number of commits in the default branch
338            commit_count = sum(1 for _ in repo.iter_commits())
339            logger.debug(f"Repo has previous commits: {commit_count}")
340    else:
341        # local repo, create the directories and init
342        phen_path.mkdir(parents=True, exist_ok=True)
343        logger.debug(f"Phen directory '{phen_path}' has been created.")
344        repo = git.Repo.init(phen_path)
345        commit_count = 0
346
347    phen_path = phen_path.resolve()
348    # initialise empty repos
349    if commit_count == 0:
350        # create initial commit
351        initial_file_path = phen_path / "README.md"
352        with open(initial_file_path, "w") as file:
353            file.write(
354                "# Initial commit\nThis is the first commit in the phen repository.\n"
355            )
356        repo.index.add([initial_file_path])
357        repo.index.commit("Initial commit")
358        commit_count = 1
359
360    # Checkout the phens default branch, creating it if it does not exist
361    if DEFAULT_GIT_BRANCH in repo.branches:
362        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
363        main_branch.checkout()
364    else:
365        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
366        main_branch.checkout()
367
368    # if the phen path does not contain the config file then initialise the phen type
369    config_path = phen_path / CONFIG_FILE
370    if config_path.exists():
371        logger.debug(f"Phenotype configuration files already exist")
372        return
373
374    logger.info("Creating phen directory structure and config files")
375    for d in DEFAULT_PHEN_DIR_LIST:
376        create_empty_git_dir(phen_path / d)
377
378    # create empty phen config file
379    config = {
380        "phenotype": {
381            "version": "0.0.0",
382            "omop": {
383                "vocabulary_id": "",
384                "vocabulary_name": "",
385                "vocabulary_reference": "",
386            },
387            "map": [],
388            "concept_sets": [],
389        }
390    }
391
392    with open(phen_path / CONFIG_FILE, "w") as file:
393        yaml.dump(
394            config,
395            file,
396            Dumper=util.QuotedDumper,
397            default_flow_style=False,
398            sort_keys=False,
399            default_style='"',
400        )
401
402    # add git ignore
403    ignore_content = """# Ignore SQLite database files
404*.db
405*.sqlite3
406 
407# Ignore SQLite journal and metadata files
408*.db-journal
409*.sqlite3-journal
410
411# python
412.ipynb_checkpoints
413 """
414    ignore_path = phen_path / ".gitignore"
415    with open(ignore_path, "w") as file:
416        file.write(ignore_content)
417
418    # add to git repo and commit
419    for d in DEFAULT_PHEN_DIR_LIST:
420        repo.git.add(phen_path / d)
421    repo.git.add(all=True)
422    repo.index.commit("initialised the phen git repo.")
423
424    logger.info(f"Phenotype initialised successfully")

Initialise a phenotype directory as a git repo with the standard structure

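A minimal sketch: initialise a local phenotype repo with the standard directory layout, or clone an existing one (the remote URL is hypothetical and requires a PAT environment variable):

from acmc import phen

# initialise a new local phenotype repo
phen.init(phen_dir="./workspace/phen", remote_url=None)

# or initialise from an existing remote repo
# phen.init(
#     phen_dir="./workspace/phen",
#     remote_url="https://gitlab.com/example-org/example-phen.git",
# )
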
def validate(phen_dir: str):
427def validate(phen_dir: str):
428    """Validates the phenotype directory is a git repo with standard structure"""
429    logger.info(f"Validating phenotype: {phen_dir}")
430    phen_path = Path(phen_dir)
431    if not phen_path.is_dir():
432        raise NotADirectoryError(
433            f"Error: '{str(phen_path.resolve())}' is not a directory"
434        )
435
436    config_path = phen_path / CONFIG_FILE
437    if not config_path.is_file():
438        raise FileNotFoundError(
439            f"Error: phen configuration file '{config_path}' does not exist."
440        )
441
442    concepts_path = phen_path / CONCEPTS_DIR
443    if not concepts_path.is_dir():
444        raise FileNotFoundError(
445            f"Error: source concepts directory {concepts_path} does not exist."
446        )
447
448    # Validate the directory is a git repo
449    try:
450        git.Repo(phen_path)
451    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
452        raise Exception(f"Phen directory {phen_path} is not a git repo")
453
454    # Load configuration File
455    if config_path.suffix == ".yaml":
456        try:
457            with config_path.open("r") as file:
458                phenotype = yaml.safe_load(file)
459
460            validator = Validator(CONFIG_SCHEMA)
461            if validator.validate(phenotype):
462                logger.debug("YAML structure is valid.")
463            else:
464                logger.error(f"YAML structure validation failed: {validator.errors}")
465                raise Exception(f"YAML structure validation failed: {validator.errors}")
466        except yaml.YAMLError as e:
467            logger.error(f"YAML syntax error: {e}")
468            raise e
469    else:
470        raise Exception(
471            f"Unsupported configuration filetype: {str(config_path.resolve())}"
472        )
473
474    # initialise validation
475    validation_errors = []
476    phenotype = phenotype["phenotype"]
477    code_types = parse.CodeTypeParser().code_types
478
479    # check the version number is in the format N.N.N
480    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
481    if not match:
482        validation_errors.append(
483            f"Invalid version format in configuration file: {phenotype['version']}"
484        )
485
486    # create a list of all the concept set names defined in the concept set configuration
487    concept_set_names = []
488    for item in phenotype["concept_sets"]:
489        if item["name"] in concept_set_names:
490            validation_errors.append(
491                f"Duplicate concept set defined in concept sets {item['name']}"
492            )
493        else:
494            concept_set_names.append(item["name"])
495
496    # check codes definition
497    for item in phenotype["concept_sets"]:
498        # check concept code file exists
499        concept_code_file_path = concepts_path / item["file"]["path"]
500        if not concept_code_file_path.exists():
501            validation_errors.append(
502                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
503            )
504
505        # check concept code file is not empty
506        elif concept_code_file_path.stat().st_size == 0:
507            validation_errors.append(
508                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
509            )
510
511        # check code file type is supported
512        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
513            raise ValueError(
514                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
515            )
516
517        # check columns specified are a supported medical coding type
518        for column in item["file"]["columns"]:
519            if column not in code_types:
520                validation_errors.append(
521                    f"Column type {column} for file {concept_code_file_path} is not supported"
522                )
523
524        # check the actions are supported
525        if "actions" in item["file"]:
526            for action in item["file"]["actions"]:
527                if action not in COL_ACTIONS:
528                    validation_errors.append(f"Action {action} is not supported")
529
530    if len(validation_errors) > 0:
531        logger.error(validation_errors)
532        raise PhenValidationException(
533            f"Configuration file {str(config_path.resolve())} failed validation",
534            validation_errors,
535        )
536
537    logger.info(f"Phenotype validated successfully")

Validates the phenotype directory is a git repo with standard structure

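A minimal sketch of validating a phenotype directory and inspecting the collected errors:

from acmc import phen

try:
    phen.validate("./workspace/phen")
except phen.PhenValidationException as e:
    # the individual problems are collected on the exception
    for error in e.validation_errors:
        print(error)
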
def read_table_file(path: pathlib.Path, excel_sheet: str = ''):
540def read_table_file(path: Path, excel_sheet: str = ""):
541    """
542    Load Code List File
543    """
544
545    path = path.resolve()
546    if path.suffix == ".csv":
547        df = pd.read_csv(path, dtype=str)
548    elif path.suffix == ".xlsx" or path.suffix == ".xls":
549        if excel_sheet != "":
550            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
551        else:
552            df = pd.read_excel(path, dtype=str)
553    elif path.suffix == ".dta":
554        df = pd.read_stata(path)
555    else:
556        raise ValueError(
557            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
558        )
559
560    return df

Load Code List File

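A minimal sketch (the file paths and sheet name are hypothetical); csv and Excel columns are loaded as strings:

from pathlib import Path
from acmc import phen

# read a csv coding list
df = phen.read_table_file(Path("./workspace/phen/concepts/example_codes.csv"))

# read a specific sheet from an Excel coding list
df = phen.read_table_file(
    Path("./workspace/phen/concepts/example_codes.xlsx"), excel_sheet="codes"
)
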
def process_actions( df: pandas.core.frame.DataFrame, concept_set: dict) -> pandas.core.frame.DataFrame:
563def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
564    # Perform Structural Changes to file before preprocessing
565    logger.debug("Processing file structural actions")
566    if (
567        "actions" in concept_set["file"]
568        and "split_col" in concept_set["file"]["actions"]
569        and "codes_col" in concept_set["file"]["actions"]
570    ):
571        split_col = concept_set["file"]["actions"]["split_col"]
572        codes_col = concept_set["file"]["actions"]["codes_col"]
573        logger.debug(
574            "Action: Splitting %s column into: %s",
575            split_col,
576            df[split_col].unique(),
577        )
578
579        codes = df[codes_col]
580        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
581        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
582        oh[oh == False] = np.nan  # replace 0s with None
583        df = pd.concat([df, oh], axis=1)  # merge in new columns
584
585    return df
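
A small sketch of the split_col / codes_col action on a toy table (the column names are hypothetical): each distinct value in the split column becomes a new column holding the code for rows that matched, and NaN otherwise.

import pandas as pd
from acmc import phen

df = pd.DataFrame({"code": ["A01", "B02"], "code_type": ["read2", "icd10"]})
concept_set = {
    "file": {"actions": {"split_col": "code_type", "codes_col": "code"}}
}

# adds 'read2' and 'icd10' columns containing the matching codes
df = phen.process_actions(df, concept_set)
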
def preprocess_source_concepts( df: pandas.core.frame.DataFrame, concept_set: dict, code_file_path: pathlib.Path) -> Tuple[pandas.core.frame.DataFrame, list]:
589def preprocess_source_concepts(
590    df: pd.DataFrame, concept_set: dict, code_file_path: Path
591) -> Tuple[pd.DataFrame, list]:
592    """Parses each column individually - Order and length will not be preserved!"""
593    out = pd.DataFrame([])  # create output df to append to
594    code_errors = []  # list of errors from processing
595
596    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
597    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
598
599    # Preprocess codes
600    code_types = parse.CodeTypeParser().code_types
601    for code_type in concept_set["file"]["columns"]:
602        parser = code_types[code_type]
603        logger.info(f"Processing {code_type} codes for {code_file_path}")
604
605        # get codes by column name
606        source_col_name = concept_set["file"]["columns"][code_type]
607        codes = df[source_col_name].dropna()
608        codes = codes.astype(str)  # convert to string
609        codes = codes.str.strip()  # remove excess spaces
610
611        # process codes, validating them using parser and returning the errors
612        codes, errors = parser.process(codes, code_file_path)
613        if len(errors) > 0:
614            code_errors.extend(errors)
615            logger.warning(f"Codes validation failed with {len(errors)} errors")
616
617        # add processed codes to df
618        new_col_name = f"{source_col_name}_SOURCE"
619        df = df.rename(columns={source_col_name: new_col_name})
620        process_codes = pd.DataFrame({code_type: codes}).join(df)
621        out = pd.concat(
622            [out, process_codes],
623            ignore_index=True,
624        )
625
626    logger.debug(out.head())
627
628    return out, code_errors

Parses each column individually - Order and length will not be preserved!

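A sketch of the preprocessing step in isolation (the concept set entry and file path are hypothetical, mirroring a config.yaml entry; the TRUD code parsers must be installed for code validation to run):

from pathlib import Path
from acmc import phen

concept_set = {
    "name": "example",
    "file": {"path": "example_codes.csv", "columns": {"read2": "read2_col"}},
}
codes_path = Path("./workspace/phen/concepts/example_codes.csv")

df = phen.read_table_file(codes_path)
df, code_errors = phen.preprocess_source_concepts(df, concept_set, codes_path)
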
def get_code_type_from_col_name(col_name: str):
631def get_code_type_from_col_name(col_name: str):
632    return col_name.split("_")[0]
def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str) -> pandas.core.frame.DataFrame:
636def translate_codes(
637    source_df: pd.DataFrame, target_code_type: str, concept_name: str
638) -> pd.DataFrame:
639    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""
640
641    # codes = pd.DataFrame([], dtype=str)
642    codes = pd.DataFrame(
643        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
644    )
645    # Convert codes to target type
646    logger.info(f"Converting to target code type {target_code_type}")
647
648    for source_code_type in source_df.columns:
649
650        # if the target code type is the same as the source code type, no translation, just append source as target
651        if source_code_type == target_code_type:
652            copy_df = pd.DataFrame(
653                {
654                    "SOURCE_CONCEPT": source_df[source_code_type],
655                    "SOURCE_CONCEPT_TYPE": source_code_type,
656                    "CONCEPT": source_df[source_code_type],
657                }
658            )
659            codes = pd.concat([codes, copy_df])
660            logger.debug(
661                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
662            )
663        else:
664            # get the translation filename using source to target code types
665            filename = f"{source_code_type}_to_{target_code_type}.parquet"
666            map_path = trud.PROCESSED_PATH / filename
667
668            # do the mapping if it exists
669            if map_path.exists():
670                # get mapping
671                df_map = pd.read_parquet(map_path)
672
673                # do mapping
674                translated_df = pd.merge(
675                    source_df[source_code_type], df_map, how="left"
676                )
677
678                # normalise the output
679                translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
680                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
681
682                # add to list of codes
683                codes = pd.concat([codes, translated_df])
684
685            else:
686                logger.warning(
687                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
688                )
689
690    codes = codes.dropna()  # delete NaNs
691
692    # add the concept set name to the output if there were any translations
693    if len(codes.index) > 0:
694        codes["CONCEPT_SET"] = concept_name
695    else:
696        logger.debug(f"No codes converted with target code type {target_code_type}")
697
698    return codes

Translates each source code type in the source coding list into a target type and returns all conversions as a concept set

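A sketch of the simplest case, where the source and target code types are the same so codes are copied rather than mapped (no TRUD mapping files are needed; the codes shown are illustrative):

import pandas as pd
from acmc import phen

source_df = pd.DataFrame({"read2": ["C10E.", "C10F."]})
codes = phen.translate_codes(
    source_df, target_code_type="read2", concept_name="diabetes"
)
# codes has columns SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT and CONCEPT_SET
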
def sql_row_exist(conn: sqlite3.Connection, table: str, column: str, value: str) -> bool:
701def sql_row_exist(
702    conn: sqlite3.Connection, table: str, column: str, value: str
703) -> bool:
704    # Execute and check if a result exists
705    cur = conn.cursor()
706    query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;"
707    cur.execute(query, (value,))
708    exists = cur.fetchone() is not None
709
710    return exists
def write_code_errors(code_errors: list, code_errors_path: pathlib.Path):
713def write_code_errors(code_errors: list, code_errors_path: Path):
714    err_df = pd.DataFrame(
715        [
716            {
717                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
718                "VOCABULARY": err.code_type,
719                "SOURCE": err.codes_file,
720                "CAUSE": err.message,
721            }
722            for err in code_errors
723        ]
724    )
725
726    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
727    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
728    err_df.to_csv(code_errors_path, index=False, mode="w")
def write_vocab_version(phen_path: pathlib.Path):
731def write_vocab_version(phen_path: Path):
732    # write the vocab version files
733
734    if not trud.VERSION_PATH.exists():
735        raise FileNotFoundError(
736            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
737        )
738
739    if not omop.VERSION_PATH.exists():
740        raise FileNotFoundError(
741            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
742        )
743
744    with trud.VERSION_PATH.open("r") as file:
745        trud_version = yaml.safe_load(file)
746
747    with omop.VERSION_PATH.open("r") as file:
748        omop_version = yaml.safe_load(file)
749
750    # Create the combined YAML structure
751    version_data = {
752        "versions": {
753            "acmc": acmc.__version__,
754            "trud": trud_version,
755            "omop": omop_version,
756        }
757    }
758
759    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
760        yaml.dump(
761            version_data,
762            file,
763            Dumper=util.QuotedDumper,
764            default_flow_style=False,
765            sort_keys=False,
766            default_style='"',
767        )
def map(phen_dir: str, target_code_type: str):
770def map(phen_dir: str, target_code_type: str):
771    logger.info(f"Processing phenotype: {phen_dir}")
772
773    # Validate configuration
774    validate(phen_dir)
775
776    # initialise paths
777    phen_path = Path(phen_dir)
778    config_path = phen_path / CONFIG_FILE
779
780    # load configuration
781    with config_path.open("r") as file:
782        config = yaml.safe_load(file)
783    phenotype = config["phenotype"]
784
785    if len(phenotype["map"]) == 0:
786        raise ValueError(f"No map codes defined in the phenotype configuration")
787
788    if target_code_type is not None and target_code_type not in phenotype["map"]:
789        raise ValueError(
790            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
791        )
792
793    if target_code_type is not None:
794        map_target_code_type(phen_path, phenotype, target_code_type)
795    else:
796        for t in phenotype["map"]:
797            map_target_code_type(phen_path, phenotype, t)
798
799    logger.info(f"Phenotype processed successfully")
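
A sketch of running the mapping step on a validated phenotype (assumes the required TRUD/OMOP resources are installed):

from acmc import phen

# map to every target code type listed under 'map' in config.yaml
phen.map("./workspace/phen", target_code_type=None)

# or map to a single target code type, e.g. snomed
# phen.map("./workspace/phen", target_code_type="snomed")
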
def map_target_code_type(phen_path: pathlib.Path, phenotype: dict, target_code_type: str):
802def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str):
803    logger.debug(f"Target coding format: {target_code_type}")
804    concepts_path = phen_path / CONCEPTS_DIR
805    # Create output dataframe
806    out = pd.DataFrame([])
807    code_errors = []
808
809    # Process each folder in codes section
810    for concept_set in phenotype["concept_sets"]:
811        logger.debug(f"--- {concept_set['file']} ---")
812
813        # Load code file
814        codes_file_path = Path(concepts_path / concept_set["file"]["path"])
815        df = read_table_file(codes_file_path)
816
817        # process structural actions
818        df = process_actions(df, concept_set)
819
820        # preprocessing and validate of source concepts
821        logger.debug("Processing and validating source concept codes")
822        df, errors = preprocess_source_concepts(
823            df,
824            concept_set,
825            codes_file_path,
826        )
827
828        # create df with just the source code columns
829        source_column_names = list(concept_set["file"]["columns"].keys())
830        source_df = df[source_column_names]
831
832        logger.debug(source_df.columns)
833        logger.debug(source_df.head())
834
835        logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}")
836        if len(errors) > 0:
837            code_errors.extend(errors)
838        logger.debug(f" Length of code_errors {len(code_errors)}")
839
840        # Map source concepts codes to target codes
841        # if processing a source coding list with categorical data
842        if (
843            "actions" in concept_set["file"]
844            and "divide_col" in concept_set["file"]["actions"]
845            and len(df) > 0
846        ):
847            divide_col = concept_set["file"]["actions"]["divide_col"]
848            logger.debug(f"Action: Dividing Table by {divide_col}")
849            logger.debug(f"column into: {df[divide_col].unique()}")
850            df_grp = df.groupby(divide_col)
851            for cat, grp in df_grp:
852                if cat == concept_set["file"]["category"]:
853                    grp = grp.drop(columns=[divide_col])  # delete categorical column
854                    source_df = grp[source_column_names]
855                    trans_out = translate_codes(
856                        source_df,
857                        target_code_type=target_code_type,
858                        concept_name=concept_set["name"],
859                    )
860                    out = pd.concat([out, trans_out])
861        else:
862            source_df = df[source_column_names]
863            trans_out = translate_codes(
864                source_df,
865                target_code_type=target_code_type,
866                concept_name=concept_set["name"],
867            )
868            out = pd.concat([out, trans_out])
869
870    if len(code_errors) > 0:
871        logger.error(f"The map processing has {len(code_errors)} errors")
872        error_path = phen_path / MAP_DIR / "errors"
873        error_path.mkdir(parents=True, exist_ok=True)
874        error_filename = f"{target_code_type}-code-errors.csv"
875        write_code_errors(code_errors, error_path / error_filename)
876
877    # Check there is output from processing
878    if len(out.index) == 0:
879        logger.error(f"No output after map processing")
880        raise Exception(
881            f"No output after map processing, check config {str(phen_path.resolve())}"
882        )
883
884    # final processing
885    out = out.reset_index(drop=True)
886    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
887    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
888
889    out_count = len(out.index)
890    # add metadata
891    # Loop over each source_concept_type and perform the left join on all columns apart from source code columns
892    result_list = []
893    source_column_names = list(concept_set["file"]["columns"].keys())
894    for source_concept_type in source_column_names:
895
896        # Filter output based on the current source_concept_type
897        out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type]
898        filtered_count = len(out_filtered_df.index)
899
900        # Removing the source type columns except the current type leaves the metadata and the join column
901        remove_types = [
902            type for type in source_column_names if type != source_concept_type
903        ]
904        metadata_df = df.drop(columns=remove_types)
905        metadata_df = metadata_df.rename(
906            columns={source_concept_type: "SOURCE_CONCEPT"}
907        )
908        metadata_df_count = len(metadata_df.index)
909
910        # Perform the left join with metadata_df on SOURCE_CONCEPT to add the metadata
911        result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
912        result_count = len(result.index)
913
914        logger.debug(
915            f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
916        )
917
918        # Append the result to the result_list
919        result_list.append(result)
920
921    # Concatenate all the results into a single DataFrame
922    final_out = pd.concat(result_list, ignore_index=True)
923    final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
924    logger.debug(
925        f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
926    )
927
928    # Save output to map directory
929    output_filename = target_code_type + ".csv"
930    map_path = phen_path / MAP_DIR / output_filename
931    final_out.to_csv(map_path, index=False)
932    logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
933
934    # save concept sets as separate files
935    concept_set_path = phen_path / CSV_PATH / target_code_type
936
937    # empty the concept-set directory except for hidden files, e.g. .git
938    if concept_set_path.exists():
939        for item in concept_set_path.iterdir():
940            if not item.name.startswith("."):
941                item.unlink()
942    else:
943        concept_set_path.mkdir(parents=True, exist_ok=True)
944
945    # write each concept as a separate file
946    for name, concept in final_out.groupby("CONCEPT_SET"):
947        concept = concept.sort_values(by="CONCEPT")  # sort rows
948        concept = concept.dropna(how="all", axis=1)  # remove empty cols
949        concept = concept.reindex(
950            sorted(concept.columns), axis=1
951        )  # sort cols alphabetically
952        filename = f"{name}.csv"
953        concept_path = concept_set_path / filename
954        concept.to_csv(concept_path, index=False)
955
956    write_vocab_version(phen_path)
957
958    logger.info(f"Phenotype processed target code type {target_code_type}")
def generate_version_tag( repo: git.repo.base.Repo, increment: str = 'patch', use_v_prefix: bool = False) -> str:
961def generate_version_tag(
962    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
963) -> str:
964    # Get all valid semantic version tags
965    versions = []
966    for tag in repo.tags:
967        tag_name = (
968            tag.name.lstrip("v") if use_v_prefix else tag.name
969        )  # Remove 'v' if needed
970        if semver.Version.is_valid(tag_name):
971            versions.append(semver.Version.parse(tag_name))
972
973    # Determine the next version
974    if not versions:
975        new_version = semver.Version(0, 0, 1)
976    else:
977        latest_version = max(versions)
978        if increment == "major":
979            new_version = latest_version.bump_major()
980        elif increment == "minor":
981            new_version = latest_version.bump_minor()
982        else:
983            new_version = latest_version.bump_patch()
984
985    # Create the new tag
986    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
987
988    return new_version_str
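
A sketch of generating the next version tag (assumes an existing repo; with tags 0.0.1 and 0.0.2 a "minor" increment would return "0.1.0", while a repo with no valid tags returns "0.0.1"):

import git
from acmc import phen

repo = git.Repo("./workspace/phen")
next_version = phen.generate_version_tag(repo, increment="minor")
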
def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
 991def publish(
 992    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
 993):
 994    """Publishes updates to the phenotype by committing all changes to the repo directory"""
 995
 996    # Validate config
 997    validate(phen_dir)
 998    phen_path = Path(phen_dir)
 999
1000    # load git repo and set the branch
1001    repo = git.Repo(phen_path)
1002    if DEFAULT_GIT_BRANCH in repo.branches:
1003        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1004        main_branch.checkout()
1005    else:
1006        raise AttributeError(
1007            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1008        )
1009
1010    # check if any changes to publish
1011    if not repo.is_dirty() and not repo.untracked_files:
1012        if remote_url is not None and "origin" not in repo.remotes:
1013            logger.info(f"First publish to remote url {remote_url}")
1014        else:
1015            logger.info("Nothing to publish, no changes to the repo")
1016            return
1017
1018    # get next version
1019    new_version_str = generate_version_tag(repo, increment)
1020    logger.info(f"New version: {new_version_str}")
1021
1022    # Write version in configuration file
1023    config_path = phen_path / CONFIG_FILE
1024    with config_path.open("r") as file:
1025        config = yaml.safe_load(file)
1026
1027    config["phenotype"]["version"] = new_version_str
1028    with open(config_path, "w") as file:
1029        yaml.dump(
1030            config,
1031            file,
1032            Dumper=util.QuotedDumper,
1033            default_flow_style=False,
1034            sort_keys=False,
1035            default_style='"',
1036        )
1037
1038    # Add and commit changes to repo including version updates
1039    commit_message = f"Committing updates to phenotype {phen_path}"
1040    repo.git.add("--all")
1041    repo.index.commit(commit_message)
1042
1043    # Add tag to the repo
1044    repo.create_tag(new_version_str)
1045
1046    # push to origin if a remote repo
1047    if remote_url is not None and "origin" not in repo.remotes:
1048        git_url = construct_git_url(remote_url)
1049        repo.create_remote("origin", git_url)
1050
1051    try:
1052        if "origin" in repo.remotes:
1053            logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1054            origin = repo.remotes.origin
1055            logger.info(f"Pushing main branch to remote repo")
1056            repo.git.push("--set-upstream", "origin", "main")
1057            logger.info(f"Pushing version tags to remote git repo")
1058            origin.push(tags=True)
1059            logger.debug("Changes pushed to 'origin'")
1060        else:
1061            logger.debug("Remote 'origin' is not set")
1062    except Exception as e:
1063        tag_ref = repo.tags[new_version_str]
1064        repo.delete_tag(tag_ref)
1065        repo.git.reset("--soft", "HEAD~1")
1066        raise e
1067
1068    logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory

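A minimal sketch of publishing a local change set (remote_url=None keeps the publish local; pushing to a remote would also need a PAT environment variable):

from acmc import phen

phen.publish(
    phen_dir="./workspace/phen",
    msg="Updated concept sets",
    remote_url=None,
    increment="patch",
)
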
def export(phen_dir: str, version: str):
1071def export(phen_dir: str, version: str):
1072    """Exports a phen repo at a specific tagged version into a target directory"""
1073    logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1074
1075    # validate configuration
1076    validate(phen_dir)
1077    phen_path = Path(phen_dir)
1078
1079    # load configuration
1080    config_path = phen_path / CONFIG_FILE
1081    with config_path.open("r") as file:
1082        config = yaml.safe_load(file)
1083
1084    map_path = phen_path / MAP_DIR
1085    if not map_path.exists():
1086        logger.warning(f"Map path does not exist '{map_path}'")
1087
1088    export_path = phen_path / OMOP_PATH
1089    # check export directory exists and if not create it
1090    if not export_path.exists():
1091        export_path.mkdir(parents=True)
1092        logger.debug(f"OMOP export directory '{export_path}' created.")
1093
1094    # omop export db
1095    export_db_path = omop.export(
1096        map_path,
1097        export_path,
1098        config["phenotype"]["version"],
1099        config["phenotype"]["omop"],
1100    )
1101
1102    # write to tables
1103    # export as csv
1104    logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory

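A minimal sketch (assumes the phenotype has already been mapped and the OMOP vocabularies are installed); the export is written under concept-sets/omop:

from acmc import phen

phen.export("./workspace/phen", version="0.0.1")
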
def copy(phen_dir: str, target_dir: str, version: str):
1107def copy(phen_dir: str, target_dir: str, version: str):
1108    """Copies a phen repo at a specific tagged version into a target directory"""
1109
1110    # Validate
1111    validate(phen_dir)
1112    phen_path = Path(phen_dir)
1113
1114    # Check target directory exists
1115    target_path = Path(target_dir)
1116    if not target_path.exists():
1117        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1118
1119    # Set copy directory
1120    copy_path = target_path / version
1121    logger.info(f"Copying repo {phen_path} to {copy_path}")
1122
1123    if (
1124        copy_path.exists() and copy_path.is_dir()
1125    ):  # Check if it exists and is a directory
1126        copy = check_delete_dir(
1127            copy_path,
1128            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1129        )
1130    else:
1131        copy = True
1132
1133    if not copy:
1134        logger.info(f"Not copying the version {version}")
1135        return
1136
1137    logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1138    repo = git.Repo.clone_from(phen_path, copy_path)
1139
1140    # Check out the latest commit or specified version
1141    if version:
1142        # Checkout a specific version (e.g., branch, tag, or commit hash)
1143        logger.info(f"Checking out version {version}...")
1144        repo.git.checkout(version)
1145    else:
1146        # Checkout the latest commit (HEAD)
1147        logger.info(f"Checking out the latest commit...")
1148        repo.git.checkout("HEAD")
1149
1150    logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1151
1152    logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory

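A minimal sketch (the target directory and version tag are hypothetical; the target directory must already exist):

from acmc import phen

phen.copy("./workspace/phen", target_dir="./releases", version="0.0.1")
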
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1156def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1157    """Extracts concepts as {name: file_path} dictionary and a name set."""
1158    concepts_dict = {
1159        item["name"]: item["file"]["path"]
1160        for item in config_data["phenotype"]["concept_sets"]
1161    }
1162    name_set = set(concepts_dict.keys())
1163    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.

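A small sketch with an inline configuration dictionary (the concept set name and file are illustrative):

from acmc import phen

config = {
    "phenotype": {
        "concept_sets": [
            {"name": "diabetes", "file": {"path": "diabetes_codes.csv"}}
        ]
    }
}
concepts, names = phen.extract_concepts(config)
# concepts == {"diabetes": "diabetes_codes.csv"} and names == {"diabetes"}
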
def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1166def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1167    """
1168    Extracts clean keys from a DeepDiff dictionary.
1169
1170    :param diff: DeepDiff result dictionary
1171    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1172    :return: A set of clean key names
1173    """
1174    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}

Extracts clean keys from a DeepDiff dictionary.

:param diff: DeepDiff result dictionary
:param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
:return: A set of clean key names

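A small sketch using DeepDiff on two concept dictionaries (illustrative data):

from deepdiff import DeepDiff
from acmc import phen

old = {"angina": "angina.csv"}
new = {"angina": "angina.csv", "stroke": "stroke.csv"}

diff = DeepDiff(old, new)
added = phen.extract_clean_deepdiff_keys(diff, "dictionary_item_added")
# added == {"stroke"}
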
def diff_config(old_config: dict, new_config: dict) -> str:
1177def diff_config(old_config: dict, new_config: dict) -> str:
1178    report = f"\n# Changes to phenotype configuration\n"
1179    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1180
1181    old_concepts, old_names = extract_concepts(old_config)
1182    new_concepts, new_names = extract_concepts(new_config)
1183
1184    # Check added and removed names
1185    added_names = new_names - old_names  # Names that appear in new but not in old
1186    removed_names = old_names - new_names  # Names that were in old but not in new
1187
1188    # find file path changes for unchanged names
1189    unchanged_names = old_names & new_names  # Names that exist in both
1190    file_diff = DeepDiff(
1191        {name: old_concepts[name] for name in unchanged_names},
1192        {name: new_concepts[name] for name in unchanged_names},
1193    )
1194
1195    # Find renamed concepts (same file, different name)
1196    renamed_concepts = []
1197    for removed in removed_names:
1198        old_path = old_concepts[removed]
1199        for added in added_names:
1200            new_path = new_concepts[added]
1201            if old_path == new_path:
1202                renamed_concepts.append((removed, added))
1203
1204    # Remove renamed concepts from added and removed sets
1205    for old_name, new_name in renamed_concepts:
1206        added_names.discard(new_name)
1207        removed_names.discard(old_name)
1208
1209    # generate config report
1210    if added_names:
1211        report += "## Added Concepts\n"
1212        for name in added_names:
1213            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1214        report += "\n"
1215
1216    if removed_names:
1217        report += "## Removed Concepts\n"
1218        for name in removed_names:
1219            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1220        report += "\n"
1221
1222    if renamed_concepts:
1223        report += "## Renamed Concepts\n"
1224        for old_name, new_name in renamed_concepts:
1225            report += (
1226                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1227            )
1228        report += "\n"
1229
1230    if "values_changed" in file_diff:
1231        report += "## Updated File Paths\n"
1232        for name, change in file_diff["values_changed"].items():
1233            old_file = change["old_value"]
1234            new_file = change["new_value"]
1235            clean_name = name.split("root['")[1].split("']")[0]
1236            report += (
1237                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1238            )
1239        report += "\n"
1240
1241    if not (
1242        added_names
1243        or removed_names
1244        or renamed_concepts
1245        or file_diff.get("values_changed")
1246    ):
1247        report += "No changes in concept sets.\n"
1248
1249    return report
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1252def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1253    old_output_files = [
1254        file.name
1255        for file in old_map_path.iterdir()
1256        if file.is_file() and not file.name.startswith(".")
1257    ]
1258    new_output_files = [
1259        file.name
1260        for file in new_map_path.iterdir()
1261        if file.is_file() and not file.name.startswith(".")
1262    ]
1263
1264    # Convert the lists to sets for easy comparison
1265    old_output_set = set(old_output_files)
1266    new_output_set = set(new_output_files)
1267
1268    # Outputs that are in old_output_set but not in new_output_set (removed files)
1269    removed_outputs = old_output_set - new_output_set
1270    # Outputs that are in new_output_set but not in old_output_set (added files)
1271    added_outputs = new_output_set - old_output_set
1272    # Outputs that are the intersection of old_output_set and new_output_set
1273    common_outputs = old_output_set & new_output_set
1274
1275    report = f"\n# Changes to available translations\n"
1276    report += f"This compares the coding translations files available.\n\n"
1277    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1278    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1279    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1280
1281    # Step N: Compare common outputs between versions
1282    report += f"# Changes to concepts in translation files\n\n"
1283    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1284    for file in common_outputs:
1285        old_output = old_map_path / file
1286        new_output = new_map_path / file
1287
1288        logger.debug(f"Old output: {str(old_output.resolve())}")
1289        logger.debug(f"New output: {str(new_output.resolve())}")
1290
1291        df1 = pd.read_csv(old_output)
1292        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1293        df2 = pd.read_csv(new_output)
1294        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1295
1296        # Check for added and removed concepts
1297        report += f"- File {file}\n"
1298        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1299        report += f"- Removed concepts {sorted_list}\n"
1300        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1301        report += f"- Added concepts {sorted_list}\n"
1302
1303        # Check for changed concepts
1304        diff = df2 - df1  # diff in counts
1305        diff = diff[
1306            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1307        ]  # get non-zero counts
1308        s = "\n"
1309        if len(diff.index) > 0:
1310            for concept, row in diff.iterrows():
1311                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1312            report += f"- Changed concepts {s}\n\n"
1313        else:
1314            report += f"- Changed concepts []\n\n"
1315
1316    return report
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path):
1319def diff_phen(
1320    new_phen_path: Path,
1321    new_version: str,
1322    old_phen_path: Path,
1323    old_version: str,
1324    report_path: Path,
1325):
1326    """Compare the differences between two versions of a phenotype"""
1327
1328    # validate phenotypes
1329    logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1330    validate(str(old_phen_path.resolve()))
1331    logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1332    validate(str(new_phen_path.resolve()))
1333
1334    # get old and new config
1335    old_config_path = old_phen_path / CONFIG_FILE
1336    with old_config_path.open("r") as file:
1337        old_config = yaml.safe_load(file)
1338    new_config_path = new_phen_path / CONFIG_FILE
1339    with new_config_path.open("r") as file:
1340        new_config = yaml.safe_load(file)
1341
1342    # write report heading
1343    report = f"# Phenotype Comparison Report\n"
1344    report += f"## Original phenotype\n"
1345    report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1346    report += f"  - {old_version}\n"
1347    report += f"  - {str(old_phen_path.resolve())}\n"
1348    report += f"## Changed phenotype:\n"
1349    report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1350    report += f"  - {new_version}\n"
1351    report += f"  - {str(new_phen_path.resolve())}\n"
1352
1353    # Step 1: check differences configuration files
1354    # Convert list of dicts into a dict: {name: file}
1355    report += diff_config(old_config, new_config)
1356
1357    # Step 2: check differences between map files
1358    # List files from output directories
1359    old_map_path = old_phen_path / MAP_DIR
1360    new_map_path = new_phen_path / MAP_DIR
1361    report += diff_map_files(old_map_path, new_map_path)
1362
1363    # write the report to file
1364    logger.debug(f"Writing to report file {str(report_path.resolve())}")
1365    report_file = open(report_path, "w")
1366    report_file.write(report)
1367    report_file.close()
1368
1369    logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1372def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
1373    # make tmp directory .acmc
1374    timestamp = time.strftime("%Y%m%d_%H%M%S")
1375    temp_dir = Path(f".acmc/diff_{timestamp}")
1376
1377    changed_phen_path = Path(phen_dir)
1378    if not changed_phen_path.exists():
1379        raise ValueError(
1380            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1381        )
1382
1383    old_phen_path = Path(old_phen_dir)
1384    if not old_phen_path.exists():
1385        raise ValueError(
1386            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1387        )
1388
1389    try:
1390        # Create the directory
1391        temp_dir.mkdir(parents=True, exist_ok=True)
1392        logger.debug(f"Temporary directory created: {temp_dir}")
1393
1394        # Create temporary directories
1395        changed_path = temp_dir / "changed"
1396        changed_path.mkdir(parents=True, exist_ok=True)
1397        old_path = temp_dir / "old"
1398        old_path.mkdir(parents=True, exist_ok=True)
1399
1400        # checkout changed
1401        if version == "latest":
1402            logger.debug(
1403                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1404            )
1405            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1406        else:
1407            logger.debug(
1408                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1409            )
1410            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1411            changed_repo.git.checkout(version)
1412
1413        # checkout old
1414        if old_version == "latest":
1415            logger.debug(
1416                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1417            )
1418            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1419        else:
1420            logger.debug(
1421                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1422            )
1423            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1424            old_repo.git.checkout(old_version)
1425
1426        report_filename = f"{version}_{old_version}_diff.md"
1427        report_path = changed_phen_path / report_filename
1428        # diff old with new
1429        diff_phen(changed_path, version, old_path, old_version, report_path)
1430
1431    finally:
1432        # clean up tmp directory
1433        if temp_dir.exists():
1434            shutil.rmtree(temp_dir)
1435            logger.debug(f"Temporary directory removed: {temp_dir}")
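
A minimal sketch of comparing the working copy of a phenotype against a previously published version (the old version tag is hypothetical); the report is written into the changed phenotype directory as latest_0.0.1_diff.md:

from acmc import phen

phen.diff(
    phen_dir="./workspace/phen",
    version="latest",
    old_phen_dir="./workspace/phen",
    old_version="0.0.1",
)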