acmc.phen
phenotype.py module
This module provides functionality for managing phenotypes.
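A minimal end-to-end sketch of the workflow this module supports (the paths and release message are illustrative; `remote_url=None` keeps everything local):

```python
from acmc import phen

# initialise a local phenotype repo with the standard structure
phen.init("./workspace/phen", remote_url=None)

# check the repo layout and config.yml against CONFIG_SCHEMA
phen.validate("./workspace/phen")

# translate source codes; None maps every code type listed under 'map' in config.yml
phen.map("./workspace/phen", target_code_type=None)

# commit all changes, bump the patch version and tag it (push happens only if a remote is set)
phen.publish("./workspace/phen", msg="initial release", remote_url=None, increment="patch")
```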
1""" 2phenotype.py module 3 4This module provides functionality for managing phenotypes. 5""" 6 7import argparse 8import pandas as pd 9import numpy as np 10import json 11import os 12import sqlite3 13import sys 14import shutil 15import time 16import git 17import re 18import logging 19import requests 20import yaml 21import semver 22from git import Repo 23from cerberus import Validator # type: ignore 24from deepdiff import DeepDiff 25from pathlib import Path 26from urllib.parse import urlparse, urlunparse 27from typing import Tuple, Set, Any 28import acmc 29from acmc import trud, omop, parse, util, logging_config as lc 30 31# set up logging 32_logger = lc.setup_logger() 33 34pd.set_option("mode.chained_assignment", None) 35 36PHEN_DIR = "phen" 37"""Default phenotype directory name""" 38 39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR 40"""Default phenotype directory path""" 41 42CONCEPTS_DIR = "concepts" 43"""Default concepts directory name""" 44 45MAP_DIR = "map" 46"""Default map directory name""" 47 48CONCEPT_SET_DIR = "concept-sets" 49"""Default concept set directory name""" 50 51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv" 52"""Default CSV concept set directory path""" 53 54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop" 55"""Default OMOP concept set directory path""" 56 57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR] 58"""List of default phenotype directories""" 59 60CONFIG_FILE = "config.yml" 61"""Default configuration filename""" 62 63VOCAB_VERSION_FILE = "vocab_version.yml" 64"""Default vocabulary version filename""" 65 66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"] 67"""List of semantic version increment types""" 68 69DEFAULT_VERSION_INC = "patch" 70"""Default semantic version increment type""" 71 72DEFAULT_GIT_BRANCH = "main" 73"""Default phenotype repo branch name""" 74 75SPLIT_COL_ACTION = "split_col" 76"""Split column preprocessing action type""" 77 78CODES_COL_ACTION = "codes_col" 79"""Codes column preprocessing action type""" 80 81DIVIDE_COL_ACTION = "divide_col" 82"""Divide column preprocessing action type""" 83 84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] 85"""List of column preprocessing action types""" 86 87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"] 88"""List of supported source concept coding list file types""" 89 90# config.yaml schema 91CONFIG_SCHEMA = { 92 "phenotype": { 93 "type": "dict", 94 "required": True, 95 "schema": { 96 "version": { 97 "type": "string", 98 "required": True, 99 "regex": r"^\d+\.\d+\.\d+$", # Enforces 'vN.N.N' format 100 }, 101 "omop": { 102 "type": "dict", 103 "required": True, 104 "schema": { 105 "vocabulary_id": {"type": "string", "required": True}, 106 "vocabulary_name": {"type": "string", "required": True}, 107 "vocabulary_reference": { 108 "type": "string", 109 "required": True, 110 "regex": r"^https?://.*", # Ensures it's a URL 111 }, 112 }, 113 }, 114 "map": { 115 "type": "list", 116 "schema": { 117 "type": "string", 118 "allowed": list( 119 parse.SUPPORTED_CODE_TYPES 120 ), # Ensure only predefined values are allowed 121 }, 122 }, 123 "concept_sets": { 124 "type": "list", 125 "required": True, 126 "schema": { 127 "type": "dict", 128 "schema": { 129 "name": {"type": "string", "required": True}, 130 "file": { 131 "type": "dict", 132 "required": False, 133 "schema": { 134 "path": {"type": "string", "required": True}, 135 "columns": {"type": "dict", "required": True}, 136 "category": { 137 "type": "string" 138 }, # Optional but must be string if present 139 "actions": { 140 "type": "dict", 
141 "schema": {"divide_col": {"type": "string"}}, 142 }, 143 }, 144 }, 145 "metadata": {"type": "dict", "required": True}, 146 }, 147 }, 148 }, 149 }, 150 } 151} 152"""Phenotype config.yml schema definition""" 153 154 155class PhenValidationException(Exception): 156 """Custom exception class raised when validation errors in phenotype configuration file""" 157 158 def __init__(self, message, validation_errors=None): 159 super().__init__(message) 160 self.validation_errors = validation_errors 161 162 163def _construct_git_url(remote_url: str): 164 """Constructs a git url for github or gitlab including a PAT token environment variable""" 165 # check the url 166 parsed_url = urlparse(remote_url) 167 168 # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd 169 # need to update this function if the URL scheme is different. 170 if "github.com" in parsed_url.netloc: 171 # get GitHub PAT from environment variable 172 auth = os.getenv("ACMC_GITHUB_PAT") 173 if not auth: 174 raise ValueError( 175 "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." 176 ) 177 else: 178 # get GitLab PAT from environment variable 179 auth = os.getenv("ACMC_GITLAB_PAT") 180 if not auth: 181 raise ValueError( 182 "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 183 ) 184 auth = f"oauth2:{auth}" 185 186 # Construct the new URL with credentials 187 new_netloc = f"{auth}@{parsed_url.netloc}" 188 return urlunparse( 189 ( 190 parsed_url.scheme, 191 new_netloc, 192 parsed_url.path, 193 parsed_url.params, 194 parsed_url.query, 195 parsed_url.fragment, 196 ) 197 ) 198 199 200def _create_empty_git_dir(path: Path): 201 """Creates a directory with a .gitkeep file so that it's tracked in git""" 202 path.mkdir(exist_ok=True) 203 keep_path = path / ".gitkeep" 204 keep_path.touch(exist_ok=True) 205 206 207def _check_delete_dir(path: Path, msg: str) -> bool: 208 """Checks on the command line if a user wants to delete a directory 209 210 Args: 211 path (Path): path of the directory to be deleted 212 msg (str): message to be displayed to the user 213 214 Returns: 215 Boolean: True if deleted 216 """ 217 deleted = False 218 219 user_input = input(f"{msg}").strip().lower() 220 if user_input in ["yes", "y"]: 221 shutil.rmtree(path) 222 deleted = True 223 else: 224 _logger.info("Directory was not deleted.") 225 226 return deleted 227 228 229def init(phen_dir: str, remote_url: str): 230 """Initial phenotype directory as git repo with standard structure""" 231 _logger.info(f"Initialising Phenotype in directory: {phen_dir}") 232 phen_path = Path(phen_dir) 233 234 # check if directory already exists and ask user if they want to recreate it 235 if ( 236 phen_path.exists() and phen_path.is_dir() 237 ): # Check if it exists and is a directory 238 configure = _check_delete_dir( 239 phen_path, 240 f"The phen directory already exists. Do you want to reinitialise? 
(yes/no): ", 241 ) 242 else: 243 configure = True 244 245 if not configure: 246 _logger.info(f"Exiting, phenotype not initiatised") 247 return 248 249 # Initialise repo from local or remote 250 repo: Repo 251 252 # if remote then clone the repo otherwise init a local repo 253 if remote_url != None: 254 # add PAT token to the URL 255 git_url = _construct_git_url(remote_url) 256 257 # clone the repo 258 git_cmd = git.cmd.Git() 259 git_cmd.clone(git_url, phen_path) 260 261 # open repo 262 repo = Repo(phen_path) 263 # check if there are any commits (new repo has no commits) 264 if ( 265 len(repo.branches) == 0 or repo.head.is_detached 266 ): # Handle detached HEAD (e.g., after init) 267 _logger.debug("The phen repository has no commits yet.") 268 commit_count = 0 269 else: 270 # Get the total number of commits in the default branch 271 commit_count = sum(1 for _ in repo.iter_commits()) 272 _logger.debug(f"Repo has previous commits: {commit_count}") 273 else: 274 # local repo, create the directories and init 275 phen_path.mkdir(parents=True, exist_ok=True) 276 _logger.debug(f"Phen directory '{phen_path}' has been created.") 277 repo = git.Repo.init(phen_path) 278 commit_count = 0 279 280 phen_path = phen_path.resolve() 281 # initialise empty repos 282 if commit_count == 0: 283 # create initial commit 284 initial_file_path = phen_path / "README.md" 285 with open(initial_file_path, "w") as file: 286 file.write( 287 "# Initial commit\nThis is the first commit in the phen repository.\n" 288 ) 289 repo.index.add([initial_file_path]) 290 repo.index.commit("Initial commit") 291 commit_count = 1 292 293 # Checkout the phens default branch, creating it if it does not exist 294 if DEFAULT_GIT_BRANCH in repo.branches: 295 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 296 main_branch.checkout() 297 else: 298 main_branch = repo.create_head(DEFAULT_GIT_BRANCH) 299 main_branch.checkout() 300 301 # if the phen path does not contain the config file then initialise the phen type 302 config_path = phen_path / CONFIG_FILE 303 if config_path.exists(): 304 _logger.debug(f"Phenotype configuration files already exist") 305 return 306 307 _logger.info("Creating phen directory structure and config files") 308 for d in DEFAULT_PHEN_DIR_LIST: 309 _create_empty_git_dir(phen_path / d) 310 311 # create empty phen config file 312 config = { 313 "phenotype": { 314 "version": "0.0.0", 315 "omop": { 316 "vocabulary_id": "", 317 "vocabulary_name": "", 318 "vocabulary_reference": "", 319 }, 320 "translate": [], 321 "concept_sets": [], 322 } 323 } 324 325 with open(phen_path / CONFIG_FILE, "w") as file: 326 yaml.dump( 327 config, 328 file, 329 Dumper=util.QuotedDumper, 330 default_flow_style=False, 331 sort_keys=False, 332 default_style='"', 333 ) 334 335 # add git ignore 336 ignore_content = """# Ignore SQLite database files 337*.db 338*.sqlite3 339 340# Ignore SQLite journal and metadata files 341*.db-journal 342*.sqlite3-journal 343 344# python 345.ipynb_checkpoints 346 """ 347 ignore_path = phen_path / ".gitignore" 348 with open(ignore_path, "w") as file: 349 file.write(ignore_content) 350 351 # add to git repo and commit 352 for d in DEFAULT_PHEN_DIR_LIST: 353 repo.git.add(phen_path / d) 354 repo.git.add(all=True) 355 repo.index.commit("initialised the phen git repo.") 356 357 _logger.info(f"Phenotype initialised successfully") 358 359 360def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str): 361 """Forks an upstream phenotype in a remote repo at a specific version to a local director, and 
optionally sets to a new remote origin" 362 363 Args: 364 phen_dir (str): local directory path where the upstream repo is to be cloned 365 upstream_url (str): url to the upstream repo 366 upstream_version (str): version in the upstream repo to clone 367 new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None. 368 369 Raises: 370 ValueError: if the specified version is not in the upstream repo 371 ValueError: if the upstream repo is not a valid phenotype repo 372 ValueError: if there's any other problems with Git 373 """ 374 _logger.info( 375 f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}" 376 ) 377 378 phen_path = Path(phen_dir) 379 # check if directory already exists and ask user if they want to recreate it 380 if ( 381 phen_path.exists() and phen_path.is_dir() 382 ): # Check if it exists and is a directory 383 configure = _check_delete_dir( 384 phen_path, 385 f"The phen directory already exists. Do you want to reinitialise? (yes/no): ", 386 ) 387 else: 388 configure = True 389 390 if not configure: 391 _logger.info(f"Exiting, phenotype not initiatised") 392 return 393 394 try: 395 # Clone repo 396 git_url = _construct_git_url(upstream_url) 397 repo = git.Repo.clone_from(git_url, phen_path) 398 399 # Fetch all branches and tags 400 repo.remotes.origin.fetch() 401 402 # Check if the version exists 403 available_refs = [ref.name.split("/")[-1] for ref in repo.references] 404 if upstream_version not in available_refs: 405 raise ValueError( 406 f"Version '{upstream_version}' not found in the repository: {upstream_url}." 407 ) 408 409 # Checkout the specified version 410 repo.git.checkout(upstream_version) 411 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 412 main_branch.checkout() 413 414 # Check if 'config.yml' exists in the root directory 415 config_path = phen_path / "config.yml" 416 if not os.path.isfile(config_path): 417 raise ValueError( 418 f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory." 
419 ) 420 421 # Validate the phenotype is compatible with the acmc tool 422 validate(str(phen_path.resolve())) 423 424 # Delete each tag locally 425 tags = repo.tags 426 for tag in tags: 427 repo.delete_tag(tag) 428 _logger.debug(f"Deleted tags from forked repo: {tag}") 429 430 # Add upstream remote 431 repo.create_remote("upstream", upstream_url) 432 remote = repo.remotes["origin"] 433 repo.delete_remote(remote) # Remove existing origin 434 435 # Optionally set a new origin remote 436 if new_origin_url: 437 git_url = _construct_git_url(new_origin_url) 438 repo.create_remote("origin", git_url) 439 repo.git.push("--set-upstream", "origin", "main") 440 441 _logger.info(f"Repository forked successfully at {phen_path}") 442 _logger.info(f"Upstream set to {upstream_url}") 443 if new_origin_url: 444 _logger.info(f"Origin set to {new_origin_url}") 445 446 except Exception as e: 447 if phen_path.exists(): 448 shutil.rmtree(phen_path) 449 raise ValueError(f"Error occurred during repository fork: {str(e)}") 450 451 452def validate(phen_dir: str): 453 """Validates the phenotype directory is a git repo with standard structure""" 454 _logger.info(f"Validating phenotype: {phen_dir}") 455 phen_path = Path(phen_dir) 456 if not phen_path.is_dir(): 457 raise NotADirectoryError( 458 f"Error: '{str(phen_path.resolve())}' is not a directory" 459 ) 460 461 config_path = phen_path / CONFIG_FILE 462 if not config_path.is_file(): 463 raise FileNotFoundError( 464 f"Error: phen configuration file '{config_path}' does not exist." 465 ) 466 467 concepts_path = phen_path / CONCEPTS_DIR 468 if not concepts_path.is_dir(): 469 raise FileNotFoundError( 470 f"Error: source concepts directory {concepts_path} does not exist." 471 ) 472 473 # Calidate the directory is a git repo 474 try: 475 git.Repo(phen_path) 476 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 477 raise Exception(f"Phen directory {phen_path} is not a git repo") 478 479 # Load configuration File 480 if config_path.suffix == ".yml": 481 try: 482 with config_path.open("r") as file: 483 phenotype = yaml.safe_load(file) 484 485 validator = Validator(CONFIG_SCHEMA) 486 if validator.validate(phenotype): 487 _logger.debug("YAML structure is valid.") 488 else: 489 _logger.error(f"YAML structure validation failed: {validator.errors}") 490 raise Exception(f"YAML structure validation failed: {validator.errors}") 491 except yaml.YAMLError as e: 492 _logger.error(f"YAML syntax error: {e}") 493 raise e 494 else: 495 raise Exception( 496 f"Unsupported configuration filetype: {str(config_path.resolve())}" 497 ) 498 499 # initiatise 500 validation_errors = [] 501 phenotype = phenotype["phenotype"] 502 code_types = parse.CodeTypeParser().code_types 503 504 # check the version number is of the format vn.n.n 505 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 506 if not match: 507 validation_errors.append( 508 f"Invalid version format in configuration file: {phenotype['version']}" 509 ) 510 511 # create a list of all the concept set names defined in the concept set configuration 512 concept_set_names = [] 513 for item in phenotype["concept_sets"]: 514 if item["name"] in concept_set_names: 515 validation_errors.append( 516 f"Duplicate concept set defined in concept sets {item['name'] }" 517 ) 518 else: 519 concept_set_names.append(item["name"]) 520 521 # check codes definition 522 for item in phenotype["concept_sets"]: 523 # check concepte code file exists 524 concept_code_file_path = concepts_path / item["file"]["path"] 525 if not 
concept_code_file_path.exists(): 526 validation_errors.append( 527 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 528 ) 529 530 # check concepte code file is not empty 531 if concept_code_file_path.stat().st_size == 0: 532 validation_errors.append( 533 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 534 ) 535 536 # check code file type is supported 537 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 538 raise ValueError( 539 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 540 ) 541 542 # check columns specified are a supported medical coding type 543 for column in item["file"]["columns"]: 544 if column not in code_types: 545 validation_errors.append( 546 f"Column type {column} for file {concept_code_file_path} is not supported" 547 ) 548 549 # check the actions are supported 550 if "actions" in item["file"]: 551 for action in item["file"]["actions"]: 552 if action not in COL_ACTIONS: 553 validation_errors.append(f"Action {action} is not supported") 554 555 if len(validation_errors) > 0: 556 _logger.error(validation_errors) 557 raise PhenValidationException( 558 f"Configuration file {str(config_path.resolve())} failed validation", 559 validation_errors, 560 ) 561 562 _logger.info(f"Phenotype validated successfully") 563 564 565def _read_table_file(path: Path, excel_sheet: str = ""): 566 """ 567 Load Code List File 568 """ 569 570 path = path.resolve() 571 if path.suffix == ".csv": 572 df = pd.read_csv(path, dtype=str) 573 elif path.suffix == ".xlsx" or path.suffix == ".xls": 574 if excel_sheet != "": 575 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str) 576 else: 577 df = pd.read_excel(path, dtype=str) 578 elif path.suffix == ".dta": 579 df = pd.read_stata(path) 580 else: 581 raise ValueError( 582 f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types" 583 ) 584 585 return df 586 587 588def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame: 589 # Perform Structural Changes to file before preprocessing 590 _logger.debug("Processing file structural actions") 591 if ( 592 "actions" in concept_set["file"] 593 and "split_col" in concept_set["file"]["actions"] 594 and "codes_col" in concept_set["file"]["actions"] 595 ): 596 split_col = concept_set["file"]["actions"]["split_col"] 597 codes_col = concept_set["file"]["actions"]["codes_col"] 598 _logger.debug( 599 "Action: Splitting", 600 split_col, 601 "column into:", 602 df[split_col].unique(), 603 ) 604 codes = df[codes_col] 605 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode 606 oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes 607 oh[oh == False] = np.nan # replace 0s with None 608 df = pd.concat([df, oh], axis=1) # merge in new columns 609 610 return df 611 612 613def _preprocess_source_concepts( 614 df: pd.DataFrame, concept_set: dict, code_file_path: Path 615) -> Tuple[pd.DataFrame, list]: 616 """Perform QA Checks on columns individually and append to df""" 617 out = pd.DataFrame([]) # create output df to append to 618 code_errors = [] # list of errors from processing 619 620 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 621 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 622 623 # Preprocess codes 624 code_types = parse.CodeTypeParser().code_types 625 for code_type in concept_set["file"]["columns"]: 626 parser = code_types[code_type] 627 _logger.info(f"Processing {code_type} codes for 
{code_file_path}") 628 629 # get codes by column name 630 source_col_name = concept_set["file"]["columns"][code_type] 631 codes = df[source_col_name].dropna() 632 codes = codes.astype(str) # convert to string 633 codes = codes.str.strip() # remove excess spaces 634 635 # process codes, validating them using parser and returning the errors 636 codes, errors = parser.process(codes, code_file_path) 637 if len(errors) > 0: 638 code_errors.extend(errors) 639 _logger.warning(f"Codes validation failed with {len(errors)} errors") 640 641 # add processed codes to df 642 new_col_name = f"{source_col_name}_SOURCE" 643 df = df.rename(columns={source_col_name: new_col_name}) 644 process_codes = pd.DataFrame({code_type: codes}).join(df) 645 out = pd.concat( 646 [out, process_codes], 647 ignore_index=True, 648 ) 649 650 _logger.debug(out.head()) 651 652 return out, code_errors 653 654 655# Translate Df with multiple codes into single code type Series 656def translate_codes( 657 source_df: pd.DataFrame, target_code_type: str, concept_name: str 658) -> pd.DataFrame: 659 """Translates each source code type the source coding list into a target type and returns all conversions as a concept set""" 660 661 # codes = pd.DataFrame([], dtype=str) 662 codes = pd.DataFrame( 663 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string" 664 ) 665 # Convert codes to target type 666 _logger.info(f"Converting to target code type {target_code_type}") 667 668 for source_code_type in source_df.columns: 669 # if target code type is the same as thet source code type, no translation, just appending source as target 670 if source_code_type == target_code_type: 671 copy_df = pd.DataFrame( 672 { 673 "SOURCE_CONCEPT": source_df[source_code_type], 674 "SOURCE_CONCEPT_TYPE": source_code_type, 675 "CONCEPT": source_df[source_code_type], 676 } 677 ) 678 codes = pd.concat([codes, copy_df]) 679 _logger.debug( 680 f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" 681 ) 682 else: 683 # get the translation filename using source to target code types 684 filename = f"{source_code_type}_to_{target_code_type}.parquet" 685 map_path = trud.PROCESSED_PATH / filename 686 687 # do the mapping if it exists 688 if map_path.exists(): 689 # get mapping 690 df_map = pd.read_parquet(map_path) 691 692 # do mapping 693 translated_df = pd.merge( 694 source_df[source_code_type], df_map, how="left" 695 ) 696 697 # normalise the output 698 translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"]) 699 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type 700 701 # add to list of codes 702 codes = pd.concat([codes, translated_df]) 703 704 else: 705 _logger.warning( 706 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" 707 ) 708 709 codes = codes.dropna() # delete NaNs 710 711 # added concept set type to output if any translations 712 if len(codes.index) > 0: 713 codes["CONCEPT_SET"] = concept_name 714 else: 715 _logger.debug(f"No codes converted with target code type {target_code_type}") 716 717 return codes 718 719 720def _write_code_errors(code_errors: list, code_errors_path: Path): 721 err_df = pd.DataFrame( 722 [ 723 { 724 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 725 "VOCABULARY": err.code_type, 726 "SOURCE": err.codes_file, 727 "CAUSE": err.message, 728 } 729 for err in code_errors 730 ] 731 ) 732 733 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 734 err_df = 
err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 735 err_df.to_csv(code_errors_path, index=False, mode="w") 736 737 738def write_vocab_version(phen_path: Path): 739 # write the vocab version files 740 741 if not trud.VERSION_PATH.exists(): 742 raise FileNotFoundError( 743 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 744 ) 745 746 if not omop.VERSION_PATH.exists(): 747 raise FileNotFoundError( 748 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 749 ) 750 751 with trud.VERSION_PATH.open("r") as file: 752 trud_version = yaml.safe_load(file) 753 754 with omop.VERSION_PATH.open("r") as file: 755 omop_version = yaml.safe_load(file) 756 757 # Create the combined YAML structure 758 version_data = { 759 "versions": { 760 "acmc": acmc.__version__, 761 "trud": trud_version, 762 "omop": omop_version, 763 } 764 } 765 766 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 767 yaml.dump( 768 version_data, 769 file, 770 Dumper=util.QuotedDumper, 771 default_flow_style=False, 772 sort_keys=False, 773 default_style='"', 774 ) 775 776 777def map(phen_dir: str, target_code_type: str): 778 _logger.info(f"Processing phenotype: {phen_dir}") 779 780 # Validate configuration 781 validate(phen_dir) 782 783 # initialise paths 784 phen_path = Path(phen_dir) 785 config_path = phen_path / CONFIG_FILE 786 787 # load configuration 788 with config_path.open("r") as file: 789 config = yaml.safe_load(file) 790 phenotype = config["phenotype"] 791 792 if len(phenotype["map"]) == 0: 793 raise ValueError(f"No map codes defined in the phenotype configuration") 794 795 if target_code_type is not None and target_code_type not in phenotype["map"]: 796 raise ValueError( 797 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 798 ) 799 800 if target_code_type is not None: 801 _map_target_code_type(phen_path, phenotype, target_code_type) 802 else: 803 for t in phenotype["map"]: 804 _map_target_code_type(phen_path, phenotype, t) 805 806 _logger.info(f"Phenotype processed successfully") 807 808 809def _map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 810 _logger.debug(f"Target coding format: {target_code_type}") 811 concepts_path = phen_path / CONCEPTS_DIR 812 # Create output dataframe 813 out = pd.DataFrame([]) 814 code_errors = [] 815 816 # Process each folder in codes section 817 for concept_set in phenotype["concept_sets"]: 818 _logger.debug(f"--- {concept_set['file']} ---") 819 820 # Load code file 821 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 822 df = _read_table_file(codes_file_path) 823 824 # process structural actions 825 df = _process_actions(df, concept_set) 826 827 # preprocessing and validate of source concepts 828 _logger.debug("Processing and validating source concept codes") 829 df, errors = _preprocess_source_concepts( 830 df, 831 concept_set, 832 codes_file_path, 833 ) 834 835 # create df with just the source code columns 836 source_column_names = list(concept_set["file"]["columns"].keys()) 837 source_df = df[source_column_names] 838 839 _logger.debug(source_df.columns) 840 _logger.debug(source_df.head()) 841 842 _logger.debug( 843 f"Length of errors from _preprocess_source_concepts {len(errors)}" 844 ) 845 if len(errors) > 0: 846 code_errors.extend(errors) 847 _logger.debug(f" Length of code_errors {len(code_errors)}") 848 849 # Map source concepts codes to target codes 850 # if processing a source coding list with categorical 
data 851 if ( 852 "actions" in concept_set["file"] 853 and "divide_col" in concept_set["file"]["actions"] 854 and len(df) > 0 855 ): 856 divide_col = concept_set["file"]["actions"]["divide_col"] 857 _logger.debug(f"Action: Dividing Table by {divide_col}") 858 _logger.debug(f"column into: {df[divide_col].unique()}") 859 df_grp = df.groupby(divide_col) 860 for cat, grp in df_grp: 861 if cat == concept_set["file"]["category"]: 862 grp = grp.drop(columns=[divide_col]) # delete categorical column 863 source_df = grp[source_column_names] 864 trans_out = translate_codes( 865 source_df, 866 target_code_type=target_code_type, 867 concept_name=concept_set["name"], 868 ) 869 out = pd.concat([out, trans_out]) 870 else: 871 source_df = df[source_column_names] 872 trans_out = translate_codes( 873 source_df, 874 target_code_type=target_code_type, 875 concept_name=concept_set["name"], 876 ) 877 out = pd.concat([out, trans_out]) 878 879 if len(code_errors) > 0: 880 _logger.error(f"The map processing has {len(code_errors)} errors") 881 error_path = phen_path / MAP_DIR / "errors" 882 error_path.mkdir(parents=True, exist_ok=True) 883 error_filename = f"{target_code_type}-code-errors.csv" 884 _write_code_errors(code_errors, error_path / error_filename) 885 886 # Check there is output from processing 887 if len(out.index) == 0: 888 _logger.error(f"No output after map processing") 889 raise Exception( 890 f"No output after map processing, check config {str(phen_path.resolve())}" 891 ) 892 893 # final processing 894 out = out.reset_index(drop=True) 895 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 896 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 897 898 out_count = len(out.index) 899 # added metadata 900 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 901 result_list = [] 902 source_column_names = list(concept_set["file"]["columns"].keys()) 903 for source_concept_type in source_column_names: 904 # Filter output based on the current source_concept_type 905 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 906 filtered_count = len(out_filtered_df.index) 907 908 # Remove the source type columns except the current type will leave the metadata and the join 909 remove_types = [ 910 type for type in source_column_names if type != source_concept_type 911 ] 912 metadata_df = df.drop(columns=remove_types) 913 metadata_df = metadata_df.rename( 914 columns={source_concept_type: "SOURCE_CONCEPT"} 915 ) 916 metadata_df_count = len(metadata_df.index) 917 918 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 919 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 920 result_count = len(result.index) 921 922 _logger.debug( 923 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 924 ) 925 926 # Append the result to the result_list 927 result_list.append(result) 928 929 # Concatenate all the results into a single DataFrame 930 final_out = pd.concat(result_list, ignore_index=True) 931 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 932 _logger.debug( 933 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 934 ) 935 936 # Save output to map directory 937 output_filename = target_code_type + ".csv" 938 map_path = phen_path / MAP_DIR / output_filename 939 final_out.to_csv(map_path, index=False) 940 
_logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 941 942 # save concept sets as separate files 943 concept_set_path = phen_path / CSV_PATH / target_code_type 944 945 # empty the concept-set directory except for hiddle files, e.g. .git 946 if concept_set_path.exists(): 947 for item in concept_set_path.iterdir(): 948 if not item.name.startswith("."): 949 item.unlink() 950 else: 951 concept_set_path.mkdir(parents=True, exist_ok=True) 952 953 # write each concept as a separate file 954 for name, concept in final_out.groupby("CONCEPT_SET"): 955 concept = concept.sort_values(by="CONCEPT") # sort rows 956 concept = concept.dropna(how="all", axis=1) # remove empty cols 957 concept = concept.reindex( 958 sorted(concept.columns), axis=1 959 ) # sort cols alphabetically 960 filename = f"{name}.csv" 961 concept_path = concept_set_path / filename 962 concept.to_csv(concept_path, index=False) 963 964 write_vocab_version(phen_path) 965 966 _logger.info(f"Phenotype processed target code type {target_code_type}") 967 968 969def _generate_version_tag( 970 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 971) -> str: 972 # Get all valid semantic version tags 973 versions = [] 974 for tag in repo.tags: 975 if tag.name.startswith("v"): 976 tag_name = tag.name[1:] # Remove the first character 977 else: 978 tag_name = tag.name 979 if semver.Version.is_valid(tag_name): 980 versions.append(semver.Version.parse(tag_name)) 981 982 _logger.debug(f"Versions: {versions}") 983 # Determine the next version 984 if not versions: 985 new_version = semver.Version(0, 0, 1) 986 else: 987 latest_version = max(versions) 988 if increment == "major": 989 new_version = latest_version.bump_major() 990 elif increment == "minor": 991 new_version = latest_version.bump_minor() 992 else: 993 new_version = latest_version.bump_patch() 994 995 # Create the new tag 996 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 997 998 return new_version_str 999 1000 1001def publish( 1002 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 1003): 1004 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 1005 1006 # Validate config 1007 validate(phen_dir) 1008 phen_path = Path(phen_dir) 1009 1010 # load git repo and set the branch 1011 repo = git.Repo(phen_path) 1012 if DEFAULT_GIT_BRANCH in repo.branches: 1013 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1014 main_branch.checkout() 1015 else: 1016 raise AttributeError( 1017 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1018 ) 1019 1020 # check if any changes to publish 1021 if not repo.is_dirty() and not repo.untracked_files: 1022 if remote_url is not None and "origin" not in repo.remotes: 1023 _logger.info(f"First publish to remote url {remote_url}") 1024 else: 1025 _logger.info("Nothing to publish, no changes to the repo") 1026 return 1027 1028 # get next version 1029 new_version_str = _generate_version_tag(repo, increment) 1030 _logger.info(f"New version: {new_version_str}") 1031 1032 # Write version in configuration file 1033 config_path = phen_path / CONFIG_FILE 1034 with config_path.open("r") as file: 1035 config = yaml.safe_load(file) 1036 1037 config["phenotype"]["version"] = new_version_str 1038 with open(config_path, "w") as file: 1039 yaml.dump( 1040 config, 1041 file, 1042 Dumper=util.QuotedDumper, 1043 default_flow_style=False, 1044 sort_keys=False, 1045 default_style='"', 1046 ) 1047 1048 # Add and commit changes to repo 
including version updates 1049 commit_message = f"Committing updates to phenotype {phen_path}" 1050 repo.git.add("--all") 1051 repo.index.commit(commit_message) 1052 1053 # Add tag to the repo 1054 repo.create_tag(new_version_str) 1055 1056 # push to origin if a remote repo 1057 if remote_url is not None and "origin" not in repo.remotes: 1058 git_url = _construct_git_url(remote_url) 1059 repo.create_remote("origin", git_url) 1060 1061 try: 1062 if "origin" in repo.remotes: 1063 _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1064 origin = repo.remotes.origin 1065 _logger.info(f"Pushing main branch to remote repo") 1066 repo.git.push("--set-upstream", "origin", "main") 1067 _logger.info(f"Pushing version tags to remote git repo") 1068 origin.push(tags=True) 1069 _logger.debug("Changes pushed to 'origin'") 1070 else: 1071 _logger.debug("Remote 'origin' is not set") 1072 except Exception as e: 1073 tag_ref = repo.tags[new_version_str] 1074 repo.delete_tag(tag_ref) 1075 repo.git.reset("--soft", "HEAD~1") 1076 raise e 1077 1078 _logger.info(f"Phenotype published successfully") 1079 1080 1081def export(phen_dir: str, version: str): 1082 """Exports a phen repo at a specific tagged version into a target directory""" 1083 _logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1084 1085 # validate configuration 1086 validate(phen_dir) 1087 phen_path = Path(phen_dir) 1088 1089 # load configuration 1090 config_path = phen_path / CONFIG_FILE 1091 with config_path.open("r") as file: 1092 config = yaml.safe_load(file) 1093 1094 map_path = phen_path / MAP_DIR 1095 if not map_path.exists(): 1096 _logger.warning(f"Map path does not exist '{map_path}'") 1097 1098 export_path = phen_path / OMOP_PATH 1099 # check export directory exists and if not create it 1100 if not export_path.exists(): 1101 export_path.mkdir(parents=True) 1102 _logger.debug(f"OMOP export directory '{export_path}' created.") 1103 1104 # omop export db 1105 export_db_path = omop.export( 1106 map_path, 1107 export_path, 1108 config["phenotype"]["version"], 1109 config["phenotype"]["omop"], 1110 ) 1111 1112 # write to tables 1113 # export as csv 1114 _logger.info(f"Phenotype exported successfully") 1115 1116 1117def copy(phen_dir: str, target_dir: str, version: str): 1118 """Copys a phen repo at a specific tagged version into a target directory""" 1119 1120 # Validate 1121 validate(phen_dir) 1122 phen_path = Path(phen_dir) 1123 1124 # Check target directory exists 1125 target_path = Path(target_dir) 1126 if not target_path.exists(): 1127 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1128 1129 # Set copy directory 1130 copy_path = target_path / version 1131 _logger.info(f"Copying repo {phen_path} to {copy_path}") 1132 1133 if ( 1134 copy_path.exists() and copy_path.is_dir() 1135 ): # Check if it exists and is a directory 1136 copy = _check_delete_dir( 1137 copy_path, 1138 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? 
(yes/no): ", 1139 ) 1140 else: 1141 copy = True 1142 1143 if not copy: 1144 _logger.info(f"Not copying the version {version}") 1145 return 1146 1147 _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1148 repo = git.Repo.clone_from(phen_path, copy_path) 1149 1150 # Check out the latest commit or specified version 1151 if version: 1152 # Checkout a specific version (e.g., branch, tag, or commit hash) 1153 _logger.info(f"Checking out version {version}...") 1154 repo.git.checkout(version) 1155 else: 1156 # Checkout the latest commit (HEAD) 1157 _logger.info(f"Checking out the latest commit...") 1158 repo.git.checkout("HEAD") 1159 1160 _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1161 1162 _logger.info(f"Phenotype copied successfully") 1163 1164 1165# Convert concept_sets list into dictionaries 1166def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1167 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1168 concepts_dict = { 1169 item["name"]: item["file"]["path"] 1170 for item in config_data["phenotype"]["concept_sets"] 1171 } 1172 name_set = set(concepts_dict.keys()) 1173 return concepts_dict, name_set 1174 1175 1176def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1177 """ 1178 Extracts clean keys from a DeepDiff dictionary. 1179 1180 :param diff: DeepDiff result dictionary 1181 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1182 :return: A set of clean key names 1183 """ 1184 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1185 1186 1187def diff_config(old_config: dict, new_config: dict) -> str: 1188 report = f"\n# Changes to phenotype configuration\n" 1189 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1190 1191 old_concepts, old_names = extract_concepts(old_config) 1192 new_concepts, new_names = extract_concepts(new_config) 1193 1194 # Check added and removed names 1195 added_names = new_names - old_names # Names that appear in new but not in old 1196 removed_names = old_names - new_names # Names that were in old but not in new 1197 1198 # find file path changes for unchanged names 1199 unchanged_names = old_names & new_names # Names that exist in both 1200 file_diff = DeepDiff( 1201 {name: old_concepts[name] for name in unchanged_names}, 1202 {name: new_concepts[name] for name in unchanged_names}, 1203 ) 1204 1205 # Find renamed concepts (same file, different name) 1206 renamed_concepts = [] 1207 for removed in removed_names: 1208 old_path = old_concepts[removed] 1209 for added in added_names: 1210 new_path = new_concepts[added] 1211 if old_path == new_path: 1212 renamed_concepts.append((removed, added)) 1213 1214 # Remove renamed concepts from added and removed sets 1215 for old_name, new_name in renamed_concepts: 1216 added_names.discard(new_name) 1217 removed_names.discard(old_name) 1218 1219 # generate config report 1220 if added_names: 1221 report += "## Added Concepts\n" 1222 for name in added_names: 1223 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1224 report += "\n" 1225 1226 if removed_names: 1227 report += "## Removed Concepts\n" 1228 for name in removed_names: 1229 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1230 report += "\n" 1231 1232 if renamed_concepts: 1233 report += "## Renamed Concepts\n" 1234 for old_name, 
new_name in renamed_concepts: 1235 report += ( 1236 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1237 ) 1238 report += "\n" 1239 1240 if "values_changed" in file_diff: 1241 report += "## Updated File Paths\n" 1242 for name, change in file_diff["values_changed"].items(): 1243 old_file = change["old_value"] 1244 new_file = change["new_value"] 1245 clean_name = name.split("root['")[1].split("']")[0] 1246 report += ( 1247 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1248 ) 1249 report += "\n" 1250 1251 if not ( 1252 added_names 1253 or removed_names 1254 or renamed_concepts 1255 or file_diff.get("values_changed") 1256 ): 1257 report += "No changes in concept sets.\n" 1258 1259 return report 1260 1261 1262def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1263 old_output_files = [ 1264 file.name 1265 for file in old_map_path.iterdir() 1266 if file.is_file() and not file.name.startswith(".") 1267 ] 1268 new_output_files = [ 1269 file.name 1270 for file in new_map_path.iterdir() 1271 if file.is_file() and not file.name.startswith(".") 1272 ] 1273 1274 # Convert the lists to sets for easy comparison 1275 old_output_set = set(old_output_files) 1276 new_output_set = set(new_output_files) 1277 1278 # Outputs that are in old_output_set but not in new_output_set (removed files) 1279 removed_outputs = old_output_set - new_output_set 1280 # Outputs that are in new_output_set but not in old_output_set (added files) 1281 added_outputs = new_output_set - old_output_set 1282 # Outputs that are the intersection of old_output_set and new_output_set 1283 common_outputs = old_output_set & new_output_set 1284 1285 report = f"\n# Changes to available translations\n" 1286 report += f"This compares the coding translations files available.\n\n" 1287 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1288 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1289 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1290 1291 # Step N: Compare common outputs between versions 1292 report += f"# Changes to concepts in translation files\n\n" 1293 report += f"This compares the added and removed concepts in each of the coding translation files. 
Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1294 for file in common_outputs: 1295 old_output = old_map_path / file 1296 new_output = new_map_path / file 1297 1298 _logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1299 _logger.debug(f"New ouptput: {str(new_output.resolve())}") 1300 1301 df1 = pd.read_csv(old_output) 1302 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1303 df2 = pd.read_csv(new_output) 1304 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1305 1306 # Check for added and removed concepts 1307 report += f"- File {file}\n" 1308 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1309 report += f"- Removed concepts {sorted_list}\n" 1310 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1311 report += f"- Added concepts {sorted_list}\n" 1312 1313 # Check for changed concepts 1314 diff = df2 - df1 # diff in counts 1315 diff = diff[ 1316 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1317 ] # get non-zero counts 1318 s = "\n" 1319 if len(diff.index) > 0: 1320 for concept, row in diff.iterrows(): 1321 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1322 report += f"- Changed concepts {s}\n\n" 1323 else: 1324 report += f"- Changed concepts []\n\n" 1325 1326 return report 1327 1328 1329def diff_phen( 1330 new_phen_path: Path, 1331 new_version: str, 1332 old_phen_path: Path, 1333 old_version: str, 1334 report_path: Path, 1335): 1336 """Compare the differences between two versions of a phenotype""" 1337 1338 # validate phenotypes 1339 _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1340 validate(str(old_phen_path.resolve())) 1341 _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1342 validate(str(new_phen_path.resolve())) 1343 1344 # get old and new config 1345 old_config_path = old_phen_path / CONFIG_FILE 1346 with old_config_path.open("r") as file: 1347 old_config = yaml.safe_load(file) 1348 new_config_path = new_phen_path / CONFIG_FILE 1349 with new_config_path.open("r") as file: 1350 new_config = yaml.safe_load(file) 1351 1352 # write report heading 1353 report = f"# Phenotype Comparison Report\n" 1354 report += f"## Original phenotype\n" 1355 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1356 report += f" - {old_version}\n" 1357 report += f" - {str(old_phen_path.resolve())}\n" 1358 report += f"## Changed phenotype:\n" 1359 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1360 report += f" - {new_version}\n" 1361 report += f" - {str(new_phen_path.resolve())}\n" 1362 1363 # Step 1: check differences configuration files 1364 # Convert list of dicts into a dict: {name: file} 1365 report += diff_config(old_config, new_config) 1366 1367 # Step 2: check differences between map files 1368 # List files from output directories 1369 old_map_path = old_phen_path / MAP_DIR 1370 new_map_path = new_phen_path / MAP_DIR 1371 report += diff_map_files(old_map_path, new_map_path) 1372 1373 # initialise report file 1374 _logger.debug(f"Writing to report file {str(report_path.resolve())}") 1375 report_file = open(report_path, "w") 1376 report_file.write(report) 1377 report_file.close() 1378 1379 _logger.info(f"Phenotypes diff'd successfully") 1380 1381 1382def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1383 # make tmp directory .acmc 1384 timestamp = time.strftime("%Y%m%d_%H%M%S") 1385 temp_dir = 
Path(f".acmc/diff_{timestamp}") 1386 1387 changed_phen_path = Path(phen_dir) 1388 if not changed_phen_path.exists(): 1389 raise ValueError( 1390 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1391 ) 1392 1393 old_phen_path = Path(old_phen_dir) 1394 if not old_phen_path.exists(): 1395 raise ValueError( 1396 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1397 ) 1398 1399 try: 1400 # Create the directory 1401 temp_dir.mkdir(parents=True, exist_ok=True) 1402 _logger.debug(f"Temporary directory created: {temp_dir}") 1403 1404 # Create temporary directories 1405 changed_path = temp_dir / "changed" 1406 changed_path.mkdir(parents=True, exist_ok=True) 1407 old_path = temp_dir / "old" 1408 old_path.mkdir(parents=True, exist_ok=True) 1409 1410 # checkout changed 1411 if version == "latest": 1412 _logger.debug( 1413 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1414 ) 1415 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1416 else: 1417 _logger.debug( 1418 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1419 ) 1420 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1421 changed_repo.git.checkout(version) 1422 1423 # checkout old 1424 if old_version == "latest": 1425 _logger.debug( 1426 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1427 ) 1428 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1429 else: 1430 _logger.debug( 1431 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1432 ) 1433 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1434 old_repo.git.checkout(old_version) 1435 1436 report_filename = f"{version}_{old_version}_diff.md" 1437 report_path = changed_phen_path / report_filename 1438 # diff old with new 1439 diff_phen(changed_path, version, old_path, old_version, report_path) 1440 1441 finally: 1442 # clean up tmp directory 1443 if temp_dir.exists(): 1444 shutil.rmtree(temp_dir) 1445 print(f"Temporary directory removed: {temp_dir}")
PHEN_DIR = "phen"
    Default phenotype directory name

DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
    Default phenotype directory path

CONCEPTS_DIR = "concepts"
    Default concepts directory name

MAP_DIR = "map"
    Default map directory name

CONCEPT_SET_DIR = "concept-sets"
    Default concept set directory name

CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
    Default CSV concept set directory path

OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
    Default OMOP concept set directory path

DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
    List of default phenotype directories

CONFIG_FILE = "config.yml"
    Default configuration filename

VOCAB_VERSION_FILE = "vocab_version.yml"
    Default vocabulary version filename

SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
    List of semantic version increment types

DEFAULT_VERSION_INC = "patch"
    Default semantic version increment type

DEFAULT_GIT_BRANCH = "main"
    Default phenotype repo branch name

SPLIT_COL_ACTION = "split_col"
    Split column preprocessing action type

CODES_COL_ACTION = "codes_col"
    Codes column preprocessing action type

DIVIDE_COL_ACTION = "divide_col"
    Divide column preprocessing action type

COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
    List of column preprocessing action types

CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
    List of supported source concept coding list file types

CONFIG_SCHEMA
    Phenotype config.yml schema definition (see module source above)
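The schema can be exercised directly with Cerberus; a minimal sketch, where the concept set name, file path and column name are placeholders (`map` is omitted because its allowed values come from `parse.SUPPORTED_CODE_TYPES` at runtime):

```python
from cerberus import Validator

example_config = {
    "phenotype": {
        "version": "0.0.1",
        "omop": {
            "vocabulary_id": "ACMC_EXAMPLE",
            "vocabulary_name": "Example phenotype",
            "vocabulary_reference": "https://example.org/phenotype",  # must be a URL
        },
        "concept_sets": [
            {
                "name": "EXAMPLE_CONCEPT_SET",
                "file": {
                    "path": "clinical-codes.csv",
                    "columns": {"read2": "code"},  # code type -> source column name
                },
                "metadata": {},
            }
        ],
    }
}

validator = Validator(CONFIG_SCHEMA)
assert validator.validate(example_config), validator.errors
```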
class PhenValidationException(Exception):
Custom exception raised when there are validation errors in the phenotype configuration file
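Typical handling, catching the structured error list that `validate` attaches:

```python
try:
    validate("./workspace/phen")
except PhenValidationException as e:
    print(e)  # summary: which configuration file failed validation
    for error in e.validation_errors or []:
        print(" -", error)  # individual findings, e.g. missing coding files
```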
def init(phen_dir: str, remote_url: str):
Initialises a phenotype directory as a git repo with the standard structure
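Usage sketch; the remote URL is illustrative, and cloning a remote requires a PAT in `ACMC_GITHUB_PAT` (github.com URLs) or `ACMC_GITLAB_PAT` (anything else):

```python
import os

# local-only initialisation
init("./workspace/phen", remote_url=None)

# or clone an existing remote phenotype repo
os.environ["ACMC_GITHUB_PAT"] = "<personal-access-token>"
init("./workspace/phen", remote_url="https://github.com/example-org/example-phen.git")
```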
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
Arguments:
- phen_dir (str): local directory path where the upstream repo is to be cloned
- upstream_url (str): url to the upstream repo
- upstream_version (str): version in the upstream repo to clone
- new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
- ValueError: if the specified version is not in the upstream repo
- ValueError: if the upstream repo is not a valid phenotype repo
- ValueError: if there are any other problems with Git
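Usage sketch with hypothetical URLs; the version must exist as a branch or tag in the upstream repo:

```python
import os

os.environ["ACMC_GITHUB_PAT"] = "<personal-access-token>"
fork(
    "./workspace/my-phen",
    upstream_url="https://github.com/example-org/upstream-phen.git",
    upstream_version="1.2.3",
    new_origin_url="https://github.com/example-user/my-phen.git",
)
```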
def validate(phen_dir: str):
    """Validates the phenotype directory is a git repo with standard structure"""
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # load the configuration file
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n
    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
    if not match:
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for item in phenotype["concept_sets"]:
        # check the concept code file exists
        concept_code_file_path = concepts_path / item["file"]["path"]
        if not concept_code_file_path.exists():
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
            )
        # check the concept code file is not empty (only stat() files that exist)
        elif concept_code_file_path.stat().st_size == 0:
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
            )

        # check the code file type is supported
        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
            raise ValueError(
                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
            )

        # check the columns specified are a supported medical coding type
        for column in item["file"]["columns"]:
            if column not in code_types:
                validation_errors.append(
                    f"Column type {column} for file {concept_code_file_path} is not supported"
                )

        # check the actions are supported
        if "actions" in item["file"]:
            for action in item["file"]["actions"]:
                if action not in COL_ACTIONS:
                    validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info("Phenotype validated successfully")
def translate_codes(
    source_df: pd.DataFrame, target_code_type: str, concept_name: str
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )

    # convert codes to the target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if the target code type is the same as the source code type there is
        # no translation to do, just append the source as the target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as the source code type ({len(source_df)} codes), copying codes rather than translating"
            )
        else:
            # get the translation filename using the source and target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            # do the mapping if a translation file exists
            if map_path.exists():
                # get the mapping
                df_map = pd.read_parquet(map_path)

                # do the mapping
                translated_df = pd.merge(
                    source_df[source_code_type], df_map, how="left"
                )

                # normalise the output
                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

                # add to the list of codes
                codes = pd.concat([codes, translated_df])
            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add the concept set name to the output if there were any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
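A small sketch of calling translate_codes. The column name and code values are hypothetical: real column names must be code types known to parse.CodeTypeParser, and a {source}_to_{target}.parquet mapping file must exist under trud.PROCESSED_PATH for any cross-type translation.

import pandas as pd

# hypothetical source coding list: one column per source code type
# ("read2" is an assumed type name, not confirmed by this module)
source_df = pd.DataFrame({"read2": ["C10..", "C10E."]})

# translating to the same type copies codes through; a different target
# type would instead look up a parquet mapping file under trud.PROCESSED_PATH
concept_set = translate_codes(source_df, target_code_type="read2", concept_name="diabetes")
print(concept_set[["SOURCE_CONCEPT", "CONCEPT", "CONCEPT_SET"]])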
def write_vocab_version(phen_path: Path):
    """Writes the vocabulary version file combining the acmc, TRUD and OMOP versions"""
    if not trud.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    if not omop.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud.VERSION_PATH.open("r") as file:
        trud_version = yaml.safe_load(file)

    with omop.VERSION_PATH.open("r") as file:
        omop_version = yaml.safe_load(file)

    # create the combined YAML structure
    version_data = {
        "versions": {
            "acmc": acmc.__version__,
            "trud": trud_version,
            "omop": omop_version,
        }
    }

    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
        yaml.dump(
            version_data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
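A usage sketch; the path is illustrative. The written vocab_version.yml holds a single versions mapping, where the trud and omop entries take whatever structure those tools' own version files contain.

from pathlib import Path

write_vocab_version(Path("./workspace/phen"))  # path is an assumption

# the written file then looks roughly like (values illustrative,
# everything quoted because of the QuotedDumper/default_style settings):
# "versions":
#   "acmc": "x.y.z"
#   "trud": ...
#   "omop": ...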
def map(phen_dir: str, target_code_type: str):
    """Maps the phenotype's source concept lists to the target code type, or to all configured code types if none is given"""
    _logger.info(f"Processing phenotype: {phen_dir}")

    # validate configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
    phenotype = config["phenotype"]

    if len(phenotype["map"]) == 0:
        raise ValueError("No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in phenotype["map"]:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
        )

    if target_code_type is not None:
        _map_target_code_type(phen_path, phenotype, target_code_type)
    else:
        for t in phenotype["map"]:
            _map_target_code_type(phen_path, phenotype, t)

    _logger.info("Phenotype processed successfully")
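A usage sketch; the path and code type are illustrative, and the code type must appear under "map" in the phenotype's config.yml.

map("./workspace/phen", "read2")  # map to one (assumed) target code type
map("./workspace/phen", None)     # map to every code type listed in config.yml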
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory"""

    # validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load the git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if there are any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get the next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # write the version to the configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add and commit changes to the repo, including version updates, using
    # the supplied commit message if one was given
    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # tag the repo with the new version
    repo.create_tag(new_version_str)

    # add the remote origin if publishing to a remote repo for the first time
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            repo.git.push("--set-upstream", "origin", DEFAULT_GIT_BRANCH)
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception as e:
        # roll back the tag and the version commit if the push fails
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise e

    _logger.info("Phenotype published successfully")
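A usage sketch: commit and tag a new minor version, pushing to a remote on first publish. The path, message and URL are illustrative.

publish(
    "./workspace/phen",
    msg="Add hypertension concept set",
    remote_url="https://example.com/org/phen.git",  # hypothetical remote
    increment="minor",  # one of SEMANTIC_VERSION_TYPES
)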
def export(phen_dir: str, version: str):
    """Exports a phen repo at a specific tagged version into a target directory"""
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    # check the export directory exists and create it if not
    export_path = phen_path / OMOP_PATH
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # omop export db
    export_db_path = omop.export(
        map_path,
        export_path,
        config["phenotype"]["version"],
        config["phenotype"]["omop"],
    )

    # write to tables
    # export as csv
    _logger.info("Phenotype exported successfully")
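A usage sketch; the path and version string are illustrative. The OMOP export is written under the phenotype's own concept-sets/omop directory.

export("./workspace/phen", "1.0.0")  # hypothetical directory and version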
def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory"""

    # validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # check the target directory exists
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # set the copy directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    # if the copy directory already exists, ask whether to overwrite it
    if copy_path.exists() and copy_path.is_dir():
        do_copy = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )
    else:
        do_copy = True

    if not do_copy:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    # check out the latest commit or the specified version
    if version:
        # checkout a specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        repo.git.checkout(version)
    else:
        # checkout the latest commit (HEAD)
        _logger.info("Checking out the latest commit...")
        repo.git.checkout("HEAD")

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} into {copy_path}")

    _logger.info("Phenotype copied successfully")
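A usage sketch: clone the phenotype repo into a version-named subdirectory of the target and check out that tag. Paths and the tag name are illustrative.

copy("./workspace/phen", "./exports", "1.0.0")  # clones into ./exports/1.0.0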
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concepts as {name: file_path} dictionary and a name set."""
    concepts_dict = {
        item["name"]: item["file"]["path"]
        for item in config_data["phenotype"]["concept_sets"]
    }
    name_set = set(concepts_dict.keys())
    return concepts_dict, name_set
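A self-contained illustration with a minimal, hypothetical configuration dict shaped like config.yml:

config_data = {
    "phenotype": {
        "concept_sets": [
            {"name": "diabetes", "file": {"path": "diabetes.csv"}},
            {"name": "hypertension", "file": {"path": "htn.csv"}},
        ]
    }
}
concepts, names = extract_concepts(config_data)
assert concepts == {"diabetes": "diabetes.csv", "hypertension": "htn.csv"}
assert names == {"diabetes", "hypertension"}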
def diff_config(old_config: dict, new_config: dict) -> str:
    """Reports changes between two phenotype configurations: added, removed and renamed concept sets, and changed source file paths"""
    report = "\n# Changes to phenotype configuration\n"
    report += "This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # check added and removed names
    added_names = new_names - old_names  # names that appear in new but not in old
    removed_names = old_names - new_names  # names that were in old but not in new

    # find file path changes for unchanged names
    unchanged_names = old_names & new_names  # names that exist in both
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # find renamed concepts (same file, different name)
    renamed_concepts = []
    for removed in removed_names:
        old_path = old_concepts[removed]
        for added in added_names:
            new_path = new_concepts[added]
            if old_path == new_path:
                renamed_concepts.append((removed, added))

    # remove renamed concepts from the added and removed sets
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate the config report
    if added_names:
        report += "## Added Concepts\n"
        for name in added_names:
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in removed_names:
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    if "values_changed" in file_diff:
        report += "## Updated File Paths\n"
        for name, change in file_diff["values_changed"].items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            clean_name = name.split("root['")[1].split("']")[0]
            report += (
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or file_diff.get("values_changed")
    ):
        report += "No changes in concept sets.\n"

    return report
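A short illustration of the rename detection: the concept set keeps the same source file, so it is reported as renamed rather than as a removal plus an addition. Names and paths are hypothetical.

old = {"phenotype": {"concept_sets": [{"name": "diabetes", "file": {"path": "dm.csv"}}]}}
new = {"phenotype": {"concept_sets": [{"name": "diabetes_t2", "file": {"path": "dm.csv"}}]}}
# the report lists `diabetes` ➝ `diabetes_t2` under "Renamed Concepts"
print(diff_config(old, new))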
def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
    """Reports changes between the translation map files of two phenotype versions"""
    old_output_files = [
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    new_output_files = [
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]

    # convert the lists to sets for easy comparison
    old_output_set = set(old_output_files)
    new_output_set = set(new_output_files)

    # outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # outputs that are in the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = "\n# Changes to available translations\n"
    report += "This compares the coding translation files available.\n\n"
    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"

    # compare common outputs between versions
    report += "# Changes to concepts in translation files\n\n"
    report += "This compares the added and removed concepts in each of the coding translation files. Note that this might differ from config.yaml if the translations have not been run for the current config.\n\n"
    for file in common_outputs:
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old output: {str(old_output.resolve())}")
        _logger.debug(f"New output: {str(new_output.resolve())}")

        # count concepts per concept set in each version
        df1 = pd.read_csv(old_output)
        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
        report += f"- Added concepts {sorted_list}\n"

        # check for changed concepts
        diff = df2 - df1  # difference in counts
        diff = diff[
            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
        ]  # keep non-zero counts
        s = "\n"
        if len(diff.index) > 0:
            for concept, row in diff.iterrows():
                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
            report += f"- Changed concepts {s}\n\n"
        else:
            report += "- Changed concepts []\n\n"

    return report
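A usage sketch; the paths are illustrative and would normally be the map/ directories of two checked-out versions of the same phenotype, each containing the CSV translation files.

from pathlib import Path

print(diff_map_files(Path("old_phen/map"), Path("new_phen/map")))  # hypothetical paths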
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
):
    """Compare the differences between two versions of a phenotype"""

    # validate phenotypes
    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
    validate(str(old_phen_path.resolve()))
    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
    validate(str(new_phen_path.resolve()))

    # get the old and new configs
    old_config_path = old_phen_path / CONFIG_FILE
    with old_config_path.open("r") as file:
        old_config = yaml.safe_load(file)
    new_config_path = new_phen_path / CONFIG_FILE
    with new_config_path.open("r") as file:
        new_config = yaml.safe_load(file)

    # write the report heading
    report = "# Phenotype Comparison Report\n"
    report += "## Original phenotype\n"
    report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f" - {old_version}\n"
    report += f" - {str(old_phen_path.resolve())}\n"
    report += "## Changed phenotype:\n"
    report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f" - {new_version}\n"
    report += f" - {str(new_phen_path.resolve())}\n"

    # Step 1: check differences between the configuration files
    report += diff_config(old_config, new_config)

    # Step 2: check differences between the map files
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(old_map_path, new_map_path)

    # write the report file
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with open(report_path, "w") as report_file:
        report_file.write(report)

    _logger.info("Phenotypes diff'd successfully")
def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
    """Compares two versions of a phenotype and writes a diff report into the changed phenotype directory"""
    # make a tmp directory under .acmc
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    try:
        # create the temporary directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # create temporary directories for the two versions
        changed_path = temp_dir / "changed"
        changed_path.mkdir(parents=True, exist_ok=True)
        old_path = temp_dir / "old"
        old_path.mkdir(parents=True, exist_ok=True)

        # checkout the changed version
        if version == "latest":
            _logger.debug(
                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
            changed_repo.git.checkout(version)

        # checkout the old version
        if old_version == "latest":
            _logger.debug(
                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
            old_repo.git.checkout(old_version)

        # diff the old version with the new
        report_filename = f"{version}_{old_version}_diff.md"
        report_path = changed_phen_path / report_filename
        diff_phen(changed_path, version, old_path, old_version, report_path)

    finally:
        # clean up the tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
            _logger.debug(f"Temporary directory removed: {temp_dir}")
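A usage sketch: compare the working copy ("latest") against a tagged release of an older phenotype directory. Paths and the tag are illustrative; the report lands in the changed directory as {version}_{old_version}_diff.md.

diff("./workspace/phen", "latest", "./workspace/phen_old", "1.0.0")
# writes ./workspace/phen/latest_1.0.0_diff.md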