acmc.phen
phenotype.py module
This module provides functionality for managing phenotypes.
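A minimal sketch of the typical lifecycle using this module's public API. The paths and the `read2` target code type are illustrative assumptions, and `map` requires the chosen code type to be listed under `map` in `config.yml`:

```python
# Hypothetical end-to-end usage of acmc.phen (paths and code type are examples).
from acmc import phen

phen.init("./workspace/phen", remote_url=None)  # create a local phenotype repo
# ... edit ./workspace/phen/config.yml and add coding lists under concepts/ ...
phen.validate("./workspace/phen")               # check structure and configuration
phen.map("./workspace/phen", "read2")           # translate to a target code type
phen.publish("./workspace/phen", "First release", remote_url=None)
```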
1""" 2phenotype.py module 3 4This module provides functionality for managing phenotypes. 5""" 6 7import argparse 8import pandas as pd 9import numpy as np 10import json 11import os 12import sqlite3 13import sys 14import shutil 15import time 16import git 17import re 18import logging 19import requests 20import yaml 21import semver 22from git import Repo 23from cerberus import Validator # type: ignore 24from deepdiff import DeepDiff 25from pathlib import Path 26from urllib.parse import urlparse, urlunparse 27from typing import Tuple, Set, Any 28import acmc 29from acmc import trud, omop, parse, util, logging_config as lc 30 31# set up logging 32_logger = lc.setup_logger() 33 34pd.set_option("mode.chained_assignment", None) 35 36PHEN_DIR = "phen" 37"""Default phenotype directory name""" 38 39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR 40"""Default phenotype directory path""" 41 42CONCEPTS_DIR = "concepts" 43"""Default concepts directory name""" 44 45MAP_DIR = "map" 46"""Default map directory name""" 47 48CONCEPT_SET_DIR = "concept-sets" 49"""Default concept set directory name""" 50 51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv" 52"""Default CSV concept set directory path""" 53 54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop" 55"""Default OMOP concept set directory path""" 56 57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR] 58"""List of default phenotype directories""" 59 60CONFIG_FILE = "config.yml" 61"""Default configuration filename""" 62 63VOCAB_VERSION_FILE = "vocab_version.yml" 64"""Default vocabulary version filename""" 65 66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"] 67"""List of semantic version increment types""" 68 69DEFAULT_VERSION_INC = "patch" 70"""Default semantic version increment type""" 71 72DEFAULT_GIT_BRANCH = "main" 73"""Default phenotype repo branch name""" 74 75SPLIT_COL_ACTION = "split_col" 76"""Split column preprocessing action type""" 77 78CODES_COL_ACTION = "codes_col" 79"""Codes column preprocessing action type""" 80 81DIVIDE_COL_ACTION = "divide_col" 82"""Divide column preprocessing action type""" 83 84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] 85"""List of column preprocessing action types""" 86 87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"] 88"""List of supported source concept coding list file types""" 89 90# config.yaml schema 91CONFIG_SCHEMA = { 92 "phenotype": { 93 "type": "dict", 94 "required": True, 95 "schema": { 96 "version": { 97 "type": "string", 98 "required": True, 99 "regex": r"^\d+\.\d+\.\d+$", # Enforces 'vN.N.N' format 100 }, 101 "omop": { 102 "type": "dict", 103 "required": True, 104 "schema": { 105 "vocabulary_id": {"type": "string", "required": True}, 106 "vocabulary_name": {"type": "string", "required": True}, 107 "vocabulary_reference": { 108 "type": "string", 109 "required": True, 110 "regex": r"^https?://.*", # Ensures it's a URL 111 }, 112 }, 113 }, 114 "map": { 115 "type": "list", 116 "schema": { 117 "type": "string", 118 "allowed": list( 119 parse.SUPPORTED_CODE_TYPES 120 ), # Ensure only predefined values are allowed 121 }, 122 }, 123 "concept_sets": { 124 "type": "list", 125 "required": True, 126 "schema": { 127 "type": "dict", 128 "schema": { 129 "name": {"type": "string", "required": True}, 130 "file": { 131 "type": "dict", 132 "required": False, 133 "schema": { 134 "path": {"type": "string", "required": True}, 135 "columns": {"type": "dict", "required": True}, 136 "category": { 137 "type": "string" 138 }, # Optional but must be string if present 139 "actions": { 140 "type": "dict", 
141 "schema": {"divide_col": {"type": "string"}}, 142 }, 143 }, 144 }, 145 }, 146 }, 147 }, 148 }, 149 } 150} 151"""Phenotype config.yml schema definition""" 152 153 154class PhenValidationException(Exception): 155 """Custom exception class raised when validation errors in phenotype configuration file""" 156 157 def __init__(self, message, validation_errors=None): 158 super().__init__(message) 159 self.validation_errors = validation_errors 160 161 162def _construct_git_url(remote_url: str): 163 """Constructs a git url for github or gitlab including a PAT token environment variable""" 164 # check the url 165 parsed_url = urlparse(remote_url) 166 167 # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd 168 # need to update this function if the URL scheme is different. 169 if "github.com" in parsed_url.netloc: 170 # get GitHub PAT from environment variable 171 auth = os.getenv("ACMC_GITHUB_PAT") 172 if not auth: 173 raise ValueError( 174 "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." 175 ) 176 else: 177 # get GitLab PAT from environment variable 178 auth = os.getenv("ACMC_GITLAB_PAT") 179 if not auth: 180 raise ValueError( 181 "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 182 ) 183 auth = f"oauth2:{auth}" 184 185 # Construct the new URL with credentials 186 new_netloc = f"{auth}@{parsed_url.netloc}" 187 return urlunparse( 188 ( 189 parsed_url.scheme, 190 new_netloc, 191 parsed_url.path, 192 parsed_url.params, 193 parsed_url.query, 194 parsed_url.fragment, 195 ) 196 ) 197 198 199def _create_empty_git_dir(path: Path): 200 """Creates a directory with a .gitkeep file so that it's tracked in git""" 201 path.mkdir(exist_ok=True) 202 keep_path = path / ".gitkeep" 203 keep_path.touch(exist_ok=True) 204 205 206def _check_delete_dir(path: Path, msg: str) -> bool: 207 """Checks on the command line if a user wants to delete a directory 208 209 Args: 210 path (Path): path of the directory to be deleted 211 msg (str): message to be displayed to the user 212 213 Returns: 214 Boolean: True if deleted 215 """ 216 deleted = False 217 218 user_input = input(f"{msg}").strip().lower() 219 if user_input in ["yes", "y"]: 220 shutil.rmtree(path) 221 deleted = True 222 else: 223 _logger.info("Directory was not deleted.") 224 225 return deleted 226 227 228def init(phen_dir: str, remote_url: str): 229 """Initial phenotype directory as git repo with standard structure""" 230 _logger.info(f"Initialising Phenotype in directory: {phen_dir}") 231 phen_path = Path(phen_dir) 232 233 # check if directory already exists and ask user if they want to recreate it 234 if ( 235 phen_path.exists() and phen_path.is_dir() 236 ): # Check if it exists and is a directory 237 configure = _check_delete_dir( 238 phen_path, 239 f"The phen directory already exists. Do you want to reinitialise? 
(yes/no): ", 240 ) 241 else: 242 configure = True 243 244 if not configure: 245 _logger.info(f"Exiting, phenotype not initiatised") 246 return 247 248 # Initialise repo from local or remote 249 repo: Repo 250 251 # if remote then clone the repo otherwise init a local repo 252 if remote_url != None: 253 # add PAT token to the URL 254 git_url = _construct_git_url(remote_url) 255 256 # clone the repo 257 git_cmd = git.cmd.Git() 258 git_cmd.clone(git_url, phen_path) 259 260 # open repo 261 repo = Repo(phen_path) 262 # check if there are any commits (new repo has no commits) 263 if ( 264 len(repo.branches) == 0 or repo.head.is_detached 265 ): # Handle detached HEAD (e.g., after init) 266 _logger.debug("The phen repository has no commits yet.") 267 commit_count = 0 268 else: 269 # Get the total number of commits in the default branch 270 commit_count = sum(1 for _ in repo.iter_commits()) 271 _logger.debug(f"Repo has previous commits: {commit_count}") 272 else: 273 # local repo, create the directories and init 274 phen_path.mkdir(parents=True, exist_ok=True) 275 _logger.debug(f"Phen directory '{phen_path}' has been created.") 276 repo = git.Repo.init(phen_path) 277 commit_count = 0 278 279 phen_path = phen_path.resolve() 280 # initialise empty repos 281 if commit_count == 0: 282 # create initial commit 283 initial_file_path = phen_path / "README.md" 284 with open(initial_file_path, "w") as file: 285 file.write( 286 "# Initial commit\nThis is the first commit in the phen repository.\n" 287 ) 288 repo.index.add([initial_file_path]) 289 repo.index.commit("Initial commit") 290 commit_count = 1 291 292 # Checkout the phens default branch, creating it if it does not exist 293 if DEFAULT_GIT_BRANCH in repo.branches: 294 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 295 main_branch.checkout() 296 else: 297 main_branch = repo.create_head(DEFAULT_GIT_BRANCH) 298 main_branch.checkout() 299 300 # if the phen path does not contain the config file then initialise the phen type 301 config_path = phen_path / CONFIG_FILE 302 if config_path.exists(): 303 _logger.debug(f"Phenotype configuration files already exist") 304 return 305 306 _logger.info("Creating phen directory structure and config files") 307 for d in DEFAULT_PHEN_DIR_LIST: 308 _create_empty_git_dir(phen_path / d) 309 310 # create empty phen config file 311 config = { 312 "phenotype": { 313 "version": "0.0.0", 314 "omop": { 315 "vocabulary_id": "", 316 "vocabulary_name": "", 317 "vocabulary_reference": "", 318 }, 319 "translate": [], 320 "concept_sets": [], 321 } 322 } 323 324 with open(phen_path / CONFIG_FILE, "w") as file: 325 yaml.dump( 326 config, 327 file, 328 Dumper=util.QuotedDumper, 329 default_flow_style=False, 330 sort_keys=False, 331 default_style='"', 332 ) 333 334 # add git ignore 335 ignore_content = """# Ignore SQLite database files 336*.db 337*.sqlite3 338 339# Ignore SQLite journal and metadata files 340*.db-journal 341*.sqlite3-journal 342 343# python 344.ipynb_checkpoints 345 """ 346 ignore_path = phen_path / ".gitignore" 347 with open(ignore_path, "w") as file: 348 file.write(ignore_content) 349 350 # add to git repo and commit 351 for d in DEFAULT_PHEN_DIR_LIST: 352 repo.git.add(phen_path / d) 353 repo.git.add(all=True) 354 repo.index.commit("initialised the phen git repo.") 355 356 _logger.info(f"Phenotype initialised successfully") 357 358 359def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str): 360 """Forks an upstream phenotype in a remote repo at a specific version to a local director, and 
optionally sets to a new remote origin" 361 362 Args: 363 phen_dir (str): local directory path where the upstream repo is to be cloned 364 upstream_url (str): url to the upstream repo 365 upstream_version (str): version in the upstream repo to clone 366 new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None. 367 368 Raises: 369 ValueError: if the specified version is not in the upstream repo 370 ValueError: if the upstream repo is not a valid phenotype repo 371 ValueError: if there's any other problems with Git 372 """ 373 _logger.info( 374 f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}" 375 ) 376 377 phen_path = Path(phen_dir) 378 # check if directory already exists and ask user if they want to recreate it 379 if ( 380 phen_path.exists() and phen_path.is_dir() 381 ): # Check if it exists and is a directory 382 configure = _check_delete_dir( 383 phen_path, 384 f"The phen directory already exists. Do you want to reinitialise? (yes/no): ", 385 ) 386 else: 387 configure = True 388 389 if not configure: 390 _logger.info(f"Exiting, phenotype not initiatised") 391 return 392 393 try: 394 # Clone repo 395 git_url = _construct_git_url(upstream_url) 396 repo = git.Repo.clone_from(git_url, phen_path) 397 398 # Fetch all branches and tags 399 repo.remotes.origin.fetch() 400 401 # Check if the version exists 402 available_refs = [ref.name.split("/")[-1] for ref in repo.references] 403 if upstream_version not in available_refs: 404 raise ValueError( 405 f"Version '{upstream_version}' not found in the repository: {upstream_url}." 406 ) 407 408 # Checkout the specified version 409 repo.git.checkout(upstream_version) 410 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 411 main_branch.checkout() 412 413 # Check if 'config.yml' exists in the root directory 414 config_path = phen_path / "config.yml" 415 if not os.path.isfile(config_path): 416 raise ValueError( 417 f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory." 
418 ) 419 420 # Validate the phenotype is compatible with the acmc tool 421 validate(str(phen_path.resolve())) 422 423 # Delete each tag locally 424 tags = repo.tags 425 for tag in tags: 426 repo.delete_tag(tag) 427 _logger.debug(f"Deleted tags from forked repo: {tag}") 428 429 # Add upstream remote 430 repo.create_remote("upstream", upstream_url) 431 remote = repo.remotes["origin"] 432 repo.delete_remote(remote) # Remove existing origin 433 434 # Optionally set a new origin remote 435 if new_origin_url: 436 git_url = _construct_git_url(new_origin_url) 437 repo.create_remote("origin", git_url) 438 repo.git.push("--set-upstream", "origin", "main") 439 440 _logger.info(f"Repository forked successfully at {phen_path}") 441 _logger.info(f"Upstream set to {upstream_url}") 442 if new_origin_url: 443 _logger.info(f"Origin set to {new_origin_url}") 444 445 except Exception as e: 446 if phen_path.exists(): 447 shutil.rmtree(phen_path) 448 raise ValueError(f"Error occurred during repository fork: {str(e)}") 449 450 451def validate(phen_dir: str): 452 """Validates the phenotype directory is a git repo with standard structure""" 453 _logger.info(f"Validating phenotype: {phen_dir}") 454 phen_path = Path(phen_dir) 455 if not phen_path.is_dir(): 456 raise NotADirectoryError( 457 f"Error: '{str(phen_path.resolve())}' is not a directory" 458 ) 459 460 config_path = phen_path / CONFIG_FILE 461 if not config_path.is_file(): 462 raise FileNotFoundError( 463 f"Error: phen configuration file '{config_path}' does not exist." 464 ) 465 466 concepts_path = phen_path / CONCEPTS_DIR 467 if not concepts_path.is_dir(): 468 raise FileNotFoundError( 469 f"Error: source concepts directory {concepts_path} does not exist." 470 ) 471 472 # Calidate the directory is a git repo 473 try: 474 git.Repo(phen_path) 475 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 476 raise Exception(f"Phen directory {phen_path} is not a git repo") 477 478 # Load configuration File 479 if config_path.suffix == ".yml": 480 try: 481 with config_path.open("r") as file: 482 phenotype = yaml.safe_load(file) 483 484 validator = Validator(CONFIG_SCHEMA) 485 if validator.validate(phenotype): 486 _logger.debug("YAML structure is valid.") 487 else: 488 _logger.error(f"YAML structure validation failed: {validator.errors}") 489 raise Exception(f"YAML structure validation failed: {validator.errors}") 490 except yaml.YAMLError as e: 491 _logger.error(f"YAML syntax error: {e}") 492 raise e 493 else: 494 raise Exception( 495 f"Unsupported configuration filetype: {str(config_path.resolve())}" 496 ) 497 498 # initiatise 499 validation_errors = [] 500 phenotype = phenotype["phenotype"] 501 code_types = parse.CodeTypeParser().code_types 502 503 # check the version number is of the format vn.n.n 504 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 505 if not match: 506 validation_errors.append( 507 f"Invalid version format in configuration file: {phenotype['version']}" 508 ) 509 510 # create a list of all the concept set names defined in the concept set configuration 511 concept_set_names = [] 512 for item in phenotype["concept_sets"]: 513 if item["name"] in concept_set_names: 514 validation_errors.append( 515 f"Duplicate concept set defined in concept sets {item['name'] }" 516 ) 517 else: 518 concept_set_names.append(item["name"]) 519 520 # check codes definition 521 for item in phenotype["concept_sets"]: 522 # check concepte code file exists 523 concept_code_file_path = concepts_path / item["file"]["path"] 524 if not 
concept_code_file_path.exists(): 525 validation_errors.append( 526 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 527 ) 528 529 # check concepte code file is not empty 530 if concept_code_file_path.stat().st_size == 0: 531 validation_errors.append( 532 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 533 ) 534 535 # check code file type is supported 536 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 537 raise ValueError( 538 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 539 ) 540 541 # check columns specified are a supported medical coding type 542 for column in item["file"]["columns"]: 543 if column not in code_types: 544 validation_errors.append( 545 f"Column type {column} for file {concept_code_file_path} is not supported" 546 ) 547 548 # check the actions are supported 549 if "actions" in item["file"]: 550 for action in item["file"]["actions"]: 551 if action not in COL_ACTIONS: 552 validation_errors.append(f"Action {action} is not supported") 553 554 if len(validation_errors) > 0: 555 _logger.error(validation_errors) 556 raise PhenValidationException( 557 f"Configuration file {str(config_path.resolve())} failed validation", 558 validation_errors, 559 ) 560 561 _logger.info(f"Phenotype validated successfully") 562 563 564def _read_table_file(path: Path, excel_sheet: str = ""): 565 """ 566 Load Code List File 567 """ 568 569 path = path.resolve() 570 if path.suffix == ".csv": 571 df = pd.read_csv(path, dtype=str) 572 elif path.suffix == ".xlsx" or path.suffix == ".xls": 573 if excel_sheet != "": 574 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str) 575 else: 576 df = pd.read_excel(path, dtype=str) 577 elif path.suffix == ".dta": 578 df = pd.read_stata(path) 579 else: 580 raise ValueError( 581 f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types" 582 ) 583 584 return df 585 586 587def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame: 588 # Perform Structural Changes to file before preprocessing 589 _logger.debug("Processing file structural actions") 590 if ( 591 "actions" in concept_set["file"] 592 and "split_col" in concept_set["file"]["actions"] 593 and "codes_col" in concept_set["file"]["actions"] 594 ): 595 split_col = concept_set["file"]["actions"]["split_col"] 596 codes_col = concept_set["file"]["actions"]["codes_col"] 597 _logger.debug( 598 "Action: Splitting", 599 split_col, 600 "column into:", 601 df[split_col].unique(), 602 ) 603 codes = df[codes_col] 604 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode 605 oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes 606 oh[oh == False] = np.nan # replace 0s with None 607 df = pd.concat([df, oh], axis=1) # merge in new columns 608 609 return df 610 611 612def _preprocess_source_concepts( 613 df: pd.DataFrame, concept_set: dict, code_file_path: Path 614) -> Tuple[pd.DataFrame, list]: 615 """Perform QA Checks on columns individually and append to df""" 616 out = pd.DataFrame([]) # create output df to append to 617 code_errors = [] # list of errors from processing 618 619 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 620 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 621 622 # Preprocess codes 623 code_types = parse.CodeTypeParser().code_types 624 for code_type in concept_set["file"]["columns"]: 625 parser = code_types[code_type] 626 _logger.info(f"Processing {code_type} codes for 
{code_file_path}") 627 628 # get codes by column name 629 source_col_name = concept_set["file"]["columns"][code_type] 630 codes = df[source_col_name].dropna() 631 codes = codes.astype(str) # convert to string 632 codes = codes.str.strip() # remove excess spaces 633 634 # process codes, validating them using parser and returning the errors 635 codes, errors = parser.process(codes, code_file_path) 636 if len(errors) > 0: 637 code_errors.extend(errors) 638 _logger.warning(f"Codes validation failed with {len(errors)} errors") 639 640 # add processed codes to df 641 new_col_name = f"{source_col_name}_SOURCE" 642 df = df.rename(columns={source_col_name: new_col_name}) 643 process_codes = pd.DataFrame({code_type: codes}).join(df) 644 out = pd.concat( 645 [out, process_codes], 646 ignore_index=True, 647 ) 648 649 _logger.debug(out.head()) 650 651 return out, code_errors 652 653 654# Translate Df with multiple codes into single code type Series 655def translate_codes( 656 source_df: pd.DataFrame, target_code_type: str, concept_name: str 657) -> pd.DataFrame: 658 """Translates each source code type the source coding list into a target type and returns all conversions as a concept set""" 659 660 # codes = pd.DataFrame([], dtype=str) 661 codes = pd.DataFrame( 662 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string" 663 ) 664 # Convert codes to target type 665 _logger.info(f"Converting to target code type {target_code_type}") 666 667 for source_code_type in source_df.columns: 668 # if target code type is the same as thet source code type, no translation, just appending source as target 669 if source_code_type == target_code_type: 670 copy_df = pd.DataFrame( 671 { 672 "SOURCE_CONCEPT": source_df[source_code_type], 673 "SOURCE_CONCEPT_TYPE": source_code_type, 674 "CONCEPT": source_df[source_code_type], 675 } 676 ) 677 codes = pd.concat([codes, copy_df]) 678 _logger.debug( 679 f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" 680 ) 681 else: 682 # get the translation filename using source to target code types 683 filename = f"{source_code_type}_to_{target_code_type}.parquet" 684 map_path = trud.PROCESSED_PATH / filename 685 686 # do the mapping if it exists 687 if map_path.exists(): 688 # get mapping 689 df_map = pd.read_parquet(map_path) 690 691 # do mapping 692 translated_df = pd.merge( 693 source_df[source_code_type], df_map, how="left" 694 ) 695 696 # normalise the output 697 translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"]) 698 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type 699 700 # add to list of codes 701 codes = pd.concat([codes, translated_df]) 702 703 else: 704 _logger.warning( 705 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" 706 ) 707 708 codes = codes.dropna() # delete NaNs 709 710 # added concept set type to output if any translations 711 if len(codes.index) > 0: 712 codes["CONCEPT_SET"] = concept_name 713 else: 714 _logger.debug(f"No codes converted with target code type {target_code_type}") 715 716 return codes 717 718 719def _write_code_errors(code_errors: list, code_errors_path: Path): 720 err_df = pd.DataFrame( 721 [ 722 { 723 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 724 "VOCABULARY": err.code_type, 725 "SOURCE": err.codes_file, 726 "CAUSE": err.message, 727 } 728 for err in code_errors 729 ] 730 ) 731 732 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 733 err_df = 
err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 734 err_df.to_csv(code_errors_path, index=False, mode="w") 735 736 737def write_vocab_version(phen_path: Path): 738 # write the vocab version files 739 740 if not trud.VERSION_PATH.exists(): 741 raise FileNotFoundError( 742 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 743 ) 744 745 if not omop.VERSION_PATH.exists(): 746 raise FileNotFoundError( 747 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 748 ) 749 750 with trud.VERSION_PATH.open("r") as file: 751 trud_version = yaml.safe_load(file) 752 753 with omop.VERSION_PATH.open("r") as file: 754 omop_version = yaml.safe_load(file) 755 756 # Create the combined YAML structure 757 version_data = { 758 "versions": { 759 "acmc": acmc.__version__, 760 "trud": trud_version, 761 "omop": omop_version, 762 } 763 } 764 765 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 766 yaml.dump( 767 version_data, 768 file, 769 Dumper=util.QuotedDumper, 770 default_flow_style=False, 771 sort_keys=False, 772 default_style='"', 773 ) 774 775 776def map(phen_dir: str, target_code_type: str): 777 _logger.info(f"Processing phenotype: {phen_dir}") 778 779 # Validate configuration 780 validate(phen_dir) 781 782 # initialise paths 783 phen_path = Path(phen_dir) 784 config_path = phen_path / CONFIG_FILE 785 786 # load configuration 787 with config_path.open("r") as file: 788 config = yaml.safe_load(file) 789 phenotype = config["phenotype"] 790 791 if len(phenotype["map"]) == 0: 792 raise ValueError(f"No map codes defined in the phenotype configuration") 793 794 if target_code_type is not None and target_code_type not in phenotype["map"]: 795 raise ValueError( 796 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 797 ) 798 799 if target_code_type is not None: 800 _map_target_code_type(phen_path, phenotype, target_code_type) 801 else: 802 for t in phenotype["map"]: 803 _map_target_code_type(phen_path, phenotype, t) 804 805 _logger.info(f"Phenotype processed successfully") 806 807 808def _map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 809 _logger.debug(f"Target coding format: {target_code_type}") 810 concepts_path = phen_path / CONCEPTS_DIR 811 # Create output dataframe 812 out = pd.DataFrame([]) 813 code_errors = [] 814 815 # Process each folder in codes section 816 for concept_set in phenotype["concept_sets"]: 817 _logger.debug(f"--- {concept_set['file']} ---") 818 819 # Load code file 820 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 821 df = _read_table_file(codes_file_path) 822 823 # process structural actions 824 df = _process_actions(df, concept_set) 825 826 # preprocessing and validate of source concepts 827 _logger.debug("Processing and validating source concept codes") 828 df, errors = _preprocess_source_concepts( 829 df, 830 concept_set, 831 codes_file_path, 832 ) 833 834 # create df with just the source code columns 835 source_column_names = list(concept_set["file"]["columns"].keys()) 836 source_df = df[source_column_names] 837 838 _logger.debug(source_df.columns) 839 _logger.debug(source_df.head()) 840 841 _logger.debug( 842 f"Length of errors from _preprocess_source_concepts {len(errors)}" 843 ) 844 if len(errors) > 0: 845 code_errors.extend(errors) 846 _logger.debug(f" Length of code_errors {len(code_errors)}") 847 848 # Map source concepts codes to target codes 849 # if processing a source coding list with categorical 
data 850 if ( 851 "actions" in concept_set["file"] 852 and "divide_col" in concept_set["file"]["actions"] 853 and len(df) > 0 854 ): 855 divide_col = concept_set["file"]["actions"]["divide_col"] 856 _logger.debug(f"Action: Dividing Table by {divide_col}") 857 _logger.debug(f"column into: {df[divide_col].unique()}") 858 df_grp = df.groupby(divide_col) 859 for cat, grp in df_grp: 860 if cat == concept_set["file"]["category"]: 861 grp = grp.drop(columns=[divide_col]) # delete categorical column 862 source_df = grp[source_column_names] 863 trans_out = translate_codes( 864 source_df, 865 target_code_type=target_code_type, 866 concept_name=concept_set["name"], 867 ) 868 out = pd.concat([out, trans_out]) 869 else: 870 source_df = df[source_column_names] 871 trans_out = translate_codes( 872 source_df, 873 target_code_type=target_code_type, 874 concept_name=concept_set["name"], 875 ) 876 out = pd.concat([out, trans_out]) 877 878 if len(code_errors) > 0: 879 _logger.error(f"The map processing has {len(code_errors)} errors") 880 error_path = phen_path / MAP_DIR / "errors" 881 error_path.mkdir(parents=True, exist_ok=True) 882 error_filename = f"{target_code_type}-code-errors.csv" 883 _write_code_errors(code_errors, error_path / error_filename) 884 885 # Check there is output from processing 886 if len(out.index) == 0: 887 _logger.error(f"No output after map processing") 888 raise Exception( 889 f"No output after map processing, check config {str(phen_path.resolve())}" 890 ) 891 892 # final processing 893 out = out.reset_index(drop=True) 894 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 895 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 896 897 out_count = len(out.index) 898 # added metadata 899 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 900 result_list = [] 901 source_column_names = list(concept_set["file"]["columns"].keys()) 902 for source_concept_type in source_column_names: 903 # Filter output based on the current source_concept_type 904 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 905 filtered_count = len(out_filtered_df.index) 906 907 # Remove the source type columns except the current type will leave the metadata and the join 908 remove_types = [ 909 type for type in source_column_names if type != source_concept_type 910 ] 911 metadata_df = df.drop(columns=remove_types) 912 metadata_df = metadata_df.rename( 913 columns={source_concept_type: "SOURCE_CONCEPT"} 914 ) 915 metadata_df_count = len(metadata_df.index) 916 917 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 918 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 919 result_count = len(result.index) 920 921 _logger.debug( 922 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 923 ) 924 925 # Append the result to the result_list 926 result_list.append(result) 927 928 # Concatenate all the results into a single DataFrame 929 final_out = pd.concat(result_list, ignore_index=True) 930 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 931 _logger.debug( 932 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 933 ) 934 935 # Save output to map directory 936 output_filename = target_code_type + ".csv" 937 map_path = phen_path / MAP_DIR / output_filename 938 final_out.to_csv(map_path, index=False) 939 
_logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 940 941 # save concept sets as separate files 942 concept_set_path = phen_path / CSV_PATH / target_code_type 943 944 # empty the concept-set directory except for hiddle files, e.g. .git 945 if concept_set_path.exists(): 946 for item in concept_set_path.iterdir(): 947 if not item.name.startswith("."): 948 item.unlink() 949 else: 950 concept_set_path.mkdir(parents=True, exist_ok=True) 951 952 # write each concept as a separate file 953 for name, concept in final_out.groupby("CONCEPT_SET"): 954 concept = concept.sort_values(by="CONCEPT") # sort rows 955 concept = concept.dropna(how="all", axis=1) # remove empty cols 956 concept = concept.reindex( 957 sorted(concept.columns), axis=1 958 ) # sort cols alphabetically 959 filename = f"{name}.csv" 960 concept_path = concept_set_path / filename 961 concept.to_csv(concept_path, index=False) 962 963 write_vocab_version(phen_path) 964 965 _logger.info(f"Phenotype processed target code type {target_code_type}") 966 967 968def _generate_version_tag( 969 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 970) -> str: 971 # Get all valid semantic version tags 972 versions = [] 973 for tag in repo.tags: 974 if tag.name.startswith("v"): 975 tag_name = tag.name[1:] # Remove the first character 976 else: 977 tag_name = tag.name 978 if semver.Version.is_valid(tag_name): 979 versions.append(semver.Version.parse(tag_name)) 980 981 _logger.debug(f"Versions: {versions}") 982 # Determine the next version 983 if not versions: 984 new_version = semver.Version(0, 0, 1) 985 else: 986 latest_version = max(versions) 987 if increment == "major": 988 new_version = latest_version.bump_major() 989 elif increment == "minor": 990 new_version = latest_version.bump_minor() 991 else: 992 new_version = latest_version.bump_patch() 993 994 # Create the new tag 995 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 996 997 return new_version_str 998 999 1000def publish( 1001 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 1002): 1003 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 1004 1005 # Validate config 1006 validate(phen_dir) 1007 phen_path = Path(phen_dir) 1008 1009 # load git repo and set the branch 1010 repo = git.Repo(phen_path) 1011 if DEFAULT_GIT_BRANCH in repo.branches: 1012 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1013 main_branch.checkout() 1014 else: 1015 raise AttributeError( 1016 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1017 ) 1018 1019 # check if any changes to publish 1020 if not repo.is_dirty() and not repo.untracked_files: 1021 if remote_url is not None and "origin" not in repo.remotes: 1022 _logger.info(f"First publish to remote url {remote_url}") 1023 else: 1024 _logger.info("Nothing to publish, no changes to the repo") 1025 return 1026 1027 # get next version 1028 new_version_str = _generate_version_tag(repo, increment) 1029 _logger.info(f"New version: {new_version_str}") 1030 1031 # Write version in configuration file 1032 config_path = phen_path / CONFIG_FILE 1033 with config_path.open("r") as file: 1034 config = yaml.safe_load(file) 1035 1036 config["phenotype"]["version"] = new_version_str 1037 with open(config_path, "w") as file: 1038 yaml.dump( 1039 config, 1040 file, 1041 Dumper=util.QuotedDumper, 1042 default_flow_style=False, 1043 sort_keys=False, 1044 default_style='"', 1045 ) 1046 1047 # Add and commit changes to repo including 
version updates 1048 commit_message = f"Committing updates to phenotype {phen_path}" 1049 repo.git.add("--all") 1050 repo.index.commit(commit_message) 1051 1052 # Add tag to the repo 1053 repo.create_tag(new_version_str) 1054 1055 # push to origin if a remote repo 1056 if remote_url is not None and "origin" not in repo.remotes: 1057 git_url = _construct_git_url(remote_url) 1058 repo.create_remote("origin", git_url) 1059 1060 try: 1061 if "origin" in repo.remotes: 1062 _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1063 origin = repo.remotes.origin 1064 _logger.info(f"Pushing main branch to remote repo") 1065 repo.git.push("--set-upstream", "origin", "main") 1066 _logger.info(f"Pushing version tags to remote git repo") 1067 origin.push(tags=True) 1068 _logger.debug("Changes pushed to 'origin'") 1069 else: 1070 _logger.debug("Remote 'origin' is not set") 1071 except Exception as e: 1072 tag_ref = repo.tags[new_version_str] 1073 repo.delete_tag(tag_ref) 1074 repo.git.reset("--soft", "HEAD~1") 1075 raise e 1076 1077 _logger.info(f"Phenotype published successfully") 1078 1079 1080def export(phen_dir: str, version: str): 1081 """Exports a phen repo at a specific tagged version into a target directory""" 1082 _logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1083 1084 # validate configuration 1085 validate(phen_dir) 1086 phen_path = Path(phen_dir) 1087 1088 # load configuration 1089 config_path = phen_path / CONFIG_FILE 1090 with config_path.open("r") as file: 1091 config = yaml.safe_load(file) 1092 1093 map_path = phen_path / MAP_DIR 1094 if not map_path.exists(): 1095 _logger.warning(f"Map path does not exist '{map_path}'") 1096 1097 export_path = phen_path / OMOP_PATH 1098 # check export directory exists and if not create it 1099 if not export_path.exists(): 1100 export_path.mkdir(parents=True) 1101 _logger.debug(f"OMOP export directory '{export_path}' created.") 1102 1103 # omop export db 1104 export_db_path = omop.export( 1105 map_path, 1106 export_path, 1107 config["phenotype"]["version"], 1108 config["phenotype"]["omop"], 1109 ) 1110 1111 # write to tables 1112 # export as csv 1113 _logger.info(f"Phenotype exported successfully") 1114 1115 1116def copy(phen_dir: str, target_dir: str, version: str): 1117 """Copys a phen repo at a specific tagged version into a target directory""" 1118 1119 # Validate 1120 validate(phen_dir) 1121 phen_path = Path(phen_dir) 1122 1123 # Check target directory exists 1124 target_path = Path(target_dir) 1125 if not target_path.exists(): 1126 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1127 1128 # Set copy directory 1129 copy_path = target_path / version 1130 _logger.info(f"Copying repo {phen_path} to {copy_path}") 1131 1132 if ( 1133 copy_path.exists() and copy_path.is_dir() 1134 ): # Check if it exists and is a directory 1135 copy = _check_delete_dir( 1136 copy_path, 1137 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? 
(yes/no): ", 1138 ) 1139 else: 1140 copy = True 1141 1142 if not copy: 1143 _logger.info(f"Not copying the version {version}") 1144 return 1145 1146 _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1147 repo = git.Repo.clone_from(phen_path, copy_path) 1148 1149 # Check out the latest commit or specified version 1150 if version: 1151 # Checkout a specific version (e.g., branch, tag, or commit hash) 1152 _logger.info(f"Checking out version {version}...") 1153 repo.git.checkout(version) 1154 else: 1155 # Checkout the latest commit (HEAD) 1156 _logger.info(f"Checking out the latest commit...") 1157 repo.git.checkout("HEAD") 1158 1159 _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1160 1161 _logger.info(f"Phenotype copied successfully") 1162 1163 1164# Convert concept_sets list into dictionaries 1165def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1166 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1167 concepts_dict = { 1168 item["name"]: item["file"]["path"] 1169 for item in config_data["phenotype"]["concept_sets"] 1170 } 1171 name_set = set(concepts_dict.keys()) 1172 return concepts_dict, name_set 1173 1174 1175def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1176 """ 1177 Extracts clean keys from a DeepDiff dictionary. 1178 1179 :param diff: DeepDiff result dictionary 1180 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1181 :return: A set of clean key names 1182 """ 1183 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1184 1185 1186def diff_config(old_config: dict, new_config: dict) -> str: 1187 report = f"\n# Changes to phenotype configuration\n" 1188 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1189 1190 old_concepts, old_names = extract_concepts(old_config) 1191 new_concepts, new_names = extract_concepts(new_config) 1192 1193 # Check added and removed names 1194 added_names = new_names - old_names # Names that appear in new but not in old 1195 removed_names = old_names - new_names # Names that were in old but not in new 1196 1197 # find file path changes for unchanged names 1198 unchanged_names = old_names & new_names # Names that exist in both 1199 file_diff = DeepDiff( 1200 {name: old_concepts[name] for name in unchanged_names}, 1201 {name: new_concepts[name] for name in unchanged_names}, 1202 ) 1203 1204 # Find renamed concepts (same file, different name) 1205 renamed_concepts = [] 1206 for removed in removed_names: 1207 old_path = old_concepts[removed] 1208 for added in added_names: 1209 new_path = new_concepts[added] 1210 if old_path == new_path: 1211 renamed_concepts.append((removed, added)) 1212 1213 # Remove renamed concepts from added and removed sets 1214 for old_name, new_name in renamed_concepts: 1215 added_names.discard(new_name) 1216 removed_names.discard(old_name) 1217 1218 # generate config report 1219 if added_names: 1220 report += "## Added Concepts\n" 1221 for name in added_names: 1222 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1223 report += "\n" 1224 1225 if removed_names: 1226 report += "## Removed Concepts\n" 1227 for name in removed_names: 1228 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1229 report += "\n" 1230 1231 if renamed_concepts: 1232 report += "## Renamed Concepts\n" 1233 for old_name, 
new_name in renamed_concepts: 1234 report += ( 1235 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1236 ) 1237 report += "\n" 1238 1239 if "values_changed" in file_diff: 1240 report += "## Updated File Paths\n" 1241 for name, change in file_diff["values_changed"].items(): 1242 old_file = change["old_value"] 1243 new_file = change["new_value"] 1244 clean_name = name.split("root['")[1].split("']")[0] 1245 report += ( 1246 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1247 ) 1248 report += "\n" 1249 1250 if not ( 1251 added_names 1252 or removed_names 1253 or renamed_concepts 1254 or file_diff.get("values_changed") 1255 ): 1256 report += "No changes in concept sets.\n" 1257 1258 return report 1259 1260 1261def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1262 old_output_files = [ 1263 file.name 1264 for file in old_map_path.iterdir() 1265 if file.is_file() and not file.name.startswith(".") 1266 ] 1267 new_output_files = [ 1268 file.name 1269 for file in new_map_path.iterdir() 1270 if file.is_file() and not file.name.startswith(".") 1271 ] 1272 1273 # Convert the lists to sets for easy comparison 1274 old_output_set = set(old_output_files) 1275 new_output_set = set(new_output_files) 1276 1277 # Outputs that are in old_output_set but not in new_output_set (removed files) 1278 removed_outputs = old_output_set - new_output_set 1279 # Outputs that are in new_output_set but not in old_output_set (added files) 1280 added_outputs = new_output_set - old_output_set 1281 # Outputs that are the intersection of old_output_set and new_output_set 1282 common_outputs = old_output_set & new_output_set 1283 1284 report = f"\n# Changes to available translations\n" 1285 report += f"This compares the coding translations files available.\n\n" 1286 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1287 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1288 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1289 1290 # Step N: Compare common outputs between versions 1291 report += f"# Changes to concepts in translation files\n\n" 1292 report += f"This compares the added and removed concepts in each of the coding translation files. 
Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1293 for file in common_outputs: 1294 old_output = old_map_path / file 1295 new_output = new_map_path / file 1296 1297 _logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1298 _logger.debug(f"New ouptput: {str(new_output.resolve())}") 1299 1300 df1 = pd.read_csv(old_output) 1301 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1302 df2 = pd.read_csv(new_output) 1303 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1304 1305 # Check for added and removed concepts 1306 report += f"- File {file}\n" 1307 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1308 report += f"- Removed concepts {sorted_list}\n" 1309 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1310 report += f"- Added concepts {sorted_list}\n" 1311 1312 # Check for changed concepts 1313 diff = df2 - df1 # diff in counts 1314 diff = diff[ 1315 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1316 ] # get non-zero counts 1317 s = "\n" 1318 if len(diff.index) > 0: 1319 for concept, row in diff.iterrows(): 1320 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1321 report += f"- Changed concepts {s}\n\n" 1322 else: 1323 report += f"- Changed concepts []\n\n" 1324 1325 return report 1326 1327 1328def diff_phen( 1329 new_phen_path: Path, 1330 new_version: str, 1331 old_phen_path: Path, 1332 old_version: str, 1333 report_path: Path, 1334): 1335 """Compare the differences between two versions of a phenotype""" 1336 1337 # validate phenotypes 1338 _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1339 validate(str(old_phen_path.resolve())) 1340 _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1341 validate(str(new_phen_path.resolve())) 1342 1343 # get old and new config 1344 old_config_path = old_phen_path / CONFIG_FILE 1345 with old_config_path.open("r") as file: 1346 old_config = yaml.safe_load(file) 1347 new_config_path = new_phen_path / CONFIG_FILE 1348 with new_config_path.open("r") as file: 1349 new_config = yaml.safe_load(file) 1350 1351 # write report heading 1352 report = f"# Phenotype Comparison Report\n" 1353 report += f"## Original phenotype\n" 1354 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1355 report += f" - {old_version}\n" 1356 report += f" - {str(old_phen_path.resolve())}\n" 1357 report += f"## Changed phenotype:\n" 1358 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1359 report += f" - {new_version}\n" 1360 report += f" - {str(new_phen_path.resolve())}\n" 1361 1362 # Step 1: check differences configuration files 1363 # Convert list of dicts into a dict: {name: file} 1364 report += diff_config(old_config, new_config) 1365 1366 # Step 2: check differences between map files 1367 # List files from output directories 1368 old_map_path = old_phen_path / MAP_DIR 1369 new_map_path = new_phen_path / MAP_DIR 1370 report += diff_map_files(old_map_path, new_map_path) 1371 1372 # initialise report file 1373 _logger.debug(f"Writing to report file {str(report_path.resolve())}") 1374 report_file = open(report_path, "w") 1375 report_file.write(report) 1376 report_file.close() 1377 1378 _logger.info(f"Phenotypes diff'd successfully") 1379 1380 1381def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1382 # make tmp directory .acmc 1383 timestamp = time.strftime("%Y%m%d_%H%M%S") 1384 temp_dir = 
Path(f".acmc/diff_{timestamp}") 1385 1386 changed_phen_path = Path(phen_dir) 1387 if not changed_phen_path.exists(): 1388 raise ValueError( 1389 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1390 ) 1391 1392 old_phen_path = Path(old_phen_dir) 1393 if not old_phen_path.exists(): 1394 raise ValueError( 1395 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1396 ) 1397 1398 t_path = old_phen_path / "config.yml" 1399 with t_path.open("r") as file: 1400 c = yaml.safe_load(file) 1401 1402 try: 1403 # Create the directory 1404 temp_dir.mkdir(parents=True, exist_ok=True) 1405 _logger.debug(f"Temporary directory created: {temp_dir}") 1406 1407 # Create temporary directories 1408 changed_path = temp_dir / "changed" 1409 changed_path.mkdir(parents=True, exist_ok=True) 1410 old_path = temp_dir / "old" 1411 old_path.mkdir(parents=True, exist_ok=True) 1412 1413 # checkout changed 1414 if version == "latest": 1415 _logger.debug( 1416 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1417 ) 1418 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1419 else: 1420 _logger.debug( 1421 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1422 ) 1423 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1424 changed_repo.git.checkout(version) 1425 1426 # checkout old 1427 if old_version == "latest": 1428 _logger.debug( 1429 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1430 ) 1431 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1432 else: 1433 _logger.debug( 1434 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1435 ) 1436 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1437 old_repo.git.checkout(old_version) 1438 1439 report_filename = f"{version}_{old_version}_diff.md" 1440 report_path = changed_phen_path / report_filename 1441 # diff old with new 1442 diff_phen(changed_path, version, old_path, old_version, report_path) 1443 1444 finally: 1445 # clean up tmp directory 1446 if temp_dir.exists(): 1447 shutil.rmtree(temp_dir) 1448 print(f"Temporary directory removed: {temp_dir}")
Module constants (docstrings attached to their definitions in the source above):

- `PHEN_DIR`: Default phenotype directory name
- `DEFAULT_PHEN_PATH`: Default phenotype directory path
- `CONCEPTS_DIR`: Default concepts directory name
- `MAP_DIR`: Default map directory name
- `CONCEPT_SET_DIR`: Default concept set directory name
- `CSV_PATH`: Default CSV concept set directory path
- `OMOP_PATH`: Default OMOP concept set directory path
- `DEFAULT_PHEN_DIR_LIST`: List of default phenotype directories
- `CONFIG_FILE`: Default configuration filename
- `VOCAB_VERSION_FILE`: Default vocabulary version filename
- `SEMANTIC_VERSION_TYPES`: List of semantic version increment types
- `DEFAULT_VERSION_INC`: Default semantic version increment type
- `DEFAULT_GIT_BRANCH`: Default phenotype repo branch name
- `SPLIT_COL_ACTION`: Split column preprocessing action type
- `CODES_COL_ACTION`: Codes column preprocessing action type
- `DIVIDE_COL_ACTION`: Divide column preprocessing action type
- `COL_ACTIONS`: List of column preprocessing action types
- `CODE_FILE_TYPES`: List of supported source concept coding list file types
- `CONFIG_SCHEMA`: Phenotype config.yml schema definition
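For reference, this is roughly how `validate` applies `CONFIG_SCHEMA` with Cerberus; the minimal config dict below is a made-up example:

```python
# Sketch of validating a loaded config.yml against CONFIG_SCHEMA.
# The config dict is a minimal, invented example.
from cerberus import Validator

from acmc.phen import CONFIG_SCHEMA

config = {
    "phenotype": {
        "version": "0.0.1",
        "omop": {
            "vocabulary_id": "ACMC_EXAMPLE",
            "vocabulary_name": "Example phenotype",
            "vocabulary_reference": "https://example.org/phenotype",
        },
        "concept_sets": [],
    }
}

validator = Validator(CONFIG_SCHEMA)
if not validator.validate(config):
    print(validator.errors)  # per-field error messages, keyed by path
```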
`class PhenValidationException(Exception)`
Custom exception raised when there are validation errors in the phenotype configuration file.
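A sketch of how a caller might handle this exception; `validate` raises it with the full error list attached:

```python
# Catching a validation failure and inspecting the attached error list.
from acmc import phen

try:
    phen.validate("./workspace/phen")
except phen.PhenValidationException as e:
    print(e)                   # summary message
    for err in e.validation_errors or []:
        print(f" - {err}")     # individual validation errors
```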
`def init(phen_dir: str, remote_url: str)`
Initialises a phenotype directory as a git repo with the standard structure.
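A short usage sketch; the remote URL is hypothetical, and cloning it requires the `ACMC_GITHUB_PAT` (or `ACMC_GITLAB_PAT`) environment variable to be set:

```python
from acmc import phen

# Local-only phenotype repo:
phen.init("./workspace/phen", remote_url=None)

# Or clone an existing remote (hypothetical URL; needs ACMC_GITHUB_PAT set):
# phen.init("./workspace/phen", remote_url="https://github.com/example/phen-repo.git")
```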
`def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str)`
Forks an upstream phenotype in a remote repo at a specific version into a local directory, and optionally sets a new remote origin. A usage sketch follows the argument list below.
Arguments:
- phen_dir (str): local directory path where the upstream repo is to be cloned
- upstream_url (str): url to the upstream repo
- upstream_version (str): version in the upstream repo to clone
- new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
- ValueError: if the specified version is not in the upstream repo
- ValueError: if the upstream repo is not a valid phenotype repo
- ValueError: if there are any other problems with Git
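A minimal usage sketch (assuming the module is importable as acmc.phen, as the page title suggests; the paths and repository URLs are illustrative, not part of the module):

from acmc import phen

# Fork version "0.2.1" of an upstream phenotype into ./workspace/phen,
# pointing the fork at a new origin remote (both URLs are hypothetical)
phen.fork(
    phen_dir="./workspace/phen",
    upstream_url="https://example.com/acmc/upstream-phen.git",
    upstream_version="0.2.1",
    new_origin_url="https://example.com/acmc/my-phen.git",
)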
def validate(phen_dir: str):
    """Validates that the phenotype directory is a git repo with the standard structure"""
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate that the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # Load the configuration file
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n
    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
    if not match:
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.append(item["name"])

    # check the codes definition
    for item in phenotype["concept_sets"]:
        # check that the concept code file exists
        concept_code_file_path = concepts_path / item["file"]["path"]
        if not concept_code_file_path.exists():
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
            )
        # check that the concept code file is not empty; only stat() the file
        # if it exists, otherwise stat() itself raises FileNotFoundError
        elif concept_code_file_path.stat().st_size == 0:
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
            )

        # check the code file type is supported
        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
            raise ValueError(
                f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
            )

        # check the columns specified are a supported medical coding type
        for column in item["file"]["columns"]:
            if column not in code_types:
                validation_errors.append(
                    f"Column type {column} for file {concept_code_file_path} is not supported"
                )

        # check the actions are supported
        if "actions" in item["file"]:
            for action in item["file"]["actions"]:
                if action not in COL_ACTIONS:
                    validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info("Phenotype validated successfully")
Validates that the phenotype directory is a git repo with the standard structure
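A usage sketch; the path is illustrative. validate reports problems by raising rather than returning a status, so callers typically wrap it in try/except:

from acmc import phen

try:
    phen.validate("./workspace/phen")
except Exception as e:
    # validation problems are raised as exceptions, e.g. a
    # PhenValidationException carrying the list of validation errors
    print(f"Phenotype failed validation: {e}")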
def translate_codes(
    source_df: pd.DataFrame, target_code_type: str, concept_name: str
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )

    # Convert codes to the target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if the target code type is the same as the source code type, no
        # translation is needed, just append the source as the target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
            )
        else:
            # get the translation filename using the source and target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            # do the mapping if a translation file exists
            if map_path.exists():
                # get the mapping
                df_map = pd.read_parquet(map_path)

                # do the mapping
                translated_df = pd.merge(
                    source_df[source_code_type], df_map, how="left"
                )

                # normalise the output
                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

                # add to the list of codes
                codes = pd.concat([codes, translated_df])
            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add the concept set name to the output if there were any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
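A minimal sketch of the pass-through case, which needs no TRUD mapping files because the source and target types match; the column name "read2" and the codes are illustrative and assumed to be a supported code type. Cross-type translation additionally requires the corresponding parquet mapping file under trud.PROCESSED_PATH:

import pandas as pd
from acmc import phen

# Source coding list: one column per source code type
source_df = pd.DataFrame({"read2": ["C10E.", "C10F."]})

# Same source and target type, so the codes are copied rather than translated
concept_set = phen.translate_codes(
    source_df, target_code_type="read2", concept_name="diabetes"
)
print(concept_set[["CONCEPT_SET", "SOURCE_CONCEPT_TYPE", "CONCEPT"]])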
def write_vocab_version(phen_path: Path):
    # write the vocab version file

    if not trud.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    if not omop.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud.VERSION_PATH.open("r") as file:
        trud_version = yaml.safe_load(file)

    with omop.VERSION_PATH.open("r") as file:
        omop_version = yaml.safe_load(file)

    # Create the combined YAML structure
    version_data = {
        "versions": {
            "acmc": acmc.__version__,
            "trud": trud_version,
            "omop": omop_version,
        }
    }

    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
        yaml.dump(
            version_data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
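A usage sketch; the path and version values are illustrative, and both TRUD and OMOP must already be installed:

from pathlib import Path
from acmc import phen

# Writes <phen_path>/vocab_version.yml combining the installed acmc,
# TRUD and OMOP versions, e.g. (values illustrative):
#   versions:
#     acmc: "0.1.0"
#     trud: ...
#     omop: ...
phen.write_vocab_version(Path("./workspace/phen"))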
def map(phen_dir: str, target_code_type: str):
    """Maps the phenotype's source codes to the target code type, or to every target type in the configuration if none is given"""
    _logger.info(f"Processing phenotype: {phen_dir}")

    # Validate the configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load the configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
    phenotype = config["phenotype"]

    if len(phenotype["map"]) == 0:
        raise ValueError("No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in phenotype["map"]:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
        )

    if target_code_type is not None:
        _map_target_code_type(phen_path, phenotype, target_code_type)
    else:
        for t in phenotype["map"]:
            _map_target_code_type(phen_path, phenotype, t)

    _logger.info(f"Phenotype processed successfully")
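A usage sketch; the path is illustrative and "read2" is assumed to appear in the configuration's map list:

from acmc import phen

# Map the source codes to a single target code type ...
phen.map("./workspace/phen", target_code_type="read2")

# ... or pass None to map to every target type listed in the configuration
phen.map("./workspace/phen", target_code_type=None)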
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory"""

    # Validate the config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load the git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if there are any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get the next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write the version to the configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to the repo, including the version update
    commit_message = f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Tag the repo with the new version
    repo.create_tag(new_version_str)

    # add the origin remote if publishing to a remote repo for the first time
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info("Pushing main branch to remote repo")
            repo.git.push("--set-upstream", "origin", "main")
            _logger.info("Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception as e:
        # roll back the tag and the version commit if the push fails
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise e

    _logger.info("Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
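A usage sketch; the path, message and remote URL are illustrative, and increment must be one of the semantic version types ("major", "minor", "patch"):

from acmc import phen

# Commit all changes, bump the patch version, tag the repo and push;
# pass remote_url=None to publish locally only
phen.publish(
    phen_dir="./workspace/phen",
    msg="Updated concept sets",
    remote_url="https://example.com/acmc/my-phen.git",
    increment="patch",
)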
def export(phen_dir: str, version: str):
    """Exports a phen repo at a specific tagged version into a target directory"""
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    export_path = phen_path / OMOP_PATH
    # check the export directory exists and if not create it
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # omop export db
    export_db_path = omop.export(
        map_path,
        export_path,
        config["phenotype"]["version"],
        config["phenotype"]["omop"],
    )

    # write to tables
    # export as csv
    _logger.info("Phenotype exported successfully")
Exports a phen repo at a specific tagged version into a target directory
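A usage sketch; the path and version are illustrative. The OMOP export is written under the phenotype's concept-sets/omop directory:

from acmc import phen

# Export the mapped concept sets in OMOP format
phen.export("./workspace/phen", version="0.1.0")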
def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory"""

    # Validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # Check that the target directory exists
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # Set the copy directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    # Check if the copy directory already exists and ask the user whether to overwrite it
    if copy_path.exists() and copy_path.is_dir():
        do_copy = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )
    else:
        do_copy = True

    if not do_copy:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    # Check out the latest commit or the specified version
    if version:
        # Checkout a specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        repo.git.checkout(version)
    else:
        # Checkout the latest commit (HEAD)
        _logger.info("Checking out the latest commit...")
        repo.git.checkout("HEAD")

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} into {copy_path}")

    _logger.info("Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
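A usage sketch; the paths and version are illustrative. The copy is placed in a subdirectory of the target named after the version:

from acmc import phen

# Copy the tagged version "0.1.0" of the phenotype into ./releases/0.1.0
phen.copy("./workspace/phen", "./releases", version="0.1.0")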
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concepts as {name: file_path} dictionary and a name set."""
    concepts_dict = {
        item["name"]: item["file"]["path"]
        for item in config_data["phenotype"]["concept_sets"]
    }
    name_set = set(concepts_dict.keys())
    return concepts_dict, name_set
Extracts concepts as {name: file_path} dictionary and a name set.
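A self-contained example; the config dict mirrors the structure of config.yml and its concept set names and paths are illustrative:

from acmc import phen

config = {
    "phenotype": {
        "concept_sets": [
            {"name": "diabetes", "file": {"path": "diabetes.csv"}},
            {"name": "hypertension", "file": {"path": "htn.xlsx"}},
        ]
    }
}

concepts, names = phen.extract_concepts(config)
# concepts == {"diabetes": "diabetes.csv", "hypertension": "htn.xlsx"}
# names == {"diabetes", "hypertension"}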
def diff_config(old_config: dict, new_config: dict) -> str:
    """Generates a markdown report of concept set changes between two phenotype configurations"""
    report = "\n# Changes to phenotype configuration\n"
    report += "This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # Check for added and removed names
    added_names = new_names - old_names  # names that appear in new but not in old
    removed_names = old_names - new_names  # names that were in old but not in new

    # find file path changes for unchanged names
    unchanged_names = old_names & new_names  # names that exist in both
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # Find renamed concepts (same file, different name)
    renamed_concepts = []
    for removed in removed_names:
        old_path = old_concepts[removed]
        for added in added_names:
            new_path = new_concepts[added]
            if old_path == new_path:
                renamed_concepts.append((removed, added))

    # Remove renamed concepts from the added and removed sets
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate the config report
    if added_names:
        report += "## Added Concepts\n"
        for name in added_names:
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in removed_names:
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    if "values_changed" in file_diff:
        report += "## Updated File Paths\n"
        for name, change in file_diff["values_changed"].items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            clean_name = name.split("root['")[1].split("']")[0]
            report += (
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or file_diff.get("values_changed")
    ):
        report += "No changes in concept sets.\n"

    return report
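A self-contained example; the two config dicts are illustrative. Because the concept set keeps its file but changes its name, it is reported as renamed rather than as an add plus a remove:

from acmc import phen

old_config = {
    "phenotype": {
        "concept_sets": [{"name": "diabetes", "file": {"path": "dm.csv"}}]
    }
}
new_config = {
    "phenotype": {
        "concept_sets": [{"name": "diabetes_mellitus", "file": {"path": "dm.csv"}}]
    }
}

# Prints a report with a "Renamed Concepts" section
print(phen.diff_config(old_config, new_config))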
def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
    """Generates a markdown report comparing the map output files of two phenotype versions"""
    old_output_files = [
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    new_output_files = [
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]

    # Convert the lists to sets for easy comparison
    old_output_set = set(old_output_files)
    new_output_set = set(new_output_files)

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are in the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = "\n# Changes to available translations\n"
    report += "This compares the coding translation files available.\n\n"
    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"

    # Compare common outputs between versions
    report += "# Changes to concepts in translation files\n\n"
    report += "This compares the added and removed concepts in each of the coding translation files. Note that this might differ from the config.yaml if the translations have not been run for the current config.\n\n"
    for file in common_outputs:
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old output: {str(old_output.resolve())}")
        _logger.debug(f"New output: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output)
        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
        report += f"- Added concepts {sorted_list}\n"

        # Check for changed concepts
        diff = df2 - df1  # diff in counts
        diff = diff[
            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
        ]  # get non-zero counts
        s = "\n"
        if len(diff.index) > 0:
            for concept, row in diff.iterrows():
                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
            report += f"- Changed concepts {s}\n\n"
        else:
            report += "- Changed concepts []\n\n"

    return report
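A usage sketch; the paths are illustrative and each must point at a phenotype's map directory:

from pathlib import Path
from acmc import phen

# Compare the map outputs of two checked-out phenotype versions
report = phen.diff_map_files(Path("./old/phen/map"), Path("./new/phen/map"))
print(report)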
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
):
    """Compare the differences between two versions of a phenotype"""

    # validate both phenotypes
    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
    validate(str(old_phen_path.resolve()))
    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
    validate(str(new_phen_path.resolve()))

    # get the old and new configs
    old_config_path = old_phen_path / CONFIG_FILE
    with old_config_path.open("r") as file:
        old_config = yaml.safe_load(file)
    new_config_path = new_phen_path / CONFIG_FILE
    with new_config_path.open("r") as file:
        new_config = yaml.safe_load(file)

    # write the report heading
    report = "# Phenotype Comparison Report\n"
    report += "## Original phenotype\n"
    report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f" - {old_version}\n"
    report += f" - {str(old_phen_path.resolve())}\n"
    report += "## Changed phenotype:\n"
    report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f" - {new_version}\n"
    report += f" - {str(new_phen_path.resolve())}\n"

    # Step 1: compare the configuration files
    report += diff_config(old_config, new_config)

    # Step 2: compare the map files in the output directories
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(old_map_path, new_map_path)

    # write the report file
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with open(report_path, "w") as report_file:
        report_file.write(report)

    _logger.info("Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
    """Compares two versions of a phenotype and writes a markdown diff report into the changed phenotype directory"""

    # make a tmp directory under .acmc
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    # check that the old phenotype configuration parses
    t_path = old_phen_path / CONFIG_FILE
    with t_path.open("r") as file:
        c = yaml.safe_load(file)

    try:
        # Create the temporary directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # Create temporary directories for the two versions
        changed_path = temp_dir / "changed"
        changed_path.mkdir(parents=True, exist_ok=True)
        old_path = temp_dir / "old"
        old_path.mkdir(parents=True, exist_ok=True)

        # checkout the changed version
        if version == "latest":
            _logger.debug(
                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
            changed_repo.git.checkout(version)

        # checkout the old version
        if old_version == "latest":
            _logger.debug(
                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
            old_repo.git.checkout(old_version)

        # diff the old version against the new
        report_filename = f"{version}_{old_version}_diff.md"
        report_path = changed_phen_path / report_filename
        diff_phen(changed_path, version, old_path, old_version, report_path)

    finally:
        # clean up the tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
            _logger.debug(f"Temporary directory removed: {temp_dir}")
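A usage sketch; the paths and versions are illustrative. Passing "latest" uses a directory as-is, while a tag such as "0.1.0" is checked out from the repo; the report is written into the changed phenotype directory as <version>_<old_version>_diff.md:

from acmc import phen

phen.diff(
    phen_dir="./workspace/phen",
    version="latest",
    old_phen_dir="./releases/0.1.0",
    old_version="0.1.0",
)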