acmc.phen

phenotype.py module

This module provides functionality for managing phenotypes: initialising, validating, mapping, publishing, exporting, copying and diffing phenotype repositories.

   1"""
   2phenotype.py module
   3
   4This module provides functionality for managing phenotypes: initialising, validating, mapping, publishing, exporting, copying and diffing phenotype repositories.
   5"""
   6
   7import argparse
   8import pandas as pd
   9import numpy as np
  10import json
  11import os
  12import sqlite3
  13import sys
  14import shutil
  15import time
  16import git
  17import re
  18import logging
  19import requests
  20import yaml
  21import semver
  22from git import Repo
  23from cerberus import Validator  # type: ignore
  24from deepdiff import DeepDiff
  25from pathlib import Path
  26from urllib.parse import urlparse, urlunparse
  27from typing import Tuple, Set, Any
  28import acmc
  29from acmc import trud, omop, parse, util, logging_config as lc
  30
  31# set up logging
  32_logger = lc.setup_logger()
  33
  34pd.set_option("mode.chained_assignment", None)
  35
  36PHEN_DIR = "phen"
  37"""Default phenotype directory name"""
  38
  39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
  40"""Default phenotype directory path"""
  41
  42CONCEPTS_DIR = "concepts"
  43"""Default concepts directory name"""
  44
  45MAP_DIR = "map"
  46"""Default map directory name"""
  47
  48CONCEPT_SET_DIR = "concept-sets"
  49"""Default concept set directory name"""
  50
  51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
  52"""Default CSV concept set directory path"""
  53
  54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
  55"""Default OMOP concept set directory path"""
  56
  57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
  58"""List of default phenotype directories"""
  59
  60CONFIG_FILE = "config.yml"
  61"""Default configuration filename"""
  62
  63VOCAB_VERSION_FILE = "vocab_version.yml"
  64"""Default vocabulary version filename"""
  65
  66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
  67"""List of semantic version increment types"""
  68
  69DEFAULT_VERSION_INC = "patch"
  70"""Default semantic version increment type"""
  71
  72DEFAULT_GIT_BRANCH = "main"
  73"""Default phenotype repo branch name"""
  74
  75SPLIT_COL_ACTION = "split_col"
  76"""Split column preprocessing action type"""
  77
  78CODES_COL_ACTION = "codes_col"
  79"""Codes column preprocessing action type"""
  80
  81DIVIDE_COL_ACTION = "divide_col"
  82"""Divide column preprocessing action type"""
  83
  84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
  85"""List of column preprocessing action types"""
  86
  87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
  88"""List of supported source concept coding list file types"""
  89
  90# config.yml schema
  91CONFIG_SCHEMA = {
  92    "phenotype": {
  93        "type": "dict",
  94        "required": True,
  95        "schema": {
  96            "version": {
  97                "type": "string",
  98                "required": True,
  99                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
 100            },
 101            "omop": {
 102                "type": "dict",
 103                "required": True,
 104                "schema": {
 105                    "vocabulary_id": {"type": "string", "required": True},
 106                    "vocabulary_name": {"type": "string", "required": True},
 107                    "vocabulary_reference": {
 108                        "type": "string",
 109                        "required": True,
 110                        "regex": r"^https?://.*",  # Ensures it's a URL
 111                    },
 112                },
 113            },
 114            "map": {
 115                "type": "list",
 116                "schema": {
 117                    "type": "string",
 118                    "allowed": list(
 119                        parse.SUPPORTED_CODE_TYPES
 120                    ),  # Ensure only predefined values are allowed
 121                },
 122            },
 123            "concept_sets": {
 124                "type": "list",
 125                "required": True,
 126                "schema": {
 127                    "type": "dict",
 128                    "schema": {
 129                        "name": {"type": "string", "required": True},
 130                        "files": {
 131                            "type": "list",
 132                            "required": True,
 133                            "schema": {
 134                                "type": "dict",
 135                                "schema": {
 136                                    "path": {"type": "string", "required": True},
 137                                    "columns": {"type": "dict", "required": True},
 138                                    "category": {
 139                                        "type": "string"
 140                                    },  # Optional but must be string if present
 141                                    "actions": {
 142                                        "type": "dict",
 143                                        "schema": {
 144                                            "divide_col": {"type": "string"},
 145                                            "split_col": {"type": "string"},
 146                                            "codes_col": {"type": "string"},
 147                                        },
 148                                    },
 149                                },
 150                            },
 151                        },
 152                        "metadata": {"type": "dict", "required": False},
 153                    },
 154                },
 155            },
 156        },
 157    }
 158}
 159"""Phenotype config.yml schema definition"""
 160
 161
 162class PhenValidationException(Exception):
 163    """Custom exception class raised when validation errors in phenotype configuration file"""
 164
 165    def __init__(self, message, validation_errors=None):
 166        super().__init__(message)
 167        self.validation_errors = validation_errors
 168
 169
 170def _construct_git_url(remote_url: str):
 171    """Constructs a git url for github or gitlab including a PAT token environment variable"""
 172    # check the url
 173    parsed_url = urlparse(remote_url)
 174
 175    # if github.com is in the URL use GitHub auth, otherwise assume GitLab; to support
 176    # others such as Codeberg this function would need updating if the URL scheme differs.
 177    if "github.com" in parsed_url.netloc:
 178        # get GitHub PAT from environment variable
 179        auth = os.getenv("ACMC_GITHUB_PAT")
 180        if not auth:
 181            raise ValueError(
 182                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
 183            )
 184    else:
 185        # get GitLab PAT from environment variable
 186        auth = os.getenv("ACMC_GITLAB_PAT")
 187        if not auth:
 188            raise ValueError(
 189                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
 190            )
 191        auth = f"oauth2:{auth}"
 192
 193    # Construct the new URL with credentials
 194    new_netloc = f"{auth}@{parsed_url.netloc}"
 195    return urlunparse(
 196        (
 197            parsed_url.scheme,
 198            new_netloc,
 199            parsed_url.path,
 200            parsed_url.params,
 201            parsed_url.query,
 202            parsed_url.fragment,
 203        )
 204    )
 205
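    # Illustrative example: with ACMC_GITHUB_PAT set, a remote such as
    # "https://github.com/org/phen-repo.git" becomes
    # "https://<token>@github.com/org/phen-repo.git"; for GitLab the token is
    # embedded as "oauth2:<token>@" instead.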
 206
 207def _create_empty_git_dir(path: Path):
 208    """Creates a directory with a .gitkeep file so that it's tracked in git"""
 209    path.mkdir(exist_ok=True)
 210    keep_path = path / ".gitkeep"
 211    keep_path.touch(exist_ok=True)
 212
 213
 214def _check_delete_dir(path: Path, msg: str) -> bool:
 215    """Checks on the command line if a user wants to delete a directory
 216
 217    Args:
 218        path (Path): path of the directory to be deleted
 219        msg (str): message to be displayed to the user
 220
 221    Returns:
 222        Boolean: True if deleted
 223    """
 224    deleted = False
 225
 226    user_input = input(f"{msg}").strip().lower()
 227    if user_input in ["yes", "y"]:
 228        shutil.rmtree(path)
 229        deleted = True
 230    else:
 231        _logger.info("Directory was not deleted.")
 232
 233    return deleted
 234
 235
 236def init(phen_dir: str, remote_url: str):
 237    """Initial phenotype directory as git repo with standard structure"""
 238    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
 239    phen_path = Path(phen_dir)
 240
 241    # check if directory already exists and ask user if they want to recreate it
 242    if (
 243        phen_path.exists() and phen_path.is_dir()
 244    ):  # Check if it exists and is a directory
 245        configure = _check_delete_dir(
 246            phen_path,
 247            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 248        )
 249    else:
 250        configure = True
 251
 252    if not configure:
 253        _logger.info("Exiting, phenotype not initialised")
 254        return
 255
 256    # Initialise repo from local or remote
 257    repo: Repo
 258
 259    # if remote then clone the repo otherwise init a local repo
 260    if remote_url is not None:
 261        # add PAT token to the URL
 262        git_url = _construct_git_url(remote_url)
 263
 264        # clone the repo
 265        git_cmd = git.cmd.Git()
 266        git_cmd.clone(git_url, phen_path)
 267
 268        # open repo
 269        repo = Repo(phen_path)
 270        # check if there are any commits (new repo has no commits)
 271        if (
 272            len(repo.branches) == 0 or repo.head.is_detached
 273        ):  # Handle detached HEAD (e.g., after init)
 274            _logger.debug("The phen repository has no commits yet.")
 275            commit_count = 0
 276        else:
 277            # Get the total number of commits in the default branch
 278            commit_count = sum(1 for _ in repo.iter_commits())
 279            _logger.debug(f"Repo has previous commits: {commit_count}")
 280    else:
 281        # local repo, create the directories and init
 282        phen_path.mkdir(parents=True, exist_ok=True)
 283        _logger.debug(f"Phen directory '{phen_path}' has been created.")
 284        repo = git.Repo.init(phen_path)
 285        commit_count = 0
 286
 287    phen_path = phen_path.resolve()
 288    # initialise empty repos
 289    if commit_count == 0:
 290        # create initial commit
 291        initial_file_path = phen_path / "README.md"
 292        with open(initial_file_path, "w") as file:
 293            file.write(
 294                "# Initial commit\nThis is the first commit in the phen repository.\n"
 295            )
 296        repo.index.add([initial_file_path])
 297        repo.index.commit("Initial commit")
 298        commit_count = 1
 299
 300    # Check out the phen's default branch, creating it if it does not exist
 301    if DEFAULT_GIT_BRANCH in repo.branches:
 302        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 303        main_branch.checkout()
 304    else:
 305        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
 306        main_branch.checkout()
 307
 308    # if the phen path does not contain the config file then initialise the phen type
 309    config_path = phen_path / CONFIG_FILE
 310    if config_path.exists():
 311        _logger.debug(f"Phenotype configuration files already exist")
 312        return
 313
 314    _logger.info("Creating phen directory structure and config files")
 315    for d in DEFAULT_PHEN_DIR_LIST:
 316        _create_empty_git_dir(phen_path / d)
 317
 318    # create empty phen config file
 319    config = {
 320        "phenotype": {
 321            "version": "0.0.0",
 322            "omop": {
 323                "vocabulary_id": "",
 324                "vocabulary_name": "",
 325                "vocabulary_reference": "",
 326            },
 327            "map": [],
 328            "concept_sets": [],
 329        }
 330    }
 331
 332    with open(phen_path / CONFIG_FILE, "w") as file:
 333        yaml.dump(
 334            config,
 335            file,
 336            Dumper=util.QuotedDumper,
 337            default_flow_style=False,
 338            sort_keys=False,
 339            default_style='"',
 340        )
 341
 342    # add git ignore
 343    ignore_content = """# Ignore SQLite database files
 344*.db
 345*.sqlite3
 346 
 347# Ignore SQLite journal and metadata files
 348*.db-journal
 349*.sqlite3-journal
 350
 351# python
 352.ipynb_checkpoints
 353 """
 354    ignore_path = phen_path / ".gitignore"
 355    with open(ignore_path, "w") as file:
 356        file.write(ignore_content)
 357
 358    # add to git repo and commit
 359    for d in DEFAULT_PHEN_DIR_LIST:
 360        repo.git.add(phen_path / d)
 361    repo.git.add(all=True)
 362    repo.index.commit("initialised the phen git repo.")
 363
 364    _logger.info(f"Phenotype initialised successfully")
 365
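    # Example usage (illustrative paths): initialise a new local phenotype, or
    # clone one from a remote (requires the relevant PAT environment variable):
    #
    #   init("./workspace/phen", remote_url=None)
    #   init("./workspace/phen", "https://github.com/org/phen-repo.git")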
 366
 367def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
 368    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
 369
 370    Args:
 371        phen_dir (str): local directory path where the upstream repo is to be cloned
 372        upstream_url (str): url to the upstream repo
 373        upstream_version (str): version in the upstream repo to clone
 374        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
 375
 376    Raises:
 377        ValueError: if the specified version is not in the upstream repo
 378        ValueError: if the upstream repo is not a valid phenotype repo
 379        ValueError: if there's any other problems with Git
 380    """
 381    _logger.info(
 382        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
 383    )
 384
 385    phen_path = Path(phen_dir)
 386    # check if directory already exists and ask user if they want to recreate it
 387    if (
 388        phen_path.exists() and phen_path.is_dir()
 389    ):  # Check if it exists and is a directory
 390        configure = _check_delete_dir(
 391            phen_path,
 392            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
 393        )
 394    else:
 395        configure = True
 396
 397    if not configure:
 398        _logger.info("Exiting, phenotype not initialised")
 399        return
 400
 401    try:
 402        # Clone repo
 403        git_url = _construct_git_url(upstream_url)
 404        repo = git.Repo.clone_from(git_url, phen_path)
 405
 406        # Fetch all branches and tags
 407        repo.remotes.origin.fetch()
 408
 409        # Check if the version exists
 410        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
 411        if upstream_version not in available_refs:
 412            raise ValueError(
 413                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
 414            )
 415
 416        # Checkout the specified version
 417        repo.git.checkout(upstream_version)
 418        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
 419        main_branch.checkout()
 420
 421        # Check if 'config.yml' exists in the root directory
 422        config_path = phen_path / CONFIG_FILE
 423        if not config_path.is_file():
 424            raise ValueError(
 425                f"The forked repository is not a valid ACMC repo because '{CONFIG_FILE}' is missing in the root directory."
 426            )
 427
 428        # Validate the phenotype is compatible with the acmc tool
 429        validate(str(phen_path.resolve()))
 430
 431        # Delete each tag locally
 432        tags = repo.tags
 433        for tag in tags:
 434            repo.delete_tag(tag)
 435            _logger.debug(f"Deleted tags from forked repo: {tag}")
 436
 437        # Add upstream remote
 438        repo.create_remote("upstream", upstream_url)
 439        remote = repo.remotes["origin"]
 440        repo.delete_remote(remote)  # Remove existing origin
 441
 442        # Optionally set a new origin remote
 443        if new_origin_url:
 444            git_url = _construct_git_url(new_origin_url)
 445            repo.create_remote("origin", git_url)
 446            repo.git.push("--set-upstream", "origin", "main")
 447
 448        _logger.info(f"Repository forked successfully at {phen_path}")
 449        _logger.info(f"Upstream set to {upstream_url}")
 450        if new_origin_url:
 451            _logger.info(f"Origin set to {new_origin_url}")
 452
 453    except Exception as e:
 454        if phen_path.exists():
 455            shutil.rmtree(phen_path)
 456        raise ValueError(f"Error occurred during repository fork: {str(e)}") from e
 457
 458
 459def validate(phen_dir: str):
 460    """Validates the phenotype directory is a git repo with standard structure"""
 461    _logger.info(f"Validating phenotype: {phen_dir}")
 462    phen_path = Path(phen_dir)
 463    if not phen_path.is_dir():
 464        raise NotADirectoryError(
 465            f"Error: '{str(phen_path.resolve())}' is not a directory"
 466        )
 467
 468    config_path = phen_path / CONFIG_FILE
 469    if not config_path.is_file():
 470        raise FileNotFoundError(
 471            f"Error: phen configuration file '{config_path}' does not exist."
 472        )
 473
 474    concepts_path = phen_path / CONCEPTS_DIR
 475    if not concepts_path.is_dir():
 476        raise FileNotFoundError(
 477            f"Error: source concepts directory {concepts_path} does not exist."
 478        )
 479
 480    # Validate the directory is a git repo
 481    try:
 482        git.Repo(phen_path)
 483    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
 484        raise Exception(f"Phen directory {phen_path} is not a git repo")
 485
 486    # Load configuration File
 487    if config_path.suffix == ".yml":
 488        try:
 489            with config_path.open("r") as file:
 490                phenotype = yaml.safe_load(file)
 491
 492            validator = Validator(CONFIG_SCHEMA)
 493            if validator.validate(phenotype):
 494                _logger.debug("YAML structure is valid.")
 495            else:
 496                _logger.error(f"YAML structure validation failed: {validator.errors}")
 497                raise Exception(f"YAML structure validation failed: {validator.errors}")
 498        except yaml.YAMLError as e:
 499            _logger.error(f"YAML syntax error: {e}")
 500            raise e
 501    else:
 502        raise Exception(
 503            f"Unsupported configuration filetype: {str(config_path.resolve())}"
 504        )
 505
 506    # initialise
 507    validation_errors = []
 508    phenotype = phenotype["phenotype"]
 509    code_types = parse.CodeTypeParser().code_types
 510
 511    # check the version number is of the format n.n.n
 512    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
 513    if not match:
 514        validation_errors.append(
 515            f"Invalid version format in configuration file: {phenotype['version']}"
 516        )
 517
 518    # create a list of all the concept set names defined in the concept set configuration
 519    concept_set_names = []
 520    for item in phenotype["concept_sets"]:
 521        if item["name"] in concept_set_names:
 522            validation_errors.append(
 523                f"Duplicate concept set defined in concept sets {item['name']}"
 524            )
 525        else:
 526            concept_set_names.append(item["name"])
 527
 528    # check codes definition
 529    for files in phenotype["concept_sets"]:
 530        for item in files["files"]:
 531            # check concept code file exists
 532            concept_code_file_path = concepts_path / item["path"]
 533            if not concept_code_file_path.exists():
 534                validation_errors.append(
 535                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
 536                )
 537
 538            # check concept code file is not empty (skipped if it does not exist)
 539            elif concept_code_file_path.stat().st_size == 0:
 540                validation_errors.append(
 541                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
 542                )
 543
 544            # check code file type is supported
 545            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
 546                raise ValueError(
 547                    f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
 548                )
 549
 550            # check columns specified are a supported medical coding type
 551            for column in item["columns"]:
 552                if column not in code_types:
 553                    validation_errors.append(
 554                        f"Column type {column} for file {concept_code_file_path} is not supported"
 555                    )
 556
 557            # check the actions are supported
 558            if "actions" in item:
 559                for action in item["actions"]:
 560                    if action not in COL_ACTIONS:
 561                        validation_errors.append(f"Action {action} is not supported")
 562
 563    if len(validation_errors) > 0:
 564        _logger.error(validation_errors)
 565        raise PhenValidationException(
 566            f"Configuration file {str(config_path.resolve())} failed validation",
 567            validation_errors,
 568        )
 569
 570    _logger.info(f"Phenotype validated successfully")
 571
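    # Example usage (illustrative): collect and print validation problems.
    #
    #   try:
    #       validate("./workspace/phen")
    #   except PhenValidationException as e:
    #       for err in e.validation_errors:
    #           print(err)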
 572
 573def _read_table_file(path: Path, excel_sheet: str = ""):
 574    """
 575    Load Code List File
 576    """
 577
 578    path = path.resolve()
 579    if path.suffix == ".csv":
 580        df = pd.read_csv(path, dtype=str)
 581    elif path.suffix == ".xlsx" or path.suffix == ".xls":
 582        if excel_sheet != "":
 583            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
 584        else:
 585            df = pd.read_excel(path, dtype=str)
 586    elif path.suffix == ".dta":
 587        df = pd.read_stata(path)
 588    else:
 589        raise ValueError(
 590            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
 591        )
 592
 593    return df
 594
 595
 596def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
 597    # Perform Structural Changes to file before preprocessing
 598    _logger.debug("Processing file structural actions")
 599    if (
 600        "actions" in concept_set
 601        and "split_col" in concept_set["actions"]
 602        and "codes_col" in concept_set["actions"]
 603    ):
 604        split_col = concept_set["actions"]["split_col"]
 605        codes_col = concept_set["actions"]["codes_col"]
 606        # logging does not accept print-style positional parts, so use %s formatting
 607        _logger.debug(
 608            "Action: Splitting %s column into: %s",
 609            split_col,
 610            df[split_col].unique(),
 611        )
 612        codes = df[codes_col]
 613        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
 614        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
 615        oh[oh == False] = np.nan  # replace 0s with None
 616        df = pd.concat([df, oh], axis=1)  # merge in new columns
 617
 618    return df
 619
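    # Illustrative sketch of the split_col/codes_col action, assuming columns
    # "type" (split_col) and "code" (codes_col):
    #
    #   code   type            code   type    read2  icd10
    #   C10..  read2     ->    C10..  read2   C10..  NaN
    #   E11    icd10           E11    icd10   NaN    E11
    #
    # one new column per value of the split column, filled with the code where
    # the row matched and NaN elsewhere.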
 620
 621def _preprocess_source_concepts(
 622    df: pd.DataFrame, concept_set: dict, code_file_path: Path
 623) -> Tuple[pd.DataFrame, list]:
 624    """Perform QA Checks on columns individually and append to df"""
 625    out = pd.DataFrame([])  # create output df to append to
 626    code_errors = []  # list of errors from processing
 627
 628    # remove unnamed columns due to extra commas, missing headers, or incorrect parsing
 629    df = df.drop(columns=[col for col in df.columns if "Unnamed" in col])
 630
 631    # Preprocess codes
 632    code_types = parse.CodeTypeParser().code_types
 633    for code_type in concept_set["columns"]:
 634        parser = code_types[code_type]
 635        _logger.info(f"Processing {code_type} codes for {code_file_path}")
 636
 637        # get codes by column name
 638        source_col_name = concept_set["columns"][code_type]
 639        codes = df[source_col_name].dropna()
 640        codes = codes.astype(str)  # convert to string
 641        codes = codes.str.strip()  # remove excess spaces
 642
 643        # process codes, validating them using parser and returning the errors
 644        codes, errors = parser.process(codes, code_file_path)
 645        if len(errors) > 0:
 646            code_errors.extend(errors)
 647            _logger.warning(f"Codes validation failed with {len(errors)} errors")
 648
 649        # add processed codes to df
 650        new_col_name = f"{source_col_name}_SOURCE"
 651        df = df.rename(columns={source_col_name: new_col_name})
 652        process_codes = pd.DataFrame({code_type: codes}).join(df)
 653        out = pd.concat(
 654            [out, process_codes],
 655            ignore_index=True,
 656        )
 657
 658    _logger.debug(out.head())
 659
 660    return out, code_errors
 661
 662
 663# Translate Df with multiple codes into single code type Series
 664def translate_codes(
 665    source_df: pd.DataFrame,
 666    target_code_type: str,
 667    concept_name: str,
 668    not_translate: bool,
 669    do_reverse_translate: bool,
 670) -> pd.DataFrame:
 671    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
 672
 674    codes = pd.DataFrame(
 675        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
 676    )
 677    # Convert codes to target type
 678    _logger.info(f"Converting to target code type {target_code_type}")
 679
 680    for source_code_type in source_df.columns:
 681        # if the target code type is the same as the source code type, no translation, just append the source as the target
 682        if source_code_type == target_code_type:
 683            copy_df = pd.DataFrame(
 684                {
 685                    "SOURCE_CONCEPT": source_df[source_code_type],
 686                    "SOURCE_CONCEPT_TYPE": source_code_type,
 687                    "CONCEPT": source_df[source_code_type],
 688                }
 689            )
 690            codes = pd.concat([codes, copy_df])
 691            _logger.debug(
 692                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
 693            )
 694        elif not not_translate:
 695            # get the translation filename using source to target code types
 696            filename = f"{source_code_type}_to_{target_code_type}.parquet"
 697            map_path = trud.PROCESSED_PATH / filename
 698
 699            filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
 700            map_path_reversed = trud.PROCESSED_PATH / filename_reversed
 701
 702            # do the mapping if it exists
 703            if map_path.exists():
 704                codes = _translate_codes(map_path, source_df, source_code_type, codes)
 705            # otherwise do reverse mapping if enabled and it exists
 706            elif do_reverse_translate and map_path_reversed.exists():
 707                codes = _translate_codes(
 708                    map_path_reversed, source_df, source_code_type, codes, reverse=True
 709                )
 710            else:
 711                _logger.warning(
 712                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
 713                )
 714
 715    codes = codes.dropna()  # delete NaNs
 716
 719    # added concept set type to output if any translations
 720    if len(codes.index) > 0:
 721        codes["CONCEPT_SET"] = concept_name
 722    else:
 724        _logger.debug(f"No codes converted with target code type {target_code_type}")
 725
 726    return codes
 727
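    # Example usage (illustrative; assumes the read2-to-snomed mapping file has
    # already been built in trud.PROCESSED_PATH):
    #
    #   source_df = pd.DataFrame({"read2": ["C10..", "C108."]})
    #   concept_set = translate_codes(
    #       source_df,
    #       target_code_type="snomed",
    #       concept_name="diabetes",
    #       not_translate=False,
    #       do_reverse_translate=False,
    #   )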
 728
 729def _translate_codes(map_path, source_df, source_code_type, codes, reverse=False) -> pd.DataFrame:
 730    # get mapping
 731    df_map = pd.read_parquet(map_path)
 732
 733    # do mapping; pd.merge joins on the shared source code type column, so the
 734    # same call handles both the forward and the reversed mapping file
 735    if not reverse:
 736        translated_df = pd.merge(source_df[source_code_type], df_map, how="left")
 737    else:
 738        # reversed mapping file: the target codes come from its remaining column
 739        translated_df = pd.merge(source_df[source_code_type], df_map, how="left")
 740
 741    # normalise the output
 742    translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
 743    translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
 745
 746    # add to list of codes
 747    codes = pd.concat([codes, translated_df])
 748
 749    return codes
 750
 751
 752def _write_code_errors(code_errors: list, code_errors_path: Path):
 753    err_df = pd.DataFrame(
 754        [
 755            {
 756                "CONCEPT": ", ".join(err.codes[~err.mask].tolist()),
 757                "VOCABULARY": err.code_type,
 758                "SOURCE": err.codes_file,
 759                "CAUSE": err.message,
 760            }
 761            for err in code_errors
 762            if err.mask is not None
 763        ]
 764    )
 765
 766    err_df = err_df.drop_duplicates()  # Remove Duplicates from Error file
 767    err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"])
 768    err_df.to_csv(code_errors_path, index=False, mode="w")
 769
 770
 771def write_vocab_version(phen_path: Path):
 772    # write the vocab version file
 773
 774    if not trud.VERSION_PATH.exists():
 775        raise FileNotFoundError(
 776            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
 777        )
 778
 779    if not omop.VERSION_PATH.exists():
 780        raise FileNotFoundError(
 781            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
 782        )
 783
 784    with trud.VERSION_PATH.open("r") as file:
 785        trud_version = yaml.safe_load(file)
 786
 787    with omop.VERSION_PATH.open("r") as file:
 788        omop_version = yaml.safe_load(file)
 789
 790    # Create the combined YAML structure
 791    version_data = {
 792        "versions": {
 793            "acmc": acmc.__version__,
 794            "trud": trud_version,
 795            "omop": omop_version,
 796        }
 797    }
 798
 799    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
 800        yaml.dump(
 801            version_data,
 802            file,
 803            Dumper=util.QuotedDumper,
 804            default_flow_style=False,
 805            sort_keys=False,
 806            default_style='"',
 807        )
 808
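    # The resulting vocab_version.yml has the shape (values illustrative):
    #
    #   versions:
    #     acmc: "0.0.1"
    #     trud: ...
    #     omop: ...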
 809
 810def map(
 811    phen_dir: str,
 812    target_code_type: str,
 813    not_translate: bool,
 814    no_metadata: bool,
 815    do_reverse_translate: bool,
 816):
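        """Maps the phenotype's source concept codes to the given target code type, or to every type in the configuration's map list when target_code_type is None"""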
 817    _logger.info(f"Processing phenotype: {phen_dir}")
 818
 819    # Validate configuration
 820    validate(phen_dir)
 821
 822    # initialise paths
 823    phen_path = Path(phen_dir)
 824    config_path = phen_path / CONFIG_FILE
 825
 826    # load configuration
 827    with config_path.open("r") as file:
 828        config = yaml.safe_load(file)
 829    phenotype = config["phenotype"]
 830
 831    if len(phenotype["map"]) == 0:
 832        raise ValueError(f"No map codes defined in the phenotype configuration")
 833
 834    if target_code_type is not None and target_code_type not in phenotype["map"]:
 835        raise ValueError(
 836            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
 837        )
 838
 839    if target_code_type is not None:
 840        _map_target_code_type(
 841            phen_path,
 842            phenotype,
 843            target_code_type,
 844            not_translate,
 845            no_metadata,
 846            do_reverse_translate,
 847        )
 848    else:
 849        for t in phenotype["map"]:
 850            _map_target_code_type(
 851                phen_path,
 852                phenotype,
 853                t,
 854                not_translate,
 855                no_metadata,
 856                do_reverse_translate,
 857            )
 858
 859    _logger.info(f"Phenotype processed successfully")
 860
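    # Example usage (illustrative): map to one target type, or pass
    # target_code_type=None to map to every type listed in the config.
    #
    #   map(
    #       "./workspace/phen",
    #       target_code_type="snomed",
    #       not_translate=False,
    #       no_metadata=False,
    #       do_reverse_translate=False,
    #   )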
 861
 862def _map_target_code_type(
 863    phen_path: Path,
 864    phenotype: dict,
 865    target_code_type: str,
 866    not_translate: bool,
 867    no_metadata: bool,
 868    do_reverse_translate: bool,
 869):
 870    _logger.debug(f"Target coding format: {target_code_type}")
 871    concepts_path = phen_path / CONCEPTS_DIR
 872    # Create output dataframe
 873    out = pd.DataFrame([])
 874    code_errors = []
 875
 876    # Process each folder in codes section
 877    for files in phenotype["concept_sets"]:
 878        concept_set_name = files["name"]
 879        if "metadata" in files:
 880            concept_set_metadata = files["metadata"]
 881        else:
 882            concept_set_metadata = {}
 883        for concept_set in files["files"]:
 884            _logger.debug(f"--- {concept_set} ---")
 885
 886            # Load code file
 887            codes_file_path = concepts_path / concept_set["path"]
 888            df = _read_table_file(codes_file_path)
 889
 890            # process structural actions
 891            df = _process_actions(df, concept_set)
 892
 893            # preprocessing and validate of source concepts
 894            _logger.debug("Processing and validating source concept codes")
 895            df, errors = _preprocess_source_concepts(
 896                df,
 897                concept_set,
 898                codes_file_path,
 899            )
 900
 901            # create df with just the source code columns
 902            source_column_names = list(concept_set["columns"].keys())
 903            source_df = df[source_column_names]
 904
 905            _logger.debug(source_df.columns)
 906            _logger.debug(source_df.head())
 907
 908            _logger.debug(
 909                f"Length of errors from _preprocess_source_concepts {len(errors)}"
 910            )
 911            if len(errors) > 0:
 912                code_errors.extend(errors)
 913            _logger.debug(f"Length of code_errors {len(code_errors)}")
 914
 915            # Map source concepts codes to target codes
 916            # if processing a source coding list with categorical data
 917            if (
 918                "actions" in concept_set
 919                and "divide_col" in concept_set["actions"]
 920                and len(df) > 0
 921            ):
 922                divide_col = concept_set["actions"]["divide_col"]
 923                _logger.debug(f"Action: Dividing Table by {divide_col}")
 924                _logger.debug(f"column into: {df[divide_col].unique()}")
 925                df_grp = df.groupby(divide_col)
 926                for cat, grp in df_grp:
 927                    if cat == concept_set["category"]:
 928                        grp = grp.drop(
 929                            columns=[divide_col]
 930                        )  # delete categorical column
 931                        source_df = grp[source_column_names]
 932                        trans_out = translate_codes(
 933                            source_df,
 934                            target_code_type=target_code_type,
 935                            concept_name=concept_set_name,
 936                            not_translate=not_translate,
 937                            do_reverse_translate=do_reverse_translate,
 938                        )
 939                        trans_out = add_metadata(
 940                            codes=trans_out,
 941                            metadata=concept_set_metadata,
 942                            no_metadata=no_metadata,
 943                        )
 944                        out = pd.concat([out, trans_out])
 945            else:
 946                source_df = df[source_column_names]
 947                trans_out = translate_codes(
 948                    source_df,
 949                    target_code_type=target_code_type,
 950                    concept_name=concept_set_name,
 951                    not_translate=not_translate,
 952                    do_reverse_translate=do_reverse_translate,
 953                )
 954                trans_out = add_metadata(
 955                    codes=trans_out,
 956                    metadata=concept_set_metadata,
 957                    no_metadata=no_metadata,
 958                )
 959                out = pd.concat([out, trans_out])
 960
 961    if len(code_errors) > 0:
 962        _logger.error(f"The map processing has {len(code_errors)} errors")
 963        error_path = phen_path / MAP_DIR / "errors"
 964        error_path.mkdir(parents=True, exist_ok=True)
 965        error_filename = f"{target_code_type}-code-errors.csv"
 966        _write_code_errors(code_errors, error_path / error_filename)
 967
 968    # Check there is output from processing
 969    if len(out.index) == 0:
 970        _logger.error(f"No output after map processing")
 971        raise Exception(
 972            f"No output after map processing, check config {str(phen_path.resolve())}"
 973        )
 974
 975    # final processing
 976    out = out.reset_index(drop=True)
 977    out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
 978    out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"])
 979
1021    # Save output to map directory
1022    output_filename = target_code_type + ".csv"
1023    map_path = phen_path / MAP_DIR / output_filename
1024    out.to_csv(map_path, index=False)
1025    _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
1026
1027    # save concept sets as separate files
1028    concept_set_path = phen_path / CSV_PATH / target_code_type
1029
 1030    # empty the concept-set directory except for hidden files, e.g. .git
1031    if concept_set_path.exists():
1032        for item in concept_set_path.iterdir():
1033            if not item.name.startswith("."):
1034                item.unlink()
1035    else:
1036        concept_set_path.mkdir(parents=True, exist_ok=True)
1037
1038    # write each concept as a separate file
1039    for name, concept in out.groupby("CONCEPT_SET"):
1040        concept = concept.sort_values(by="CONCEPT")  # sort rows
1041        concept = concept.dropna(how="all", axis=1)  # remove empty cols
1042        concept = concept.reindex(
1043            sorted(concept.columns), axis=1
1044        )  # sort cols alphabetically
1045        filename = f"{name}.csv"
1046        concept_path = concept_set_path / filename
1047        concept.to_csv(concept_path, index=False)
1048
1049    write_vocab_version(phen_path)
1050
1051    _logger.info(f"Phenotype processed target code type {target_code_type}")
1052
1053
1054# Add metadata dict to each row of the codes DataFrame
1055def add_metadata(
1056    codes: pd.DataFrame,
1057    metadata: dict,
1058    no_metadata: bool,
1059) -> pd.DataFrame:
1060    """Add concept set metadata, stored as a dictionary, to each concept row"""
1061
1062    if not no_metadata:
1063        for meta_name, meta_value in metadata.items():
1064            codes[meta_name] = meta_value
1065            _logger.debug(
1066                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
1067            )
1068
1069    return codes
1070
1071
1072def _generate_version_tag(
1073    repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False
1074) -> str:
1075    # Get all valid semantic version tags
1076    versions = []
1077    for tag in repo.tags:
1078        if tag.name.startswith("v"):
1079            tag_name = tag.name[1:]  # Remove the 'v' prefix
1080        else:
1081            tag_name = tag.name
1082        if semver.Version.is_valid(tag_name):
1083            versions.append(semver.Version.parse(tag_name))
1084
1085    _logger.debug(f"Versions: {versions}")
1086    # Determine the next version
1087    if not versions:
1088        new_version = semver.Version(0, 0, 1)
1089    else:
1090        latest_version = max(versions)
1091        if increment == "major":
1092            new_version = latest_version.bump_major()
1093        elif increment == "minor":
1094            new_version = latest_version.bump_minor()
1095        else:
1096            new_version = latest_version.bump_patch()
1097
1098    # Create the new tag
1099    new_version_str = f"v{new_version}" if use_v_prefix else str(new_version)
1100
1101    return new_version_str
1102
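    # Illustrative behaviour: with existing tags ["v0.1.0", "0.1.1"] and
    # increment="minor" the next tag is "0.2.0" ("v0.2.0" with
    # use_v_prefix=True); with no valid semver tags it starts at "0.0.1".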
1103
1104def publish(
1105    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1106):
1107    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1108
1109    # Validate config
1110    validate(phen_dir)
1111    phen_path = Path(phen_dir)
1112
1113    # load git repo and set the branch
1114    repo = git.Repo(phen_path)
1115    if DEFAULT_GIT_BRANCH in repo.branches:
1116        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1117        main_branch.checkout()
1118    else:
1119        raise AttributeError(
1120            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1121        )
1122
1123    # check if any changes to publish
1124    if not repo.is_dirty() and not repo.untracked_files:
1125        if remote_url is not None and "origin" not in repo.remotes:
1126            _logger.info(f"First publish to remote url {remote_url}")
1127        else:
1128            _logger.info("Nothing to publish, no changes to the repo")
1129            return
1130
1131    # get next version
1132    new_version_str = _generate_version_tag(repo, increment)
1133    _logger.info(f"New version: {new_version_str}")
1134
1135    # Write version in configuration file
1136    config_path = phen_path / CONFIG_FILE
1137    with config_path.open("r") as file:
1138        config = yaml.safe_load(file)
1139
1140    config["phenotype"]["version"] = new_version_str
1141    with open(config_path, "w") as file:
1142        yaml.dump(
1143            config,
1144            file,
1145            Dumper=util.QuotedDumper,
1146            default_flow_style=False,
1147            sort_keys=False,
1148            default_style='"',
1149        )
1150
1151    # Add and commit changes to repo including version updates
1152    commit_message = f"Committing updates to phenotype {phen_path}"
1153    repo.git.add("--all")
1154    repo.index.commit(commit_message)
1155
1156    # Add tag to the repo
1157    repo.create_tag(new_version_str)
1158
1159    # push to origin if a remote repo
1160    if remote_url is not None and "origin" not in repo.remotes:
1161        git_url = _construct_git_url(remote_url)
1162        repo.create_remote("origin", git_url)
1163
1164    try:
1165        if "origin" in repo.remotes:
1166            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1167            origin = repo.remotes.origin
1168            _logger.info(f"Pushing main branch to remote repo")
1169            repo.git.push("--set-upstream", "origin", "main")
1170            _logger.info(f"Pushing version tags to remote git repo")
1171            origin.push(tags=True)
1172            _logger.debug("Changes pushed to 'origin'")
1173        else:
1174            _logger.debug("Remote 'origin' is not set")
1175    except Exception as e:
1176        tag_ref = repo.tags[new_version_str]
1177        repo.delete_tag(tag_ref)
1178        repo.git.reset("--soft", "HEAD~1")
1179        raise e
1180
1181    _logger.info(f"Phenotype published successfully")
1182
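    # Example usage (illustrative): commit, tag and push the next patch version;
    # remote_url may be None when the repo already has, or needs, no remote origin.
    #
    #   publish("./workspace/phen", "Updated diabetes codes", None)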
1183
1184def export(phen_dir: str, version: str):
1185    """Exports a phen repo at a specific tagged version into a target directory"""
1186    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1187
1188    # validate configuration
1189    validate(phen_dir)
1190    phen_path = Path(phen_dir)
1191
1192    # load configuration
1193    config_path = phen_path / CONFIG_FILE
1194    with config_path.open("r") as file:
1195        config = yaml.safe_load(file)
1196
1197    map_path = phen_path / MAP_DIR
1198    if not map_path.exists():
1199        _logger.warning(f"Map path does not exist '{map_path}'")
1200
1201    export_path = phen_path / OMOP_PATH
1202    # check export directory exists and if not create it
1203    if not export_path.exists():
1204        export_path.mkdir(parents=True)
1205        _logger.debug(f"OMOP export directory '{export_path}' created.")
1206
1207    # omop export db
1208    export_db_path = omop.export(
1209        map_path,
1210        export_path,
1211        config["phenotype"]["version"],
1212        config["phenotype"]["omop"],
1213    )
1214
1215    # write to tables
1216    # export as csv
1217    _logger.info(f"Phenotype exported successfully")
1218
1219
1220def copy(phen_dir: str, target_dir: str, version: str):
1221    """Copys a phen repo at a specific tagged version into a target directory"""
1222
1223    # Validate
1224    validate(phen_dir)
1225    phen_path = Path(phen_dir)
1226
1227    # Check target directory exists
1228    target_path = Path(target_dir)
1229    if not target_path.exists():
1230        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1231
1232    # Set copy directory
1233    copy_path = target_path / version
1234    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1235
1236    if (
1237        copy_path.exists() and copy_path.is_dir()
1238    ):  # Check if it exists and is a directory
1239        copy = _check_delete_dir(
1240            copy_path,
1241            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1242        )
1243    else:
1244        copy = True
1245
1246    if not copy:
1247        _logger.info(f"Not copying the version {version}")
1248        return
1249
1250    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1251    repo = git.Repo.clone_from(phen_path, copy_path)
1252
1253    # Check out the latest commit or specified version
1254    if version:
1255        # Checkout a specific version (e.g., branch, tag, or commit hash)
1256        _logger.info(f"Checking out version {version}...")
1257        repo.git.checkout(version)
1258    else:
1259        # Checkout the latest commit (HEAD)
1260        _logger.info(f"Checking out the latest commit...")
1261        repo.git.checkout("HEAD")
1262
1263    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1264
1265    _logger.info(f"Phenotype copied successfully")
1266
1267
1268# Convert concept_sets list into dictionaries
1269def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1270    """Extracts concepts as {name: file_path} dictionary and a name set."""
1271    concepts_dict = {
1272        item["name"]: [file["path"] for file in item["files"]]
1273        for item in config_data["phenotype"]["concept_sets"]
1274    }
1275    name_set = set(concepts_dict.keys())
1276    return concepts_dict, name_set
1277
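    # Illustrative behaviour:
    #
    #   config = {"phenotype": {"concept_sets": [
    #       {"name": "diabetes", "files": [{"path": "diabetes.csv"}]}]}}
    #   extract_concepts(config)
    #   # -> ({"diabetes": ["diabetes.csv"]}, {"diabetes"})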
1278
1279def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]:
1280    """
1281    Extracts clean keys from a DeepDiff dictionary.
1282
1283    :param diff: DeepDiff result dictionary
1284    :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
1285    :return: A set of clean key names
1286    """
1287    return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
1288
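    # Illustrative behaviour: a DeepDiff key such as "root['diabetes']" under
    # "dictionary_item_added" is reduced to the clean name "diabetes".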
1289
1290def diff_config(old_config: dict, new_config: dict) -> str:
1291    report = f"\n# Changes to phenotype configuration\n"
1292    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1293
1294    old_concepts, old_names = extract_concepts(old_config)
1295    new_concepts, new_names = extract_concepts(new_config)
1296
1297    # Check added and removed concept set names
1298    added_names = new_names - old_names  # Names that appear in new but not in old
1299    removed_names = old_names - new_names  # Names that were in old but not in new
1300
1301    # find file path changes for unchanged names
1302    unchanged_names = old_names & new_names  # Names that exist in both
1303    file_diff = DeepDiff(
1304        {name: old_concepts[name] for name in unchanged_names},
1305        {name: new_concepts[name] for name in unchanged_names},
1306    )
1307
1308    # Find renamed concepts (same file, different name)
1309    renamed_concepts = []
1310    for removed in removed_names:
1311        old_path = old_concepts[removed]
1312        for added in added_names:
1313            new_path = new_concepts[added]
1314            if old_path == new_path:
1315                renamed_concepts.append((removed, added))
1316
1317    # Remove renamed concepts from added and removed sets
1318    for old_name, new_name in renamed_concepts:
1319        added_names.discard(new_name)
1320        removed_names.discard(old_name)
1321
1322    # generate config report
1323    if added_names:
1324        report += "## Added Concepts\n"
1325        for name in added_names:
1326            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1327        report += "\n"
1328
1329    if removed_names:
1330        report += "## Removed Concepts\n"
1331        for name in removed_names:
1332            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1333        report += "\n"
1334
1335    if renamed_concepts:
1336        report += "## Renamed Concepts\n"
1337        for old_name, new_name in renamed_concepts:
1338            report += (
1339                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1340            )
1341        report += "\n"
1342
1343    if "values_changed" in file_diff:
1344        report += "## Updated File Paths\n"
1345        for name, change in file_diff["values_changed"].items():
1346            old_file = change["old_value"]
1347            new_file = change["new_value"]
1348            clean_name = name.split("root['")[1].split("']")[0]
1349            report += (
1350                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1351            )
1352        report += "\n"
1353
1354    if not (
1355        added_names
1356        or removed_names
1357        or renamed_concepts
1358        or file_diff.get("values_changed")
1359    ):
1360        report += "No changes in concept sets.\n"
1361
1362    return report
1363
1364
1365def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1366    old_output_files = [
1367        file.name
1368        for file in old_map_path.iterdir()
1369        if file.is_file() and not file.name.startswith(".")
1370    ]
1371    new_output_files = [
1372        file.name
1373        for file in new_map_path.iterdir()
1374        if file.is_file() and not file.name.startswith(".")
1375    ]
1376
1377    # Convert the lists to sets for easy comparison
1378    old_output_set = set(old_output_files)
1379    new_output_set = set(new_output_files)
1380
1381    # Outputs that are in old_output_set but not in new_output_set (removed files)
1382    removed_outputs = old_output_set - new_output_set
1383    # Outputs that are in new_output_set but not in old_output_set (added files)
1384    added_outputs = new_output_set - old_output_set
1385    # Outputs that are the intersection of old_output_set and new_output_set
1386    common_outputs = old_output_set & new_output_set
1387
1388    report = f"\n# Changes to available translations\n"
1389    report += f"This compares the coding translations files available.\n\n"
1390    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1391    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1392    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1393
1394    # Compare common outputs between versions
1395    report += f"# Changes to concepts in translation files\n\n"
1396    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yml if the translations have not been run for the current config.\n\n"
1397    for file in common_outputs:
1398        old_output = old_map_path / file
1399        new_output = new_map_path / file
1400
1401        _logger.debug(f"Old output: {str(old_output.resolve())}")
1402        _logger.debug(f"New output: {str(new_output.resolve())}")
1403
1404        df1 = pd.read_csv(old_output)
1405        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1406        df2 = pd.read_csv(new_output)
1407        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1408
1409        # Check for added and removed concepts
1410        report += f"- File {file}\n"
1411        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1412        report += f"- Removed concepts {sorted_list}\n"
1413        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1414        report += f"- Added concepts {sorted_list}\n"
1415
1416        # Check for changed concepts
1417        diff = df2 - df1  # diff in counts
1418        diff = diff[
1419            (diff["CONCEPT"] != 0.0) & diff["CONCEPT"].notna()
1420        ]  # get non-zero counts
1421        s = "\n"
1422        if len(diff.index) > 0:
1423            for concept, row in diff.iterrows():
1424                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1425            report += f"- Changed concepts {s}\n\n"
1426        else:
1427            report += f"- Changed concepts []\n\n"
1428
1429    return report
1430
1431
1432def diff_phen(
1433    new_phen_path: Path,
1434    new_version: str,
1435    old_phen_path: Path,
1436    old_version: str,
1437    report_path: Path,
1438    not_check_config: bool,
1439):
1440    """Compare the differences between two versions of a phenotype"""
1441
1442    # write report heading
1443    report = f"# Phenotype Comparison Report\n"
1444
1445    # Step 1: check differences in the configuration files
1446    if not not_check_config:
1447        # validate phenotypes
1448        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1449        validate(str(old_phen_path.resolve()))
1450        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1451        validate(str(new_phen_path.resolve()))
1452
1453        # get old and new config
1454        old_config_path = old_phen_path / CONFIG_FILE
1455        with old_config_path.open("r") as file:
1456            old_config = yaml.safe_load(file)
1457        new_config_path = new_phen_path / CONFIG_FILE
1458        with new_config_path.open("r") as file:
1459            new_config = yaml.safe_load(file)
1460
1461        # write report
1462        report += f"## Original phenotype\n"
1463        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1464        report += f"  - {old_version}\n"
1465        report += f"  - {str(old_phen_path.resolve())}\n"
1466        report += f"## Changed phenotype:\n"
1467        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1468        report += f"  - {new_version}\n"
1469        report += f"  - {str(new_phen_path.resolve())}\n"
1470
1471        # compare the old and new configurations
1472        report += diff_config(old_config, new_config)
1473
1474    # Step 2: check differences between map files
1475    # List files from output directories
1476    old_map_path = old_phen_path / MAP_DIR
1477    new_map_path = new_phen_path / MAP_DIR
1478    report += diff_map_files(old_map_path, new_map_path)
1479
1480    # write the report file
1481    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1482    with open(report_path, "w") as report_file:
1483        # the context manager closes the file even if the write fails
1484        report_file.write(report)
1485
1486    _logger.info(f"Phenotypes diff'd successfully")
1487
1488
1489def diff(
1490    phen_dir: str,
1491    version: str,
1492    old_phen_dir: str,
1493    old_version: str,
1494    not_check_config: bool,
1495):
1496    """Compares two phenotype versions by checking them out into a temporary .acmc directory and writing a markdown diff report"""
1497    timestamp = time.strftime("%Y%m%d_%H%M%S")
1498    temp_dir = Path(f".acmc/diff_{timestamp}")
1499
1500    changed_phen_path = Path(phen_dir)
1501    if not changed_phen_path.exists():
1502        raise ValueError(
1503            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1504        )
1505
1506    old_phen_path = Path(old_phen_dir)
1507    if not old_phen_path.exists():
1508        raise ValueError(
1509            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1510        )
1511
1516    try:
1517        # Create the directory
1518        temp_dir.mkdir(parents=True, exist_ok=True)
1519        _logger.debug(f"Temporary directory created: {temp_dir}")
1520
1521        # Create temporary directories
1522        changed_path = temp_dir / "changed"
1523        changed_path.mkdir(parents=True, exist_ok=True)
1524        old_path = temp_dir / "old"
1525        old_path.mkdir(parents=True, exist_ok=True)
1526
1527        # checkout changed
1528        if version == "latest":
1529            _logger.debug(
1530                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1531            )
1532            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1533        else:
1534            _logger.debug(
1535                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1536            )
1537            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1538            changed_repo.git.checkout(version)
1539
1540        # checkout old
1541        if old_version == "latest":
1542            _logger.debug(
1543                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1544            )
1545            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1546        else:
1547            _logger.debug(
1548                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1549            )
1550            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1551            old_repo.git.checkout(old_version)
1552
1553        report_filename = f"{version}_{old_version}_diff.md"
1554        report_path = changed_phen_path / report_filename
1555        # diff old with new
1556        diff_phen(
1557            changed_path, version, old_path, old_version, report_path, not_check_config
1558        )
1559
1560    finally:
1561        # clean up tmp directory
1562        if temp_dir.exists():
1563            shutil.rmtree(temp_dir)
PHEN_DIR = 'phen'

Default phenotype directory name

DEFAULT_PHEN_PATH = PosixPath('workspace/phen')

Default phenotype directory path

CONCEPTS_DIR = 'concepts'

Default concepts directory name

MAP_DIR = 'map'

Default map directory name

CONCEPT_SET_DIR = 'concept-sets'

Default concept set directory name

CSV_PATH = PosixPath('concept-sets/csv')

Default CSV concept set directory path

OMOP_PATH = PosixPath('concept-sets/omop')

Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = ['concepts', 'map', 'concept-sets']

List of default phenotype directories

CONFIG_FILE = 'config.yml'

Default configuration filename

VOCAB_VERSION_FILE = 'vocab_version.yml'

Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ['major', 'minor', 'patch']

List of semantic version increment types

DEFAULT_VERSION_INC = 'patch'

Default semantic version increment type

DEFAULT_GIT_BRANCH = 'main'

Default phenotype repo branch name

SPLIT_COL_ACTION = 'split_col'

Split column preprocessing action type

CODES_COL_ACTION = 'codes_col'

Codes column preprocessing action type

DIVIDE_COL_ACTION = 'divide_col'

Divide column preprocessing action type

COL_ACTIONS = ['split_col', 'codes_col', 'divide_col']

List of column preprocessing action types

CODE_FILE_TYPES = ['.xlsx', '.xls', '.csv']

List of supported source concept coding list file types

CONFIG_SCHEMA = {
    'phenotype': {
        'type': 'dict',
        'required': True,
        'schema': {
            'version': {
                'type': 'string',
                'required': True,
                'regex': '^\\d+\\.\\d+\\.\\d+$',
            },
            'omop': {
                'type': 'dict',
                'required': True,
                'schema': {
                    'vocabulary_id': {'type': 'string', 'required': True},
                    'vocabulary_name': {'type': 'string', 'required': True},
                    'vocabulary_reference': {
                        'type': 'string',
                        'required': True,
                        'regex': '^https?://.*',
                    },
                },
            },
            'map': {
                'type': 'list',
                'schema': {
                    'type': 'string',
                    'allowed': ['opcs4', 'atc', 'snomed', 'icd10', 'read2', 'read3'],
                },
            },
            'concept_sets': {
                'type': 'list',
                'required': True,
                'schema': {
                    'type': 'dict',
                    'schema': {
                        'name': {'type': 'string', 'required': True},
                        'files': {
                            'type': 'list',
                            'required': True,
                            'schema': {
                                'type': 'dict',
                                'schema': {
                                    'path': {'type': 'string', 'required': True},
                                    'columns': {'type': 'dict', 'required': True},
                                    'category': {'type': 'string'},
                                    'actions': {
                                        'type': 'dict',
                                        'schema': {
                                            'divide_col': {'type': 'string'},
                                            'split_col': {'type': 'string'},
                                            'codes_col': {'type': 'string'},
                                        },
                                    },
                                },
                            },
                        },
                        'metadata': {'type': 'dict', 'required': False},
                    },
                },
            },
        },
    },
}

Phenotype config.yml schema definition

class PhenValidationException(builtins.Exception):
163class PhenValidationException(Exception):
164    """Custom exception class raised when validation errors in phenotype configuration file"""
165
166    def __init__(self, message, validation_errors=None):
167        super().__init__(message)
168        self.validation_errors = validation_errors

Custom exception raised when there are validation errors in the phenotype configuration file

PhenValidationException(message, validation_errors=None)
166    def __init__(self, message, validation_errors=None):
167        super().__init__(message)
168        self.validation_errors = validation_errors
validation_errors
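
A minimal sketch of handling this exception when validating a phenotype (the workspace/phen path is a hypothetical example):

    from acmc import phen

    try:
        phen.validate("workspace/phen")
    except phen.PhenValidationException as e:
        # validation_errors holds the individual failures, if any were collected
        for err in e.validation_errors or []:
            print(err)
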
def init(phen_dir: str, remote_url: str):
237def init(phen_dir: str, remote_url: str):
238    """Initial phenotype directory as git repo with standard structure"""
239    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
240    phen_path = Path(phen_dir)
241
242    # check if directory already exists and ask user if they want to recreate it
243    if (
244        phen_path.exists() and phen_path.is_dir()
245    ):  # Check if it exists and is a directory
246        configure = _check_delete_dir(
247            phen_path,
248            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
249        )
250    else:
251        configure = True
252
253    if not configure:
254        _logger.info(f"Exiting, phenotype not initiatised")
255        return
256
257    # Initialise repo from local or remote
258    repo: Repo
259
260    # if remote then clone the repo otherwise init a local repo
261    if remote_url is not None:
262        # add PAT token to the URL
263        git_url = _construct_git_url(remote_url)
264
265        # clone the repo
266        git_cmd = git.cmd.Git()
267        git_cmd.clone(git_url, phen_path)
268
269        # open repo
270        repo = Repo(phen_path)
271        # check if there are any commits (new repo has no commits)
272        if (
273            len(repo.branches) == 0 or repo.head.is_detached
274        ):  # Handle detached HEAD (e.g., after init)
275            _logger.debug("The phen repository has no commits yet.")
276            commit_count = 0
277        else:
278            # Get the total number of commits in the default branch
279            commit_count = sum(1 for _ in repo.iter_commits())
280            _logger.debug(f"Repo has previous commits: {commit_count}")
281    else:
282        # local repo, create the directories and init
283        phen_path.mkdir(parents=True, exist_ok=True)
284        _logger.debug(f"Phen directory '{phen_path}' has been created.")
285        repo = git.Repo.init(phen_path)
286        commit_count = 0
287
288    phen_path = phen_path.resolve()
289    # initialise empty repos
290    if commit_count == 0:
291        # create initial commit
292        initial_file_path = phen_path / "README.md"
293        with open(initial_file_path, "w") as file:
294            file.write(
295                "# Initial commit\nThis is the first commit in the phen repository.\n"
296            )
297        repo.index.add([initial_file_path])
298        repo.index.commit("Initial commit")
299        commit_count = 1
300
301    # Checkout the phen's default branch, creating it if it does not exist
302    if DEFAULT_GIT_BRANCH in repo.branches:
303        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
304        main_branch.checkout()
305    else:
306        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
307        main_branch.checkout()
308
309    # if the phen path does not contain the config file then initialise the phenotype
310    config_path = phen_path / CONFIG_FILE
311    if config_path.exists():
312        _logger.debug(f"Phenotype configuration files already exist")
313        return
314
315    _logger.info("Creating phen directory structure and config files")
316    for d in DEFAULT_PHEN_DIR_LIST:
317        _create_empty_git_dir(phen_path / d)
318
319    # create empty phen config file
320    config = {
321        "phenotype": {
322            "version": "0.0.0",
323            "omop": {
324                "vocabulary_id": "",
325                "vocabulary_name": "",
326                "vocabulary_reference": "",
327            },
328            "translate": [],
329            "concept_sets": [],
330        }
331    }
332
333    with open(phen_path / CONFIG_FILE, "w") as file:
334        yaml.dump(
335            config,
336            file,
337            Dumper=util.QuotedDumper,
338            default_flow_style=False,
339            sort_keys=False,
340            default_style='"',
341        )
342
343    # add git ignore
344    ignore_content = """# Ignore SQLite database files
345*.db
346*.sqlite3
347 
348# Ignore SQLite journal and metadata files
349*.db-journal
350*.sqlite3-journal
351
352# python
353.ipynb_checkpoints
354 """
355    ignore_path = phen_path / ".gitignore"
356    with open(ignore_path, "w") as file:
357        file.write(ignore_content)
358
359    # add to git repo and commit
360    for d in DEFAULT_PHEN_DIR_LIST:
361        repo.git.add(phen_path / d)
362    repo.git.add(all=True)
363    repo.index.commit("initialised the phen git repo.")
364
365    _logger.info(f"Phenotype initialised successfully")

Initialise a phenotype directory as a git repo with the standard structure

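A minimal usage sketch (the directory path is a hypothetical example; pass a remote URL instead of None to clone an existing repo):

    from acmc import phen

    # create a local phenotype repo with the standard directory structure
    phen.init(phen_dir="workspace/phen", remote_url=None)
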
def fork( phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
368def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
369    """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin"
370
371    Args:
372        phen_dir (str): local directory path where the upstream repo is to be cloned
373        upstream_url (str): url to the upstream repo
374        upstream_version (str): version in the upstream repo to clone
375        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
376
377    Raises:
378        ValueError: if the specified version is not in the upstream repo
379        ValueError: if the upstream repo is not a valid phenotype repo
380        ValueError: if there are any other problems with Git
381    """
382    _logger.info(
383        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
384    )
385
386    phen_path = Path(phen_dir)
387    # check if directory already exists and ask user if they want to recreate it
388    if (
389        phen_path.exists() and phen_path.is_dir()
390    ):  # Check if it exists and is a directory
391        configure = _check_delete_dir(
392            phen_path,
393            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
394        )
395    else:
396        configure = True
397
398    if not configure:
399        _logger.info(f"Exiting, phenotype not initiatised")
400        return
401
402    try:
403        # Clone repo
404        git_url = _construct_git_url(upstream_url)
405        repo = git.Repo.clone_from(git_url, phen_path)
406
407        # Fetch all branches and tags
408        repo.remotes.origin.fetch()
409
410        # Check if the version exists
411        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
412        if upstream_version not in available_refs:
413            raise ValueError(
414                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
415            )
416
417        # Checkout the specified version
418        repo.git.checkout(upstream_version)
419        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
420        main_branch.checkout()
421
422        # Check if 'config.yml' exists in the root directory
423        config_path = phen_path / "config.yml"
424        if not os.path.isfile(config_path):
425            raise ValueError(
426                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
427            )
428
429        # Validate the phenotype is compatible with the acmc tool
430        validate(str(phen_path.resolve()))
431
432        # Delete each tag locally
433        tags = repo.tags
434        for tag in tags:
435            repo.delete_tag(tag)
436            _logger.debug(f"Deleted tags from forked repo: {tag}")
437
438        # Add upstream remote
439        repo.create_remote("upstream", upstream_url)
440        remote = repo.remotes["origin"]
441        repo.delete_remote(remote)  # Remove existing origin
442
443        # Optionally set a new origin remote
444        if new_origin_url:
445            git_url = _construct_git_url(new_origin_url)
446            repo.create_remote("origin", git_url)
447            repo.git.push("--set-upstream", "origin", "main")
448
449        _logger.info(f"Repository forked successfully at {phen_path}")
450        _logger.info(f"Upstream set to {upstream_url}")
451        if new_origin_url:
452            _logger.info(f"Origin set to {new_origin_url}")
453
454    except Exception as e:
455        if phen_path.exists():
456            shutil.rmtree(phen_path)
457        raise ValueError(f"Error occurred during repository fork: {str(e)}")

Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin.

Arguments:
  • phen_dir (str): local directory path where the upstream repo is to be cloned
  • upstream_url (str): url to the upstream repo
  • upstream_version (str): version in the upstream repo to clone
  • new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
  • ValueError: if the specified version is not in the upstream repo
  • ValueError: if the upstream repo is not a valid phenotype repo
  • ValueError: if there are any other problems with Git
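
A minimal usage sketch (the URLs and version tag below are hypothetical):

    from acmc import phen

    phen.fork(
        phen_dir="workspace/forked-phen",
        upstream_url="https://github.com/example/phen-repo.git",
        upstream_version="v1.0.0",
        new_origin_url=None,  # or a URL to push the fork to a new origin
    )
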
def validate(phen_dir: str):
460def validate(phen_dir: str):
461    """Validates the phenotype directory is a git repo with standard structure"""
462    _logger.info(f"Validating phenotype: {phen_dir}")
463    phen_path = Path(phen_dir)
464    if not phen_path.is_dir():
465        raise NotADirectoryError(
466            f"Error: '{str(phen_path.resolve())}' is not a directory"
467        )
468
469    config_path = phen_path / CONFIG_FILE
470    if not config_path.is_file():
471        raise FileNotFoundError(
472            f"Error: phen configuration file '{config_path}' does not exist."
473        )
474
475    concepts_path = phen_path / CONCEPTS_DIR
476    if not concepts_path.is_dir():
477        raise FileNotFoundError(
478            f"Error: source concepts directory {concepts_path} does not exist."
479        )
480
481    # Validate that the directory is a git repo
482    try:
483        git.Repo(phen_path)
484    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
485        raise Exception(f"Phen directory {phen_path} is not a git repo")
486
487    # Load configuration File
488    if config_path.suffix == ".yml":
489        try:
490            with config_path.open("r") as file:
491                phenotype = yaml.safe_load(file)
492
493            validator = Validator(CONFIG_SCHEMA)
494            if validator.validate(phenotype):
495                _logger.debug("YAML structure is valid.")
496            else:
497                _logger.error(f"YAML structure validation failed: {validator.errors}")
498                raise Exception(f"YAML structure validation failed: {validator.errors}")
499        except yaml.YAMLError as e:
500            _logger.error(f"YAML syntax error: {e}")
501            raise e
502    else:
503        raise Exception(
504            f"Unsupported configuration filetype: {str(config_path.resolve())}"
505        )
506
507    # initialise validation state
508    validation_errors = []
509    phenotype = phenotype["phenotype"]
510    code_types = parse.CodeTypeParser().code_types
511
512    # check the version number is of the format n.n.n
513    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
514    if not match:
515        validation_errors.append(
516            f"Invalid version format in configuration file: {phenotype['version']}"
517        )
518
519    # create a list of all the concept set names defined in the concept set configuration
520    concept_set_names = []
521    for item in phenotype["concept_sets"]:
522        if item["name"] in concept_set_names:
523            validation_errors.append(
524                f"Duplicate concept set defined in concept sets {item['name']}"
525            )
526        else:
527            concept_set_names.append(item["name"])
528
529    # check codes definition
530    for files in phenotype["concept_sets"]:
531        for item in files["files"]:
532            # check concept code file exists
533            concept_code_file_path = concepts_path / item["path"]
534            if not concept_code_file_path.exists():
535                validation_errors.append(
536                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
537                )
538
539            # check concept code file is not empty (only checked if the file exists)
540            elif concept_code_file_path.stat().st_size == 0:
541                validation_errors.append(
542                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
543                )
544
545            # check code file type is supported
546            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
547                raise ValueError(
548                    f"Unsupported filetype {concept_code_file_path.suffix}; only csv, xlsx and xls code file types are supported"
549                )
550
551            # check columns specified are a supported medical coding type
552            for column in item["columns"]:
553                if column not in code_types:
554                    validation_errors.append(
555                        f"Column type {column} for file {concept_code_file_path} is not supported"
556                    )
557
558            # check the actions are supported
559            if "actions" in item:
560                for action in item["actions"]:
561                    if action not in COL_ACTIONS:
562                        validation_errors.append(f"Action {action} is not supported")
563
564    if len(validation_errors) > 0:
565        _logger.error(validation_errors)
566        raise PhenValidationException(
567            f"Configuration file {str(config_path.resolve())} failed validation",
568            validation_errors,
569        )
570
571    _logger.info(f"Phenotype validated successfully")

Validates that the phenotype directory is a git repo with the standard structure

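A minimal usage sketch (hypothetical path); on failure this raises PhenValidationException with the collected errors:

    from acmc import phen

    phen.validate("workspace/phen")
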
def translate_codes( source_df: pandas.core.frame.DataFrame, target_code_type: str, concept_name: str, not_translate: bool, do_reverse_translate: bool) -> pandas.core.frame.DataFrame:
665def translate_codes(
666    source_df: pd.DataFrame,
667    target_code_type: str,
668    concept_name: str,
669    not_translate: bool,
670    do_reverse_translate: bool,
671) -> pd.DataFrame:
672    """Translates each source code type the source coding list into a target type and returns all conversions as a concept set"""
673
674    # accumulator for translated codes: source concept, its type, and the target concept
675    codes = pd.DataFrame(
676        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
677    )
678    # Convert codes to target type
679    _logger.info(f"Converting to target code type {target_code_type}")
680
681    for source_code_type in source_df.columns:
682        # if the target code type is the same as the source code type, no translation, just append the source as the target
683        if source_code_type == target_code_type:
684            copy_df = pd.DataFrame(
685                {
686                    "SOURCE_CONCEPT": source_df[source_code_type],
687                    "SOURCE_CONCEPT_TYPE": source_code_type,
688                    "CONCEPT": source_df[source_code_type],
689                }
690            )
691            codes = pd.concat([codes, copy_df])
692            _logger.debug(
693                f"Target code type {target_code_type} is the same as source code type, copying {len(source_df)} codes rather than translating"
694            )
695        elif not not_translate:
696            # get the translation filename using source to target code types
697            filename = f"{source_code_type}_to_{target_code_type}.parquet"
698            map_path = trud.PROCESSED_PATH / filename
699
700            filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
701            map_path_reversed = trud.PROCESSED_PATH / filename_reversed
702
703            # do the mapping if it exists
704            if map_path.exists():
705                codes = _translate_codes(map_path, source_df, source_code_type, codes)
706            # otherwise do reverse mapping if enabled and it exists
707            elif do_reverse_translate and map_path_reversed.exists():
708                codes = _translate_codes(
709                    map_path_reversed, source_df, source_code_type, codes, reverse=True
710                )
711            else:
712                _logger.warning(
713                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
714                )
715
716    codes = codes.dropna()  # delete NaNs
717
718    # codes now holds every successful source-to-target conversion
719
720    # add the concept set name to the output if any codes were translated
721    if len(codes.index) > 0:
722        codes["CONCEPT_SET"] = concept_name
723    else:
725        _logger.debug(f"No codes converted with target code type {target_code_type}")
726
727    return codes

Translates each source code type in the source coding list into a target type and returns all conversions as a concept set

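A minimal usage sketch, assuming the TRUD mapping files have been installed and that read2 and snomed are supported code types (the codes below are hypothetical):

    import pandas as pd
    from acmc import phen

    source_df = pd.DataFrame({"read2": ["C10E.", "C10F."]})
    codes = phen.translate_codes(
        source_df,
        target_code_type="snomed",
        concept_name="diabetes",
        not_translate=False,
        do_reverse_translate=False,
    )
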
def write_vocab_version(phen_path: pathlib.Path):
772def write_vocab_version(phen_path: Path):
773    """Writes the acmc, TRUD and OMOP vocabulary versions to the phenotype's vocab version file"""
774
775    if not trud.VERSION_PATH.exists():
776        raise FileNotFoundError(
777            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
778        )
779
780    if not omop.VERSION_PATH.exists():
781        raise FileNotFoundError(
782            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
783        )
784
785    with trud.VERSION_PATH.open("r") as file:
786        trud_version = yaml.safe_load(file)
787
788    with omop.VERSION_PATH.open("r") as file:
789        omop_version = yaml.safe_load(file)
790
791    # Create the combined YAML structure
792    version_data = {
793        "versions": {
794            "acmc": acmc.__version__,
795            "trud": trud_version,
796            "omop": omop_version,
797        }
798    }
799
800    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
801        yaml.dump(
802            version_data,
803            file,
804            Dumper=util.QuotedDumper,
805            default_flow_style=False,
806            sort_keys=False,
807            default_style='"',
808        )
def map( phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool, do_reverse_translate: bool):
811def map(
812    phen_dir: str,
813    target_code_type: str,
814    not_translate: bool,
815    no_metadata: bool,
816    do_reverse_translate: bool,
817):
818    _logger.info(f"Processing phenotype: {phen_dir}")
819
820    # Validate configuration
821    validate(phen_dir)
822
823    # initialise paths
824    phen_path = Path(phen_dir)
825    config_path = phen_path / CONFIG_FILE
826
827    # load configuration
828    with config_path.open("r") as file:
829        config = yaml.safe_load(file)
830    phenotype = config["phenotype"]
831
832    if len(phenotype["map"]) == 0:
833        raise ValueError(f"No map codes defined in the phenotype configuration")
834
835    if target_code_type is not None and target_code_type not in phenotype["map"]:
836        raise ValueError(
837            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
838        )
839
840    if target_code_type is not None:
841        _map_target_code_type(
842            phen_path,
843            phenotype,
844            target_code_type,
845            not_translate,
846            no_metadata,
847            do_reverse_translate,
848        )
849    else:
850        for t in phenotype["map"]:
851            _map_target_code_type(
852                phen_path,
853                phenotype,
854                t,
855                not_translate,
856                no_metadata,
857                do_reverse_translate,
858            )
859
860    _logger.info(f"Phenotype processed successfully")
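
A minimal usage sketch (hypothetical path; the target code type must appear in the phenotype's map configuration, or pass None to map all configured types):

    from acmc import phen

    phen.map(
        phen_dir="workspace/phen",
        target_code_type="read2",
        not_translate=False,
        no_metadata=False,
        do_reverse_translate=False,
    )
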
def add_metadata( codes: pandas.core.frame.DataFrame, metadata: dict, no_metadata: bool) -> pandas.core.frame.DataFrame:
1056def add_metadata(
1057    codes: pd.DataFrame,
1058    metadata: dict,
1059    no_metadata: bool,
1060) -> pd.DataFrame:
1061    """Add concept set metadata, stored as a dictionary, to each concept row"""
1062
1063    if not no_metadata:
1064        for meta_name, meta_value in metadata.items():
1065            codes[meta_name] = meta_value
1066            _logger.debug(
1067                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
1068            )
1069
1070    return codes

Add concept set metadata, stored as a dictionary, to each concept row

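A minimal sketch showing how each metadata key becomes a column on the concept rows (the data below is hypothetical):

    import pandas as pd
    from acmc import phen

    codes = pd.DataFrame({"CONCEPT": ["100000001", "100000002"]})
    codes = phen.add_metadata(codes, {"author": "example"}, no_metadata=False)
    # codes now has an extra "author" column set to "example" on every row
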
def publish(phen_dir: str, msg: str, remote_url: str, increment: str = 'patch'):
1105def publish(
1106    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
1107):
1108    """Publishes updates to the phenotype by commiting all changes to the repo directory"""
1109
1110    # Validate config
1111    validate(phen_dir)
1112    phen_path = Path(phen_dir)
1113
1114    # load git repo and set the branch
1115    repo = git.Repo(phen_path)
1116    if DEFAULT_GIT_BRANCH in repo.branches:
1117        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1118        main_branch.checkout()
1119    else:
1120        raise AttributeError(
1121            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1122        )
1123
1124    # check if any changes to publish
1125    if not repo.is_dirty() and not repo.untracked_files:
1126        if remote_url is not None and "origin" not in repo.remotes:
1127            _logger.info(f"First publish to remote url {remote_url}")
1128        else:
1129            _logger.info("Nothing to publish, no changes to the repo")
1130            return
1131
1132    # get next version
1133    new_version_str = _generate_version_tag(repo, increment)
1134    _logger.info(f"New version: {new_version_str}")
1135
1136    # Write version in configuration file
1137    config_path = phen_path / CONFIG_FILE
1138    with config_path.open("r") as file:
1139        config = yaml.safe_load(file)
1140
1141    config["phenotype"]["version"] = new_version_str
1142    with open(config_path, "w") as file:
1143        yaml.dump(
1144            config,
1145            file,
1146            Dumper=util.QuotedDumper,
1147            default_flow_style=False,
1148            sort_keys=False,
1149            default_style='"',
1150        )
1151
1152    # Add and commit changes to repo including version updates
1153    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
1154    repo.git.add("--all")
1155    repo.index.commit(commit_message)
1156
1157    # Add tag to the repo
1158    repo.create_tag(new_version_str)
1159
1160    # push to origin if a remote repo
1161    if remote_url is not None and "origin" not in repo.remotes:
1162        git_url = _construct_git_url(remote_url)
1163        repo.create_remote("origin", git_url)
1164
1165    try:
1166        if "origin" in repo.remotes:
1167            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1168            origin = repo.remotes.origin
1169            _logger.info(f"Pushing main branch to remote repo")
1170            repo.git.push("--set-upstream", "origin", "main")
1171            _logger.info(f"Pushing version tags to remote git repo")
1172            origin.push(tags=True)
1173            _logger.debug("Changes pushed to 'origin'")
1174        else:
1175            _logger.debug("Remote 'origin' is not set")
1176    except Exception as e:
1177        tag_ref = repo.tags[new_version_str]
1178        repo.delete_tag(tag_ref)
1179        repo.git.reset("--soft", "HEAD~1")
1180        raise e
1181
1182    _logger.info(f"Phenotype published successfully")

Publishes updates to the phenotype by committing all changes to the repo directory

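A minimal usage sketch (hypothetical path and message; pass a remote URL to set an origin on first publish):

    from acmc import phen

    phen.publish(
        phen_dir="workspace/phen",
        msg="Updated concept sets",
        remote_url=None,
        increment="patch",  # one of "major", "minor" or "patch"
    )
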
def export(phen_dir: str, version: str):
1185def export(phen_dir: str, version: str):
1186    """Exports a phen repo at a specific tagged version into a target directory"""
1187    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")
1188
1189    # validate configuration
1190    validate(phen_dir)
1191    phen_path = Path(phen_dir)
1192
1193    # load configuration
1194    config_path = phen_path / CONFIG_FILE
1195    with config_path.open("r") as file:
1196        config = yaml.safe_load(file)
1197
1198    map_path = phen_path / MAP_DIR
1199    if not map_path.exists():
1200        _logger.warning(f"Map path does not exist '{map_path}'")
1201
1202    export_path = phen_path / OMOP_PATH
1203    # check export directory exists and if not create it
1204    if not export_path.exists():
1205        export_path.mkdir(parents=True)
1206        _logger.debug(f"OMOP export directory '{export_path}' created.")
1207
1208    # omop export db
1209    export_db_path = omop.export(
1210        map_path,
1211        export_path,
1212        config["phenotype"]["version"],
1213        config["phenotype"]["omop"],
1214    )
1215
1216    # write to tables
1217    # export as csv
1218    _logger.info(f"Phenotype exported successfully")

Exports a phen repo at a specific tagged version into a target directory

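A minimal usage sketch (hypothetical path and version), assuming the map step has already produced translation files:

    from acmc import phen

    phen.export(phen_dir="workspace/phen", version="1.0.0")
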
def copy(phen_dir: str, target_dir: str, version: str):
1221def copy(phen_dir: str, target_dir: str, version: str):
1222    """Copys a phen repo at a specific tagged version into a target directory"""
1223
1224    # Validate
1225    validate(phen_dir)
1226    phen_path = Path(phen_dir)
1227
1228    # Check target directory exists
1229    target_path = Path(target_dir)
1230    if not target_path.exists():
1231        raise FileNotFoundError(f"The target directory {target_path} does not exist")
1232
1233    # Set copy directory
1234    copy_path = target_path / version
1235    _logger.info(f"Copying repo {phen_path} to {copy_path}")
1236
1237    if (
1238        copy_path.exists() and copy_path.is_dir()
1239    ):  # Check if it exists and is a directory
1240        copy = _check_delete_dir(
1241            copy_path,
1242            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1243        )
1244    else:
1245        copy = True
1246
1247    if not copy:
1248        _logger.info(f"Not copying the version {version}")
1249        return
1250
1251    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1252    repo = git.Repo.clone_from(phen_path, copy_path)
1253
1254    # Check out the latest commit or specified version
1255    if version:
1256        # Checkout a specific version (e.g., branch, tag, or commit hash)
1257        _logger.info(f"Checking out version {version}...")
1258        repo.git.checkout(version)
1259    else:
1260        # Checkout the latest commit (HEAD)
1261        _logger.info(f"Checking out the latest commit...")
1262        repo.git.checkout("HEAD")
1263
1264    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1265
1266    _logger.info(f"Phenotype copied successfully")

Copies a phen repo at a specific tagged version into a target directory

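A minimal usage sketch (hypothetical paths and tag); the repo is cloned into a directory named after the version inside the target directory, and that version is checked out:

    from acmc import phen

    phen.copy(phen_dir="workspace/phen", target_dir="releases", version="v1.0.0")
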
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1270def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
1271    """Extracts concepts as {name: file_path} dictionary and a name set."""
1272    concepts_dict = {
1273        item["name"]: [file["path"] for file in item["files"]]
1274        for item in config_data["phenotype"]["concept_sets"]
1275    }
1276    name_set = set(concepts_dict.keys())
1277    return concepts_dict, name_set

Extracts concepts as {name: file_path} dictionary and a name set.

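A minimal sketch with a hypothetical configuration dictionary:

    from acmc import phen

    config = {
        "phenotype": {
            "concept_sets": [
                {"name": "diabetes", "files": [{"path": "diabetes.csv"}]},
            ]
        }
    }
    concepts, names = phen.extract_concepts(config)
    # concepts == {"diabetes": ["diabetes.csv"]}; names == {"diabetes"}
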
def diff_config(old_config: dict, new_config: dict) -> str:
1291def diff_config(old_config: dict, new_config: dict) -> str:
1292    report = f"\n# Changes to phenotype configuration\n"
1293    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"
1294
1295    old_concepts, old_names = extract_concepts(old_config)
1296    new_concepts, new_names = extract_concepts(new_config)
1297
1298    # Check added and removed concept set names
1299    added_names = new_names - old_names  # Names that appear in new but not in old
1300    removed_names = old_names - new_names  # Names that were in old but not in new
1301
1302    # find file path changes for unchanged names
1303    unchanged_names = old_names & new_names  # Names that exist in both
1304    file_diff = DeepDiff(
1305        {name: old_concepts[name] for name in unchanged_names},
1306        {name: new_concepts[name] for name in unchanged_names},
1307    )
1308
1309    # Find renamed concepts (same file, different name)
1310    renamed_concepts = []
1311    for removed in removed_names:
1312        old_path = old_concepts[removed]
1313        for added in added_names:
1314            new_path = new_concepts[added]
1315            if old_path == new_path:
1316                renamed_concepts.append((removed, added))
1317
1318    # Remove renamed concepts from added and removed sets
1319    for old_name, new_name in renamed_concepts:
1320        added_names.discard(new_name)
1321        removed_names.discard(old_name)
1322
1323    # generate config report
1324    if added_names:
1325        report += "## Added Concepts\n"
1326        for name in added_names:
1327            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
1328        report += "\n"
1329
1330    if removed_names:
1331        report += "## Removed Concepts\n"
1332        for name in removed_names:
1333            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
1334        report += "\n"
1335
1336    if renamed_concepts:
1337        report += "## Renamed Concepts\n"
1338        for old_name, new_name in renamed_concepts:
1339            report += (
1340                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
1341            )
1342        report += "\n"
1343
1344    if "values_changed" in file_diff:
1345        report += "## Updated File Paths\n"
1346        for name, change in file_diff["values_changed"].items():
1347            old_file = change["old_value"]
1348            new_file = change["new_value"]
1349            clean_name = name.split("root['")[1].split("']")[0]
1350            report += (
1351                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
1352            )
1353        report += "\n"
1354
1355    if not (
1356        added_names
1357        or removed_names
1358        or renamed_concepts
1359        or file_diff.get("values_changed")
1360    ):
1361        report += "No changes in concept sets.\n"
1362
1363    return report
def diff_map_files(old_map_path: pathlib.Path, new_map_path: pathlib.Path) -> str:
1366def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1367    old_output_files = [
1368        file.name
1369        for file in old_map_path.iterdir()
1370        if file.is_file() and not file.name.startswith(".")
1371    ]
1372    new_output_files = [
1373        file.name
1374        for file in new_map_path.iterdir()
1375        if file.is_file() and not file.name.startswith(".")
1376    ]
1377
1378    # Convert the lists to sets for easy comparison
1379    old_output_set = set(old_output_files)
1380    new_output_set = set(new_output_files)
1381
1382    # Outputs that are in old_output_set but not in new_output_set (removed files)
1383    removed_outputs = old_output_set - new_output_set
1384    # Outputs that are in new_output_set but not in old_output_set (added files)
1385    added_outputs = new_output_set - old_output_set
1386    # Outputs that are the intersection of old_output_set and new_output_set
1387    common_outputs = old_output_set & new_output_set
1388
1389    report = f"\n# Changes to available translations\n"
1390    report += f"This compares the coding translations files available.\n\n"
1391    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1392    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1393    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1394
1395    # Compare the common outputs between versions
1396    report += f"# Changes to concepts in translation files\n\n"
1397    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might differ from config.yml if the translations have not been re-run for the current configuration.\n\n"
1398    for file in common_outputs:
1399        old_output = old_map_path / file
1400        new_output = new_map_path / file
1401
1402        _logger.debug(f"Old ouptput: {str(old_output.resolve())}")
1403        _logger.debug(f"New ouptput: {str(new_output.resolve())}")
1404
1405        df1 = pd.read_csv(old_output)
1406        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1407        df2 = pd.read_csv(new_output)
1408        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1409
1410        # Check for added and removed concepts
1411        report += f"- File {file}\n"
1412        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1413        report += f"- Removed concepts {sorted_list}\n"
1414        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1415        report += f"- Added concepts {sorted_list}\n"
1416
1417        # Check for changed concepts
1418        diff = df2 - df1  # diff in counts
1419        diff = diff[
1420            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1421        ]  # get non-zero counts
1422        s = "\n"
1423        if len(diff.index) > 0:
1424            for concept, row in diff.iterrows():
1425                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1426            report += f"- Changed concepts {s}\n\n"
1427        else:
1428            report += f"- Changed concepts []\n\n"
1429
1430    return report
def diff_phen( new_phen_path: pathlib.Path, new_version: str, old_phen_path: pathlib.Path, old_version: str, report_path: pathlib.Path, not_check_config: bool):
1433def diff_phen(
1434    new_phen_path: Path,
1435    new_version: str,
1436    old_phen_path: Path,
1437    old_version: str,
1438    report_path: Path,
1439    not_check_config: bool,
1440):
1441    """Compare the differences between two versions of a phenotype"""
1442
1443    # write report heading
1444    report = f"# Phenotype Comparison Report\n"
1445
1446    # Step 1: check differences in the configuration files
1447    if not not_check_config:
1448        # validate phenotypes
1449        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
1450        validate(str(old_phen_path.resolve()))
1451        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
1452        validate(str(new_phen_path.resolve()))
1453
1454        # get old and new config
1455        old_config_path = old_phen_path / CONFIG_FILE
1456        with old_config_path.open("r") as file:
1457            old_config = yaml.safe_load(file)
1458        new_config_path = new_phen_path / CONFIG_FILE
1459        with new_config_path.open("r") as file:
1460            new_config = yaml.safe_load(file)
1461
1462        # write report
1463        report += f"## Original phenotype\n"
1464        report += f"  - {old_config['phenotype']['omop']['vocabulary_id']}\n"
1465        report += f"  - {old_version}\n"
1466        report += f"  - {str(old_phen_path.resolve())}\n"
1467        report += f"## Changed phenotype:\n"
1468        report += f"  - {new_config['phenotype']['omop']['vocabulary_id']}\n"
1469        report += f"  - {new_version}\n"
1470        report += f"  - {str(new_phen_path.resolve())}\n"
1471
1472        # compare the old and new configurations
1473        report += diff_config(old_config, new_config)
1474
1475    # Step 2: check differences between map files
1476    # List files from output directories
1477    old_map_path = old_phen_path / MAP_DIR
1478    new_map_path = new_phen_path / MAP_DIR
1479    report += diff_map_files(old_map_path, new_map_path)
1480
1481    # write the report file
1482    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
1483    with open(report_path, "w") as report_file:
1484        # the context manager closes the file even if the write fails
1485        report_file.write(report)
1486
1487    _logger.info(f"Phenotypes diff'd successfully")

Compare the differences between two versions of a phenotype

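A minimal usage sketch with hypothetical paths and versions; the report is written to report_path in markdown:

    from pathlib import Path
    from acmc import phen

    phen.diff_phen(
        new_phen_path=Path("workspace/phen-new"),
        new_version="v1.1.0",
        old_phen_path=Path("workspace/phen-old"),
        old_version="v1.0.0",
        report_path=Path("diff_report.md"),
        not_check_config=False,
    )
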
def diff( phen_dir: str, version: str, old_phen_dir: str, old_version: str, not_check_config: bool):
1490def diff(
1491    phen_dir: str,
1492    version: str,
1493    old_phen_dir: str,
1494    old_version: str,
1495    not_check_config: bool,
1496):
1497    """Compares two phenotype versions by checking them out into a temporary .acmc directory and writing a markdown diff report"""
1498    timestamp = time.strftime("%Y%m%d_%H%M%S")
1499    temp_dir = Path(f".acmc/diff_{timestamp}")
1500
1501    changed_phen_path = Path(phen_dir)
1502    if not changed_phen_path.exists():
1503        raise ValueError(
1504            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
1505        )
1506
1507    old_phen_path = Path(old_phen_dir)
1508    if not old_phen_path.exists():
1509        raise ValueError(
1510            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
1511        )
1512
1517    try:
1518        # Create the directory
1519        temp_dir.mkdir(parents=True, exist_ok=True)
1520        _logger.debug(f"Temporary directory created: {temp_dir}")
1521
1522        # Create temporary directories
1523        changed_path = temp_dir / "changed"
1524        changed_path.mkdir(parents=True, exist_ok=True)
1525        old_path = temp_dir / "old"
1526        old_path.mkdir(parents=True, exist_ok=True)
1527
1528        # checkout changed
1529        if version == "latest":
1530            _logger.debug(
1531                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
1532            )
1533            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
1534        else:
1535            _logger.debug(
1536                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
1537            )
1538            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
1539            changed_repo.git.checkout(version)
1540
1541        # checkout old
1542        if old_version == "latest":
1543            _logger.debug(
1544                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1545            )
1546            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
1547        else:
1548            _logger.debug(
1549                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
1550            )
1551            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
1552            old_repo.git.checkout(old_version)
1553
1554        report_filename = f"{version}_{old_version}_diff.md"
1555        report_path = changed_phen_path / report_filename
1556        # diff old with new
1557        diff_phen(
1558            changed_path, version, old_path, old_version, report_path, not_check_config
1559        )
1560
1561    finally:
1562        # clean up tmp directory
1563        if temp_dir.exists():
1564            shutil.rmtree(temp_dir)
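
A minimal usage sketch of the top-level diff (hypothetical paths and tag); "latest" compares the working copy, while a tag name checks out that version in a temporary clone:

    from acmc import phen

    phen.diff(
        phen_dir="workspace/phen",
        version="latest",
        old_phen_dir="workspace/phen",
        old_version="v1.0.0",
        not_check_config=False,
    )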