acmc.phen
phenotype.py module
This module provides functionality for managing phenotypes.
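The main entry points defined in this module are init, fork, validate, map, publish, export, copy and diff. Below is a minimal workflow sketch; it assumes the package is importable as acmc, and all paths, commit messages and target code types are placeholders.

```python
from acmc import phen

# create a new local phenotype repo with the standard directory layout
phen.init(phen_dir="./workspace/phen", remote_url=None)

# ... edit config.yml and add coding list files under concepts/ ...

# check the repo structure and config.yml against CONFIG_SCHEMA
phen.validate("./workspace/phen")

# translate source codes; None processes every target listed in the config's "map" list
phen.map("./workspace/phen", target_code_type=None)

# commit, bump the semantic version and tag the release
phen.publish("./workspace/phen", msg="initial release", remote_url=None, increment="patch")
```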
1""" 2phenotype.py module 3 4This module provides functionality for managing phenotypes. 5""" 6 7import argparse 8import pandas as pd 9import numpy as np 10import json 11import os 12import sqlite3 13import sys 14import shutil 15import time 16import git 17import re 18import logging 19import requests 20import yaml 21import semver 22from git import Repo 23from cerberus import Validator # type: ignore 24from deepdiff import DeepDiff 25from pathlib import Path 26from urllib.parse import urlparse, urlunparse 27from typing import Tuple, Set, Any 28import acmc 29from acmc import trud, omop, parse, util, logging_config as lc 30 31# set up logging 32_logger = lc.setup_logger() 33 34pd.set_option("mode.chained_assignment", None) 35 36PHEN_DIR = "phen" 37"""Default phenotype directory name""" 38 39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR 40"""Default phenotype directory path""" 41 42CONCEPTS_DIR = "concepts" 43"""Default concepts directory name""" 44 45MAP_DIR = "map" 46"""Default map directory name""" 47 48CONCEPT_SET_DIR = "concept-sets" 49"""Default concept set directory name""" 50 51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv" 52"""Default CSV concept set directory path""" 53 54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop" 55"""Default OMOP concept set directory path""" 56 57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR] 58"""List of default phenotype directories""" 59 60CONFIG_FILE = "config.yml" 61"""Default configuration filename""" 62 63VOCAB_VERSION_FILE = "vocab_version.yml" 64"""Default vocabulary version filename""" 65 66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"] 67"""List of semantic version increment types""" 68 69DEFAULT_VERSION_INC = "patch" 70"""Default semantic version increment type""" 71 72DEFAULT_GIT_BRANCH = "main" 73"""Default phenotype repo branch name""" 74 75SPLIT_COL_ACTION = "split_col" 76"""Split column preprocessing action type""" 77 78CODES_COL_ACTION = "codes_col" 79"""Codes column preprocessing action type""" 80 81DIVIDE_COL_ACTION = "divide_col" 82"""Divide column preprocessing action type""" 83 84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] 85"""List of column preprocessing action types""" 86 87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"] 88"""List of supported source concept coding list file types""" 89 90# config.yaml schema 91CONFIG_SCHEMA = { 92 "phenotype": { 93 "type": "dict", 94 "required": True, 95 "schema": { 96 "version": { 97 "type": "string", 98 "required": True, 99 "regex": r"^\d+\.\d+\.\d+$", # Enforces 'vN.N.N' format 100 }, 101 "omop": { 102 "type": "dict", 103 "required": True, 104 "schema": { 105 "vocabulary_id": {"type": "string", "required": True}, 106 "vocabulary_name": {"type": "string", "required": True}, 107 "vocabulary_reference": { 108 "type": "string", 109 "required": True, 110 "regex": r"^https?://.*", # Ensures it's a URL 111 }, 112 }, 113 }, 114 "map": { 115 "type": "list", 116 "schema": { 117 "type": "string", 118 "allowed": list( 119 parse.SUPPORTED_CODE_TYPES 120 ), # Ensure only predefined values are allowed 121 }, 122 }, 123 "concept_sets": { 124 "type": "list", 125 "required": True, 126 "schema": { 127 "type": "dict", 128 "schema": { 129 "name": {"type": "string", "required": True}, 130 "file": { 131 "type": "dict", 132 "required": False, 133 "schema": { 134 "path": {"type": "string", "required": True}, 135 "columns": {"type": "dict", "required": True}, 136 "category": { 137 "type": "string" 138 }, # Optional but must be string if present 139 "actions": { 140 "type": "dict", 
141 "schema": {"divide_col": {"type": "string"}}, 142 }, 143 }, 144 }, 145 }, 146 }, 147 }, 148 }, 149 } 150} 151"""Phenotype config.yml schema definition""" 152 153# "metadata": {"type": "dict", "required": True}, 154 155 156class PhenValidationException(Exception): 157 """Custom exception class raised when validation errors in phenotype configuration file""" 158 159 def __init__(self, message, validation_errors=None): 160 super().__init__(message) 161 self.validation_errors = validation_errors 162 163 164def _construct_git_url(remote_url: str): 165 """Constructs a git url for github or gitlab including a PAT token environment variable""" 166 # check the url 167 parsed_url = urlparse(remote_url) 168 169 # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd 170 # need to update this function if the URL scheme is different. 171 if "github.com" in parsed_url.netloc: 172 # get GitHub PAT from environment variable 173 auth = os.getenv("ACMC_GITHUB_PAT") 174 if not auth: 175 raise ValueError( 176 "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." 177 ) 178 else: 179 # get GitLab PAT from environment variable 180 auth = os.getenv("ACMC_GITLAB_PAT") 181 if not auth: 182 raise ValueError( 183 "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 184 ) 185 auth = f"oauth2:{auth}" 186 187 # Construct the new URL with credentials 188 new_netloc = f"{auth}@{parsed_url.netloc}" 189 return urlunparse( 190 ( 191 parsed_url.scheme, 192 new_netloc, 193 parsed_url.path, 194 parsed_url.params, 195 parsed_url.query, 196 parsed_url.fragment, 197 ) 198 ) 199 200 201def _create_empty_git_dir(path: Path): 202 """Creates a directory with a .gitkeep file so that it's tracked in git""" 203 path.mkdir(exist_ok=True) 204 keep_path = path / ".gitkeep" 205 keep_path.touch(exist_ok=True) 206 207 208def _check_delete_dir(path: Path, msg: str) -> bool: 209 """Checks on the command line if a user wants to delete a directory 210 211 Args: 212 path (Path): path of the directory to be deleted 213 msg (str): message to be displayed to the user 214 215 Returns: 216 Boolean: True if deleted 217 """ 218 deleted = False 219 220 user_input = input(f"{msg}").strip().lower() 221 if user_input in ["yes", "y"]: 222 shutil.rmtree(path) 223 deleted = True 224 else: 225 _logger.info("Directory was not deleted.") 226 227 return deleted 228 229 230def init(phen_dir: str, remote_url: str): 231 """Initial phenotype directory as git repo with standard structure""" 232 _logger.info(f"Initialising Phenotype in directory: {phen_dir}") 233 phen_path = Path(phen_dir) 234 235 # check if directory already exists and ask user if they want to recreate it 236 if ( 237 phen_path.exists() and phen_path.is_dir() 238 ): # Check if it exists and is a directory 239 configure = _check_delete_dir( 240 phen_path, 241 f"The phen directory already exists. Do you want to reinitialise? 
(yes/no): ", 242 ) 243 else: 244 configure = True 245 246 if not configure: 247 _logger.info(f"Exiting, phenotype not initiatised") 248 return 249 250 # Initialise repo from local or remote 251 repo: Repo 252 253 # if remote then clone the repo otherwise init a local repo 254 if remote_url != None: 255 # add PAT token to the URL 256 git_url = _construct_git_url(remote_url) 257 258 # clone the repo 259 git_cmd = git.cmd.Git() 260 git_cmd.clone(git_url, phen_path) 261 262 # open repo 263 repo = Repo(phen_path) 264 # check if there are any commits (new repo has no commits) 265 if ( 266 len(repo.branches) == 0 or repo.head.is_detached 267 ): # Handle detached HEAD (e.g., after init) 268 _logger.debug("The phen repository has no commits yet.") 269 commit_count = 0 270 else: 271 # Get the total number of commits in the default branch 272 commit_count = sum(1 for _ in repo.iter_commits()) 273 _logger.debug(f"Repo has previous commits: {commit_count}") 274 else: 275 # local repo, create the directories and init 276 phen_path.mkdir(parents=True, exist_ok=True) 277 _logger.debug(f"Phen directory '{phen_path}' has been created.") 278 repo = git.Repo.init(phen_path) 279 commit_count = 0 280 281 phen_path = phen_path.resolve() 282 # initialise empty repos 283 if commit_count == 0: 284 # create initial commit 285 initial_file_path = phen_path / "README.md" 286 with open(initial_file_path, "w") as file: 287 file.write( 288 "# Initial commit\nThis is the first commit in the phen repository.\n" 289 ) 290 repo.index.add([initial_file_path]) 291 repo.index.commit("Initial commit") 292 commit_count = 1 293 294 # Checkout the phens default branch, creating it if it does not exist 295 if DEFAULT_GIT_BRANCH in repo.branches: 296 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 297 main_branch.checkout() 298 else: 299 main_branch = repo.create_head(DEFAULT_GIT_BRANCH) 300 main_branch.checkout() 301 302 # if the phen path does not contain the config file then initialise the phen type 303 config_path = phen_path / CONFIG_FILE 304 if config_path.exists(): 305 _logger.debug(f"Phenotype configuration files already exist") 306 return 307 308 _logger.info("Creating phen directory structure and config files") 309 for d in DEFAULT_PHEN_DIR_LIST: 310 _create_empty_git_dir(phen_path / d) 311 312 # create empty phen config file 313 config = { 314 "phenotype": { 315 "version": "0.0.0", 316 "omop": { 317 "vocabulary_id": "", 318 "vocabulary_name": "", 319 "vocabulary_reference": "", 320 }, 321 "translate": [], 322 "concept_sets": [], 323 } 324 } 325 326 with open(phen_path / CONFIG_FILE, "w") as file: 327 yaml.dump( 328 config, 329 file, 330 Dumper=util.QuotedDumper, 331 default_flow_style=False, 332 sort_keys=False, 333 default_style='"', 334 ) 335 336 # add git ignore 337 ignore_content = """# Ignore SQLite database files 338*.db 339*.sqlite3 340 341# Ignore SQLite journal and metadata files 342*.db-journal 343*.sqlite3-journal 344 345# python 346.ipynb_checkpoints 347 """ 348 ignore_path = phen_path / ".gitignore" 349 with open(ignore_path, "w") as file: 350 file.write(ignore_content) 351 352 # add to git repo and commit 353 for d in DEFAULT_PHEN_DIR_LIST: 354 repo.git.add(phen_path / d) 355 repo.git.add(all=True) 356 repo.index.commit("initialised the phen git repo.") 357 358 _logger.info(f"Phenotype initialised successfully") 359 360 361def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str): 362 """Forks an upstream phenotype in a remote repo at a specific version to a local director, and 
optionally sets to a new remote origin" 363 364 Args: 365 phen_dir (str): local directory path where the upstream repo is to be cloned 366 upstream_url (str): url to the upstream repo 367 upstream_version (str): version in the upstream repo to clone 368 new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None. 369 370 Raises: 371 ValueError: if the specified version is not in the upstream repo 372 ValueError: if the upstream repo is not a valid phenotype repo 373 ValueError: if there's any other problems with Git 374 """ 375 _logger.info( 376 f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}" 377 ) 378 379 phen_path = Path(phen_dir) 380 # check if directory already exists and ask user if they want to recreate it 381 if ( 382 phen_path.exists() and phen_path.is_dir() 383 ): # Check if it exists and is a directory 384 configure = _check_delete_dir( 385 phen_path, 386 f"The phen directory already exists. Do you want to reinitialise? (yes/no): ", 387 ) 388 else: 389 configure = True 390 391 if not configure: 392 _logger.info(f"Exiting, phenotype not initiatised") 393 return 394 395 try: 396 # Clone repo 397 git_url = _construct_git_url(upstream_url) 398 repo = git.Repo.clone_from(git_url, phen_path) 399 400 # Fetch all branches and tags 401 repo.remotes.origin.fetch() 402 403 # Check if the version exists 404 available_refs = [ref.name.split("/")[-1] for ref in repo.references] 405 if upstream_version not in available_refs: 406 raise ValueError( 407 f"Version '{upstream_version}' not found in the repository: {upstream_url}." 408 ) 409 410 # Checkout the specified version 411 repo.git.checkout(upstream_version) 412 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 413 main_branch.checkout() 414 415 # Check if 'config.yml' exists in the root directory 416 config_path = phen_path / "config.yml" 417 if not os.path.isfile(config_path): 418 raise ValueError( 419 f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory." 
420 ) 421 422 # Validate the phenotype is compatible with the acmc tool 423 validate(str(phen_path.resolve())) 424 425 # Delete each tag locally 426 tags = repo.tags 427 for tag in tags: 428 repo.delete_tag(tag) 429 _logger.debug(f"Deleted tags from forked repo: {tag}") 430 431 # Add upstream remote 432 repo.create_remote("upstream", upstream_url) 433 remote = repo.remotes["origin"] 434 repo.delete_remote(remote) # Remove existing origin 435 436 # Optionally set a new origin remote 437 if new_origin_url: 438 git_url = _construct_git_url(new_origin_url) 439 repo.create_remote("origin", git_url) 440 repo.git.push("--set-upstream", "origin", "main") 441 442 _logger.info(f"Repository forked successfully at {phen_path}") 443 _logger.info(f"Upstream set to {upstream_url}") 444 if new_origin_url: 445 _logger.info(f"Origin set to {new_origin_url}") 446 447 except Exception as e: 448 if phen_path.exists(): 449 shutil.rmtree(phen_path) 450 raise ValueError(f"Error occurred during repository fork: {str(e)}") 451 452 453def validate(phen_dir: str): 454 """Validates the phenotype directory is a git repo with standard structure""" 455 _logger.info(f"Validating phenotype: {phen_dir}") 456 phen_path = Path(phen_dir) 457 if not phen_path.is_dir(): 458 raise NotADirectoryError( 459 f"Error: '{str(phen_path.resolve())}' is not a directory" 460 ) 461 462 config_path = phen_path / CONFIG_FILE 463 if not config_path.is_file(): 464 raise FileNotFoundError( 465 f"Error: phen configuration file '{config_path}' does not exist." 466 ) 467 468 concepts_path = phen_path / CONCEPTS_DIR 469 if not concepts_path.is_dir(): 470 raise FileNotFoundError( 471 f"Error: source concepts directory {concepts_path} does not exist." 472 ) 473 474 # Calidate the directory is a git repo 475 try: 476 git.Repo(phen_path) 477 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 478 raise Exception(f"Phen directory {phen_path} is not a git repo") 479 480 # Load configuration File 481 if config_path.suffix == ".yml": 482 try: 483 with config_path.open("r") as file: 484 phenotype = yaml.safe_load(file) 485 486 validator = Validator(CONFIG_SCHEMA) 487 if validator.validate(phenotype): 488 _logger.debug("YAML structure is valid.") 489 else: 490 _logger.error(f"YAML structure validation failed: {validator.errors}") 491 raise Exception(f"YAML structure validation failed: {validator.errors}") 492 except yaml.YAMLError as e: 493 _logger.error(f"YAML syntax error: {e}") 494 raise e 495 else: 496 raise Exception( 497 f"Unsupported configuration filetype: {str(config_path.resolve())}" 498 ) 499 500 # initiatise 501 validation_errors = [] 502 phenotype = phenotype["phenotype"] 503 code_types = parse.CodeTypeParser().code_types 504 505 # check the version number is of the format vn.n.n 506 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 507 if not match: 508 validation_errors.append( 509 f"Invalid version format in configuration file: {phenotype['version']}" 510 ) 511 512 # create a list of all the concept set names defined in the concept set configuration 513 concept_set_names = [] 514 for item in phenotype["concept_sets"]: 515 if item["name"] in concept_set_names: 516 validation_errors.append( 517 f"Duplicate concept set defined in concept sets {item['name'] }" 518 ) 519 else: 520 concept_set_names.append(item["name"]) 521 522 # check codes definition 523 for item in phenotype["concept_sets"]: 524 # check concepte code file exists 525 concept_code_file_path = concepts_path / item["file"]["path"] 526 if not 
concept_code_file_path.exists(): 527 validation_errors.append( 528 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 529 ) 530 531 # check concepte code file is not empty 532 if concept_code_file_path.stat().st_size == 0: 533 validation_errors.append( 534 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 535 ) 536 537 # check code file type is supported 538 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 539 raise ValueError( 540 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 541 ) 542 543 # check columns specified are a supported medical coding type 544 for column in item["file"]["columns"]: 545 if column not in code_types: 546 validation_errors.append( 547 f"Column type {column} for file {concept_code_file_path} is not supported" 548 ) 549 550 # check the actions are supported 551 if "actions" in item["file"]: 552 for action in item["file"]["actions"]: 553 if action not in COL_ACTIONS: 554 validation_errors.append(f"Action {action} is not supported") 555 556 if len(validation_errors) > 0: 557 _logger.error(validation_errors) 558 raise PhenValidationException( 559 f"Configuration file {str(config_path.resolve())} failed validation", 560 validation_errors, 561 ) 562 563 _logger.info(f"Phenotype validated successfully") 564 565 566def _read_table_file(path: Path, excel_sheet: str = ""): 567 """ 568 Load Code List File 569 """ 570 571 path = path.resolve() 572 if path.suffix == ".csv": 573 df = pd.read_csv(path, dtype=str) 574 elif path.suffix == ".xlsx" or path.suffix == ".xls": 575 if excel_sheet != "": 576 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str) 577 else: 578 df = pd.read_excel(path, dtype=str) 579 elif path.suffix == ".dta": 580 df = pd.read_stata(path) 581 else: 582 raise ValueError( 583 f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types" 584 ) 585 586 return df 587 588 589def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame: 590 # Perform Structural Changes to file before preprocessing 591 _logger.debug("Processing file structural actions") 592 if ( 593 "actions" in concept_set["file"] 594 and "split_col" in concept_set["file"]["actions"] 595 and "codes_col" in concept_set["file"]["actions"] 596 ): 597 split_col = concept_set["file"]["actions"]["split_col"] 598 codes_col = concept_set["file"]["actions"]["codes_col"] 599 _logger.debug( 600 "Action: Splitting", 601 split_col, 602 "column into:", 603 df[split_col].unique(), 604 ) 605 codes = df[codes_col] 606 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode 607 oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes 608 oh[oh == False] = np.nan # replace 0s with None 609 df = pd.concat([df, oh], axis=1) # merge in new columns 610 611 return df 612 613 614def _preprocess_source_concepts( 615 df: pd.DataFrame, concept_set: dict, code_file_path: Path 616) -> Tuple[pd.DataFrame, list]: 617 """Perform QA Checks on columns individually and append to df""" 618 out = pd.DataFrame([]) # create output df to append to 619 code_errors = [] # list of errors from processing 620 621 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 622 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 623 624 # Preprocess codes 625 code_types = parse.CodeTypeParser().code_types 626 for code_type in concept_set["file"]["columns"]: 627 parser = code_types[code_type] 628 _logger.info(f"Processing {code_type} codes for 
{code_file_path}") 629 630 # get codes by column name 631 source_col_name = concept_set["file"]["columns"][code_type] 632 codes = df[source_col_name].dropna() 633 codes = codes.astype(str) # convert to string 634 codes = codes.str.strip() # remove excess spaces 635 636 # process codes, validating them using parser and returning the errors 637 codes, errors = parser.process(codes, code_file_path) 638 if len(errors) > 0: 639 code_errors.extend(errors) 640 _logger.warning(f"Codes validation failed with {len(errors)} errors") 641 642 # add processed codes to df 643 new_col_name = f"{source_col_name}_SOURCE" 644 df = df.rename(columns={source_col_name: new_col_name}) 645 process_codes = pd.DataFrame({code_type: codes}).join(df) 646 out = pd.concat( 647 [out, process_codes], 648 ignore_index=True, 649 ) 650 651 _logger.debug(out.head()) 652 653 return out, code_errors 654 655 656# Translate Df with multiple codes into single code type Series 657def translate_codes( 658 source_df: pd.DataFrame, target_code_type: str, concept_name: str 659) -> pd.DataFrame: 660 """Translates each source code type the source coding list into a target type and returns all conversions as a concept set""" 661 662 # codes = pd.DataFrame([], dtype=str) 663 codes = pd.DataFrame( 664 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string" 665 ) 666 # Convert codes to target type 667 _logger.info(f"Converting to target code type {target_code_type}") 668 669 for source_code_type in source_df.columns: 670 # if target code type is the same as thet source code type, no translation, just appending source as target 671 if source_code_type == target_code_type: 672 copy_df = pd.DataFrame( 673 { 674 "SOURCE_CONCEPT": source_df[source_code_type], 675 "SOURCE_CONCEPT_TYPE": source_code_type, 676 "CONCEPT": source_df[source_code_type], 677 } 678 ) 679 codes = pd.concat([codes, copy_df]) 680 _logger.debug( 681 f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" 682 ) 683 else: 684 # get the translation filename using source to target code types 685 filename = f"{source_code_type}_to_{target_code_type}.parquet" 686 map_path = trud.PROCESSED_PATH / filename 687 688 # do the mapping if it exists 689 if map_path.exists(): 690 # get mapping 691 df_map = pd.read_parquet(map_path) 692 693 # do mapping 694 translated_df = pd.merge( 695 source_df[source_code_type], df_map, how="left" 696 ) 697 698 # normalise the output 699 translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"]) 700 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type 701 702 # add to list of codes 703 codes = pd.concat([codes, translated_df]) 704 705 else: 706 _logger.warning( 707 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" 708 ) 709 710 codes = codes.dropna() # delete NaNs 711 712 # added concept set type to output if any translations 713 if len(codes.index) > 0: 714 codes["CONCEPT_SET"] = concept_name 715 else: 716 _logger.debug(f"No codes converted with target code type {target_code_type}") 717 718 return codes 719 720 721def _write_code_errors(code_errors: list, code_errors_path: Path): 722 err_df = pd.DataFrame( 723 [ 724 { 725 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 726 "VOCABULARY": err.code_type, 727 "SOURCE": err.codes_file, 728 "CAUSE": err.message, 729 } 730 for err in code_errors 731 ] 732 ) 733 734 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 735 err_df = 
err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 736 err_df.to_csv(code_errors_path, index=False, mode="w") 737 738 739def write_vocab_version(phen_path: Path): 740 # write the vocab version files 741 742 if not trud.VERSION_PATH.exists(): 743 raise FileNotFoundError( 744 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 745 ) 746 747 if not omop.VERSION_PATH.exists(): 748 raise FileNotFoundError( 749 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 750 ) 751 752 with trud.VERSION_PATH.open("r") as file: 753 trud_version = yaml.safe_load(file) 754 755 with omop.VERSION_PATH.open("r") as file: 756 omop_version = yaml.safe_load(file) 757 758 # Create the combined YAML structure 759 version_data = { 760 "versions": { 761 "acmc": acmc.__version__, 762 "trud": trud_version, 763 "omop": omop_version, 764 } 765 } 766 767 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 768 yaml.dump( 769 version_data, 770 file, 771 Dumper=util.QuotedDumper, 772 default_flow_style=False, 773 sort_keys=False, 774 default_style='"', 775 ) 776 777 778def map(phen_dir: str, target_code_type: str): 779 _logger.info(f"Processing phenotype: {phen_dir}") 780 781 # Validate configuration 782 validate(phen_dir) 783 784 # initialise paths 785 phen_path = Path(phen_dir) 786 config_path = phen_path / CONFIG_FILE 787 788 # load configuration 789 with config_path.open("r") as file: 790 config = yaml.safe_load(file) 791 phenotype = config["phenotype"] 792 793 if len(phenotype["map"]) == 0: 794 raise ValueError(f"No map codes defined in the phenotype configuration") 795 796 if target_code_type is not None and target_code_type not in phenotype["map"]: 797 raise ValueError( 798 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 799 ) 800 801 if target_code_type is not None: 802 _map_target_code_type(phen_path, phenotype, target_code_type) 803 else: 804 for t in phenotype["map"]: 805 _map_target_code_type(phen_path, phenotype, t) 806 807 _logger.info(f"Phenotype processed successfully") 808 809 810def _map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 811 _logger.debug(f"Target coding format: {target_code_type}") 812 concepts_path = phen_path / CONCEPTS_DIR 813 # Create output dataframe 814 out = pd.DataFrame([]) 815 code_errors = [] 816 817 # Process each folder in codes section 818 for concept_set in phenotype["concept_sets"]: 819 _logger.debug(f"--- {concept_set['file']} ---") 820 821 # Load code file 822 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 823 df = _read_table_file(codes_file_path) 824 825 # process structural actions 826 df = _process_actions(df, concept_set) 827 828 # preprocessing and validate of source concepts 829 _logger.debug("Processing and validating source concept codes") 830 df, errors = _preprocess_source_concepts( 831 df, 832 concept_set, 833 codes_file_path, 834 ) 835 836 # create df with just the source code columns 837 source_column_names = list(concept_set["file"]["columns"].keys()) 838 source_df = df[source_column_names] 839 840 _logger.debug(source_df.columns) 841 _logger.debug(source_df.head()) 842 843 _logger.debug( 844 f"Length of errors from _preprocess_source_concepts {len(errors)}" 845 ) 846 if len(errors) > 0: 847 code_errors.extend(errors) 848 _logger.debug(f" Length of code_errors {len(code_errors)}") 849 850 # Map source concepts codes to target codes 851 # if processing a source coding list with categorical 
data 852 if ( 853 "actions" in concept_set["file"] 854 and "divide_col" in concept_set["file"]["actions"] 855 and len(df) > 0 856 ): 857 divide_col = concept_set["file"]["actions"]["divide_col"] 858 _logger.debug(f"Action: Dividing Table by {divide_col}") 859 _logger.debug(f"column into: {df[divide_col].unique()}") 860 df_grp = df.groupby(divide_col) 861 for cat, grp in df_grp: 862 if cat == concept_set["file"]["category"]: 863 grp = grp.drop(columns=[divide_col]) # delete categorical column 864 source_df = grp[source_column_names] 865 trans_out = translate_codes( 866 source_df, 867 target_code_type=target_code_type, 868 concept_name=concept_set["name"], 869 ) 870 out = pd.concat([out, trans_out]) 871 else: 872 source_df = df[source_column_names] 873 trans_out = translate_codes( 874 source_df, 875 target_code_type=target_code_type, 876 concept_name=concept_set["name"], 877 ) 878 out = pd.concat([out, trans_out]) 879 880 if len(code_errors) > 0: 881 _logger.error(f"The map processing has {len(code_errors)} errors") 882 error_path = phen_path / MAP_DIR / "errors" 883 error_path.mkdir(parents=True, exist_ok=True) 884 error_filename = f"{target_code_type}-code-errors.csv" 885 _write_code_errors(code_errors, error_path / error_filename) 886 887 # Check there is output from processing 888 if len(out.index) == 0: 889 _logger.error(f"No output after map processing") 890 raise Exception( 891 f"No output after map processing, check config {str(phen_path.resolve())}" 892 ) 893 894 # final processing 895 out = out.reset_index(drop=True) 896 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 897 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 898 899 out_count = len(out.index) 900 # added metadata 901 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 902 result_list = [] 903 source_column_names = list(concept_set["file"]["columns"].keys()) 904 for source_concept_type in source_column_names: 905 # Filter output based on the current source_concept_type 906 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 907 filtered_count = len(out_filtered_df.index) 908 909 # Remove the source type columns except the current type will leave the metadata and the join 910 remove_types = [ 911 type for type in source_column_names if type != source_concept_type 912 ] 913 metadata_df = df.drop(columns=remove_types) 914 metadata_df = metadata_df.rename( 915 columns={source_concept_type: "SOURCE_CONCEPT"} 916 ) 917 metadata_df_count = len(metadata_df.index) 918 919 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 920 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 921 result_count = len(result.index) 922 923 _logger.debug( 924 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 925 ) 926 927 # Append the result to the result_list 928 result_list.append(result) 929 930 # Concatenate all the results into a single DataFrame 931 final_out = pd.concat(result_list, ignore_index=True) 932 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 933 _logger.debug( 934 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 935 ) 936 937 # Save output to map directory 938 output_filename = target_code_type + ".csv" 939 map_path = phen_path / MAP_DIR / output_filename 940 final_out.to_csv(map_path, index=False) 941 
_logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 942 943 # save concept sets as separate files 944 concept_set_path = phen_path / CSV_PATH / target_code_type 945 946 # empty the concept-set directory except for hiddle files, e.g. .git 947 if concept_set_path.exists(): 948 for item in concept_set_path.iterdir(): 949 if not item.name.startswith("."): 950 item.unlink() 951 else: 952 concept_set_path.mkdir(parents=True, exist_ok=True) 953 954 # write each concept as a separate file 955 for name, concept in final_out.groupby("CONCEPT_SET"): 956 concept = concept.sort_values(by="CONCEPT") # sort rows 957 concept = concept.dropna(how="all", axis=1) # remove empty cols 958 concept = concept.reindex( 959 sorted(concept.columns), axis=1 960 ) # sort cols alphabetically 961 filename = f"{name}.csv" 962 concept_path = concept_set_path / filename 963 concept.to_csv(concept_path, index=False) 964 965 write_vocab_version(phen_path) 966 967 _logger.info(f"Phenotype processed target code type {target_code_type}") 968 969 970def _generate_version_tag( 971 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 972) -> str: 973 # Get all valid semantic version tags 974 versions = [] 975 for tag in repo.tags: 976 if tag.name.startswith("v"): 977 tag_name = tag.name[1:] # Remove the first character 978 else: 979 tag_name = tag.name 980 if semver.Version.is_valid(tag_name): 981 versions.append(semver.Version.parse(tag_name)) 982 983 _logger.debug(f"Versions: {versions}") 984 # Determine the next version 985 if not versions: 986 new_version = semver.Version(0, 0, 1) 987 else: 988 latest_version = max(versions) 989 if increment == "major": 990 new_version = latest_version.bump_major() 991 elif increment == "minor": 992 new_version = latest_version.bump_minor() 993 else: 994 new_version = latest_version.bump_patch() 995 996 # Create the new tag 997 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 998 999 return new_version_str 1000 1001 1002def publish( 1003 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 1004): 1005 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 1006 1007 # Validate config 1008 validate(phen_dir) 1009 phen_path = Path(phen_dir) 1010 1011 # load git repo and set the branch 1012 repo = git.Repo(phen_path) 1013 if DEFAULT_GIT_BRANCH in repo.branches: 1014 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1015 main_branch.checkout() 1016 else: 1017 raise AttributeError( 1018 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1019 ) 1020 1021 # check if any changes to publish 1022 if not repo.is_dirty() and not repo.untracked_files: 1023 if remote_url is not None and "origin" not in repo.remotes: 1024 _logger.info(f"First publish to remote url {remote_url}") 1025 else: 1026 _logger.info("Nothing to publish, no changes to the repo") 1027 return 1028 1029 # get next version 1030 new_version_str = _generate_version_tag(repo, increment) 1031 _logger.info(f"New version: {new_version_str}") 1032 1033 # Write version in configuration file 1034 config_path = phen_path / CONFIG_FILE 1035 with config_path.open("r") as file: 1036 config = yaml.safe_load(file) 1037 1038 config["phenotype"]["version"] = new_version_str 1039 with open(config_path, "w") as file: 1040 yaml.dump( 1041 config, 1042 file, 1043 Dumper=util.QuotedDumper, 1044 default_flow_style=False, 1045 sort_keys=False, 1046 default_style='"', 1047 ) 1048 1049 # Add and commit changes to repo 
including version updates 1050 commit_message = f"Committing updates to phenotype {phen_path}" 1051 repo.git.add("--all") 1052 repo.index.commit(commit_message) 1053 1054 # Add tag to the repo 1055 repo.create_tag(new_version_str) 1056 1057 # push to origin if a remote repo 1058 if remote_url is not None and "origin" not in repo.remotes: 1059 git_url = _construct_git_url(remote_url) 1060 repo.create_remote("origin", git_url) 1061 1062 try: 1063 if "origin" in repo.remotes: 1064 _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1065 origin = repo.remotes.origin 1066 _logger.info(f"Pushing main branch to remote repo") 1067 repo.git.push("--set-upstream", "origin", "main") 1068 _logger.info(f"Pushing version tags to remote git repo") 1069 origin.push(tags=True) 1070 _logger.debug("Changes pushed to 'origin'") 1071 else: 1072 _logger.debug("Remote 'origin' is not set") 1073 except Exception as e: 1074 tag_ref = repo.tags[new_version_str] 1075 repo.delete_tag(tag_ref) 1076 repo.git.reset("--soft", "HEAD~1") 1077 raise e 1078 1079 _logger.info(f"Phenotype published successfully") 1080 1081 1082def export(phen_dir: str, version: str): 1083 """Exports a phen repo at a specific tagged version into a target directory""" 1084 _logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1085 1086 # validate configuration 1087 validate(phen_dir) 1088 phen_path = Path(phen_dir) 1089 1090 # load configuration 1091 config_path = phen_path / CONFIG_FILE 1092 with config_path.open("r") as file: 1093 config = yaml.safe_load(file) 1094 1095 map_path = phen_path / MAP_DIR 1096 if not map_path.exists(): 1097 _logger.warning(f"Map path does not exist '{map_path}'") 1098 1099 export_path = phen_path / OMOP_PATH 1100 # check export directory exists and if not create it 1101 if not export_path.exists(): 1102 export_path.mkdir(parents=True) 1103 _logger.debug(f"OMOP export directory '{export_path}' created.") 1104 1105 # omop export db 1106 export_db_path = omop.export( 1107 map_path, 1108 export_path, 1109 config["phenotype"]["version"], 1110 config["phenotype"]["omop"], 1111 ) 1112 1113 # write to tables 1114 # export as csv 1115 _logger.info(f"Phenotype exported successfully") 1116 1117 1118def copy(phen_dir: str, target_dir: str, version: str): 1119 """Copys a phen repo at a specific tagged version into a target directory""" 1120 1121 # Validate 1122 validate(phen_dir) 1123 phen_path = Path(phen_dir) 1124 1125 # Check target directory exists 1126 target_path = Path(target_dir) 1127 if not target_path.exists(): 1128 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1129 1130 # Set copy directory 1131 copy_path = target_path / version 1132 _logger.info(f"Copying repo {phen_path} to {copy_path}") 1133 1134 if ( 1135 copy_path.exists() and copy_path.is_dir() 1136 ): # Check if it exists and is a directory 1137 copy = _check_delete_dir( 1138 copy_path, 1139 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? 
(yes/no): ", 1140 ) 1141 else: 1142 copy = True 1143 1144 if not copy: 1145 _logger.info(f"Not copying the version {version}") 1146 return 1147 1148 _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1149 repo = git.Repo.clone_from(phen_path, copy_path) 1150 1151 # Check out the latest commit or specified version 1152 if version: 1153 # Checkout a specific version (e.g., branch, tag, or commit hash) 1154 _logger.info(f"Checking out version {version}...") 1155 repo.git.checkout(version) 1156 else: 1157 # Checkout the latest commit (HEAD) 1158 _logger.info(f"Checking out the latest commit...") 1159 repo.git.checkout("HEAD") 1160 1161 _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1162 1163 _logger.info(f"Phenotype copied successfully") 1164 1165 1166# Convert concept_sets list into dictionaries 1167def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1168 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1169 concepts_dict = { 1170 item["name"]: item["file"]["path"] 1171 for item in config_data["phenotype"]["concept_sets"] 1172 } 1173 name_set = set(concepts_dict.keys()) 1174 return concepts_dict, name_set 1175 1176 1177def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1178 """ 1179 Extracts clean keys from a DeepDiff dictionary. 1180 1181 :param diff: DeepDiff result dictionary 1182 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1183 :return: A set of clean key names 1184 """ 1185 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1186 1187 1188def diff_config(old_config: dict, new_config: dict) -> str: 1189 report = f"\n# Changes to phenotype configuration\n" 1190 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1191 1192 old_concepts, old_names = extract_concepts(old_config) 1193 new_concepts, new_names = extract_concepts(new_config) 1194 1195 # Check added and removed names 1196 added_names = new_names - old_names # Names that appear in new but not in old 1197 removed_names = old_names - new_names # Names that were in old but not in new 1198 1199 # find file path changes for unchanged names 1200 unchanged_names = old_names & new_names # Names that exist in both 1201 file_diff = DeepDiff( 1202 {name: old_concepts[name] for name in unchanged_names}, 1203 {name: new_concepts[name] for name in unchanged_names}, 1204 ) 1205 1206 # Find renamed concepts (same file, different name) 1207 renamed_concepts = [] 1208 for removed in removed_names: 1209 old_path = old_concepts[removed] 1210 for added in added_names: 1211 new_path = new_concepts[added] 1212 if old_path == new_path: 1213 renamed_concepts.append((removed, added)) 1214 1215 # Remove renamed concepts from added and removed sets 1216 for old_name, new_name in renamed_concepts: 1217 added_names.discard(new_name) 1218 removed_names.discard(old_name) 1219 1220 # generate config report 1221 if added_names: 1222 report += "## Added Concepts\n" 1223 for name in added_names: 1224 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1225 report += "\n" 1226 1227 if removed_names: 1228 report += "## Removed Concepts\n" 1229 for name in removed_names: 1230 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1231 report += "\n" 1232 1233 if renamed_concepts: 1234 report += "## Renamed Concepts\n" 1235 for old_name, 
new_name in renamed_concepts: 1236 report += ( 1237 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1238 ) 1239 report += "\n" 1240 1241 if "values_changed" in file_diff: 1242 report += "## Updated File Paths\n" 1243 for name, change in file_diff["values_changed"].items(): 1244 old_file = change["old_value"] 1245 new_file = change["new_value"] 1246 clean_name = name.split("root['")[1].split("']")[0] 1247 report += ( 1248 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1249 ) 1250 report += "\n" 1251 1252 if not ( 1253 added_names 1254 or removed_names 1255 or renamed_concepts 1256 or file_diff.get("values_changed") 1257 ): 1258 report += "No changes in concept sets.\n" 1259 1260 return report 1261 1262 1263def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1264 old_output_files = [ 1265 file.name 1266 for file in old_map_path.iterdir() 1267 if file.is_file() and not file.name.startswith(".") 1268 ] 1269 new_output_files = [ 1270 file.name 1271 for file in new_map_path.iterdir() 1272 if file.is_file() and not file.name.startswith(".") 1273 ] 1274 1275 # Convert the lists to sets for easy comparison 1276 old_output_set = set(old_output_files) 1277 new_output_set = set(new_output_files) 1278 1279 # Outputs that are in old_output_set but not in new_output_set (removed files) 1280 removed_outputs = old_output_set - new_output_set 1281 # Outputs that are in new_output_set but not in old_output_set (added files) 1282 added_outputs = new_output_set - old_output_set 1283 # Outputs that are the intersection of old_output_set and new_output_set 1284 common_outputs = old_output_set & new_output_set 1285 1286 report = f"\n# Changes to available translations\n" 1287 report += f"This compares the coding translations files available.\n\n" 1288 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1289 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1290 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1291 1292 # Step N: Compare common outputs between versions 1293 report += f"# Changes to concepts in translation files\n\n" 1294 report += f"This compares the added and removed concepts in each of the coding translation files. 
Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1295 for file in common_outputs: 1296 old_output = old_map_path / file 1297 new_output = new_map_path / file 1298 1299 _logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1300 _logger.debug(f"New ouptput: {str(new_output.resolve())}") 1301 1302 df1 = pd.read_csv(old_output) 1303 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1304 df2 = pd.read_csv(new_output) 1305 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1306 1307 # Check for added and removed concepts 1308 report += f"- File {file}\n" 1309 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1310 report += f"- Removed concepts {sorted_list}\n" 1311 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1312 report += f"- Added concepts {sorted_list}\n" 1313 1314 # Check for changed concepts 1315 diff = df2 - df1 # diff in counts 1316 diff = diff[ 1317 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1318 ] # get non-zero counts 1319 s = "\n" 1320 if len(diff.index) > 0: 1321 for concept, row in diff.iterrows(): 1322 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1323 report += f"- Changed concepts {s}\n\n" 1324 else: 1325 report += f"- Changed concepts []\n\n" 1326 1327 return report 1328 1329 1330def diff_phen( 1331 new_phen_path: Path, 1332 new_version: str, 1333 old_phen_path: Path, 1334 old_version: str, 1335 report_path: Path, 1336): 1337 """Compare the differences between two versions of a phenotype""" 1338 1339 # validate phenotypes 1340 _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1341 validate(str(old_phen_path.resolve())) 1342 _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1343 validate(str(new_phen_path.resolve())) 1344 1345 # get old and new config 1346 old_config_path = old_phen_path / CONFIG_FILE 1347 with old_config_path.open("r") as file: 1348 old_config = yaml.safe_load(file) 1349 new_config_path = new_phen_path / CONFIG_FILE 1350 with new_config_path.open("r") as file: 1351 new_config = yaml.safe_load(file) 1352 1353 # write report heading 1354 report = f"# Phenotype Comparison Report\n" 1355 report += f"## Original phenotype\n" 1356 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1357 report += f" - {old_version}\n" 1358 report += f" - {str(old_phen_path.resolve())}\n" 1359 report += f"## Changed phenotype:\n" 1360 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1361 report += f" - {new_version}\n" 1362 report += f" - {str(new_phen_path.resolve())}\n" 1363 1364 # Step 1: check differences configuration files 1365 # Convert list of dicts into a dict: {name: file} 1366 report += diff_config(old_config, new_config) 1367 1368 # Step 2: check differences between map files 1369 # List files from output directories 1370 old_map_path = old_phen_path / MAP_DIR 1371 new_map_path = new_phen_path / MAP_DIR 1372 report += diff_map_files(old_map_path, new_map_path) 1373 1374 # initialise report file 1375 _logger.debug(f"Writing to report file {str(report_path.resolve())}") 1376 report_file = open(report_path, "w") 1377 report_file.write(report) 1378 report_file.close() 1379 1380 _logger.info(f"Phenotypes diff'd successfully") 1381 1382 1383def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1384 # make tmp directory .acmc 1385 timestamp = time.strftime("%Y%m%d_%H%M%S") 1386 temp_dir = 
Path(f".acmc/diff_{timestamp}") 1387 1388 changed_phen_path = Path(phen_dir) 1389 if not changed_phen_path.exists(): 1390 raise ValueError( 1391 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1392 ) 1393 1394 old_phen_path = Path(old_phen_dir) 1395 if not old_phen_path.exists(): 1396 raise ValueError( 1397 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1398 ) 1399 1400 t_path = old_phen_path / "config.yml" 1401 with t_path.open("r") as file: 1402 c = yaml.safe_load(file) 1403 1404 try: 1405 # Create the directory 1406 temp_dir.mkdir(parents=True, exist_ok=True) 1407 _logger.debug(f"Temporary directory created: {temp_dir}") 1408 1409 # Create temporary directories 1410 changed_path = temp_dir / "changed" 1411 changed_path.mkdir(parents=True, exist_ok=True) 1412 old_path = temp_dir / "old" 1413 old_path.mkdir(parents=True, exist_ok=True) 1414 1415 # checkout changed 1416 if version == "latest": 1417 _logger.debug( 1418 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1419 ) 1420 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1421 else: 1422 _logger.debug( 1423 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1424 ) 1425 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1426 changed_repo.git.checkout(version) 1427 1428 # checkout old 1429 if old_version == "latest": 1430 _logger.debug( 1431 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1432 ) 1433 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1434 else: 1435 _logger.debug( 1436 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1437 ) 1438 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1439 old_repo.git.checkout(old_version) 1440 1441 report_filename = f"{version}_{old_version}_diff.md" 1442 report_path = changed_phen_path / report_filename 1443 # diff old with new 1444 diff_phen(changed_path, version, old_path, old_version, report_path) 1445 1446 finally: 1447 # clean up tmp directory 1448 if temp_dir.exists(): 1449 shutil.rmtree(temp_dir) 1450 print(f"Temporary directory removed: {temp_dir}")
Module attributes:

- PHEN_DIR: Default phenotype directory name
- DEFAULT_PHEN_PATH: Default phenotype directory path
- CONCEPTS_DIR: Default concepts directory name
- MAP_DIR: Default map directory name
- CONCEPT_SET_DIR: Default concept set directory name
- CSV_PATH: Default CSV concept set directory path
- OMOP_PATH: Default OMOP concept set directory path
- DEFAULT_PHEN_DIR_LIST: List of default phenotype directories
- CONFIG_FILE: Default configuration filename
- VOCAB_VERSION_FILE: Default vocabulary version filename
- SEMANTIC_VERSION_TYPES: List of semantic version increment types
- DEFAULT_VERSION_INC: Default semantic version increment type
- DEFAULT_GIT_BRANCH: Default phenotype repo branch name
- SPLIT_COL_ACTION: Split column preprocessing action type
- CODES_COL_ACTION: Codes column preprocessing action type
- DIVIDE_COL_ACTION: Divide column preprocessing action type
- COL_ACTIONS: List of column preprocessing action types
- CODE_FILE_TYPES: List of supported source concept coding list file types
- CONFIG_SCHEMA: Phenotype config.yml schema definition
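To illustrate the shape that CONFIG_SCHEMA expects, the sketch below builds a minimal configuration as a Python dict and checks it with Cerberus, mirroring what validate() does internally. The vocabulary details, concept set name, file path and column mapping are illustrative placeholders, the optional "map" list is omitted, and column keys must ultimately be supported code types.

```python
from cerberus import Validator

from acmc import phen

example_config = {
    "phenotype": {
        "version": "0.0.0",
        "omop": {
            "vocabulary_id": "ACMC_EXAMPLE",              # placeholder
            "vocabulary_name": "ACMC example phenotype",  # placeholder
            "vocabulary_reference": "https://example.org/phenotype",
        },
        "concept_sets": [
            {
                "name": "EXAMPLE_CONCEPT_SET",
                "file": {
                    "path": "example-codes.csv",    # file under the concepts/ directory
                    "columns": {"read2": "code"},   # code type -> column name (placeholder)
                },
            }
        ],
    }
}

validator = Validator(phen.CONFIG_SCHEMA)
assert validator.validate(example_config), validator.errors
```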
```python
class PhenValidationException(Exception):
    """Custom exception class raised when validation errors in phenotype configuration file"""

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors
```
Custom exception raised when validation errors are found in the phenotype configuration file.
```python
def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with standard structure"""
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info(f"Exiting, phenotype not initialised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url != None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phen's default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
        main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phenotype
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug(f"Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "translate": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
 """
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info(f"Phenotype initialised successfully")
```
Initialises a phenotype directory as a git repo with standard structure.
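A short usage sketch for init, assuming the module is imported as acmc.phen; the directory path and remote URL are placeholders, and cloning from a remote additionally requires the ACMC_GITHUB_PAT or ACMC_GITLAB_PAT environment variable used by _construct_git_url.

```python
from acmc import phen

# initialise a brand-new local phenotype repository
phen.init(phen_dir="./workspace/phen", remote_url=None)

# or initialise from an existing remote repository (PAT token read from the environment)
# phen.init(phen_dir="./workspace/phen", remote_url="https://github.com/org/phen-repo.git")
```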
```python
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there are any other problems with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info(f"Exiting, phenotype not initialised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yml' exists in the root directory
        config_path = phen_path / "config.yml"
        if not os.path.isfile(config_path):
            raise ValueError(
                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", "main")

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}")
```
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin.
Arguments:
- phen_dir (str): local directory path where the upstream repo is to be cloned
- upstream_url (str): url to the upstream repo
- upstream_version (str): version in the upstream repo to clone
- new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
- ValueError: if the specified version is not in the upstream repo
- ValueError: if the upstream repo is not a valid phenotype repo
- ValueError: if there's any other problems with Git
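A minimal usage sketch is shown below. The import path (acmc.phen, taken from the page title), the workspace path, the repository URL and the version tag are illustrative assumptions rather than real values.

from acmc import phen  # import path assumed from the page title

phen.fork(
    phen_dir="./workspace/phen",
    upstream_url="https://github.com/example-org/example-phenotype",  # hypothetical upstream
    upstream_version="v1.0.0",                                        # illustrative tag
    new_origin_url=None,  # pass a URL here to push the fork to a new origin
)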
def validate(phen_dir: str):
    """Validates that the phenotype directory is a git repo with the standard structure"""
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # Load configuration file
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is in the format n.n.n
    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
    if not match:
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for item in phenotype["concept_sets"]:
        # check concept code file exists
        concept_code_file_path = concepts_path / item["file"]["path"]
        if not concept_code_file_path.exists():
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} does not exist"
            )
        # check concept code file is not empty
        elif concept_code_file_path.stat().st_size == 0:
            validation_errors.append(
                f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
            )

        # check code file type is supported
        if concept_code_file_path.suffix not in CODE_FILE_TYPES:
            raise ValueError(
                f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
            )

        # check columns specified are a supported medical coding type
        for column in item["file"]["columns"]:
            if column not in code_types:
                validation_errors.append(
                    f"Column type {column} for file {concept_code_file_path} is not supported"
                )

        # check the actions are supported
        if "actions" in item["file"]:
            for action in item["file"]["actions"]:
                if action not in COL_ACTIONS:
                    validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info(f"Phenotype validated successfully")
Validates that the phenotype directory is a git repo with the standard structure
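The sketch below shows one way a validation failure might be handled. It assumes the module is importable as acmc.phen and that PhenValidationException is exposed by this module, as the source above suggests; the directory path is illustrative.

from acmc import phen  # import path assumed

try:
    phen.validate("./workspace/phen")  # illustrative path
except phen.PhenValidationException as e:
    # the exception is constructed with a summary message and the list of validation errors
    print(e)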
def translate_codes(
    source_df: pd.DataFrame, target_code_type: str, concept_name: str
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    # codes = pd.DataFrame([], dtype=str)
    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if the target code type is the same as the source code type, no translation is needed, just append the source as the target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as the source code type ({len(source_df)} codes), copying codes rather than translating"
            )
        else:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            # do the mapping if it exists
            if map_path.exists():
                # get mapping
                df_map = pd.read_parquet(map_path)

                # do mapping
                translated_df = pd.merge(
                    source_df[source_code_type], df_map, how="left"
                )

                # normalise the output
                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

                # add to list of codes
                codes = pd.concat([codes, translated_df])

            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add the concept set name to the output if there were any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
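As a hedged illustration, the sketch below passes a tiny in-memory coding list where the target type equals the source type, so the codes are copied straight through; a real translation would require a source_to_target.parquet mapping file under trud.PROCESSED_PATH. The "read2" column name and the codes are hypothetical.

import pandas as pd
from acmc import phen  # import path assumed

source_df = pd.DataFrame({"read2": ["C10E.", "C10F."]})  # hypothetical source codes
concept_set = phen.translate_codes(
    source_df, target_code_type="read2", concept_name="diabetes"
)
# concept_set columns: SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT, CONCEPT_SET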
def write_vocab_version(phen_path: Path):
    """Writes the vocab version file recording the acmc, TRUD and OMOP versions used by the phenotype"""

    if not trud.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    if not omop.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud.VERSION_PATH.open("r") as file:
        trud_version = yaml.safe_load(file)

    with omop.VERSION_PATH.open("r") as file:
        omop_version = yaml.safe_load(file)

    # Create the combined YAML structure
    version_data = {
        "versions": {
            "acmc": acmc.__version__,
            "trud": trud_version,
            "omop": omop_version,
        }
    }

    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
        yaml.dump(
            version_data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
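A hedged sketch of writing the file and reading it back. It assumes TRUD and OMOP are installed so their version files exist, and uses the default workspace path for illustration.

import yaml
from pathlib import Path
from acmc import phen  # import path assumed

phen_path = Path("./workspace/phen")  # illustrative
phen.write_vocab_version(phen_path)

with (phen_path / phen.VOCAB_VERSION_FILE).open("r") as f:
    versions = yaml.safe_load(f)["versions"]  # keys: acmc, trud, omop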
def map(phen_dir: str, target_code_type: str):
    """Maps the phenotype's source concept codes to the target code type, or to every code type listed in the configuration map if no target is given"""
    _logger.info(f"Processing phenotype: {phen_dir}")

    # Validate configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
        phenotype = config["phenotype"]

    if len(phenotype["map"]) == 0:
        raise ValueError(f"No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in phenotype["map"]:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
        )

    if target_code_type is not None:
        _map_target_code_type(phen_path, phenotype, target_code_type)
    else:
        for t in phenotype["map"]:
            _map_target_code_type(phen_path, phenotype, t)

    _logger.info(f"Phenotype processed successfully")
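A hedged usage sketch; the directory and the "snomed" target type are illustrative, and any single target passed here must appear in the configuration's map list.

from acmc import phen  # import path assumed

phen.map("./workspace/phen", target_code_type=None)      # map to every configured type
phen.map("./workspace/phen", target_code_type="snomed")  # hypothetical single target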
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory"""

    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates
    commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # push to origin if a remote repo
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info(f"Pushing main branch to remote repo")
            repo.git.push("--set-upstream", "origin", "main")
            _logger.info(f"Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception as e:
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise e

    _logger.info(f"Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
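An illustrative call, assuming the phenotype directory is valid; the remote URL is a placeholder, and increment must be one of SEMANTIC_VERSION_TYPES.

from acmc import phen  # import path assumed

phen.publish(
    phen_dir="./workspace/phen",
    msg="Updated concept sets",                                  # message describing the change
    remote_url="https://github.com/example-org/my-phenotype",    # hypothetical remote
    increment="minor",
)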
def export(phen_dir: str, version: str):
    """Exports a phen repo at a specific tagged version into the phenotype's OMOP export directory"""
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    export_path = phen_path / OMOP_PATH
    # check export directory exists and if not create it
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # omop export db
    export_db_path = omop.export(
        map_path,
        export_path,
        config["phenotype"]["version"],
        config["phenotype"]["omop"],
    )

    # write to tables
    # export as csv
    _logger.info(f"Phenotype exported successfully")
Exports a phen repo at a specific tagged version into the phenotype's OMOP export directory
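A hedged example; the path and version string are illustrative, and the mapped outputs should already exist under the map directory for the OMOP export to have anything to write.

from acmc import phen  # import path assumed

phen.export("./workspace/phen", version="1.0.3")  # illustrative values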
def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory"""

    # Validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # Check target directory exists
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # Set copy directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    if (
        copy_path.exists() and copy_path.is_dir()
    ):  # Check if it exists and is a directory
        copy = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )
    else:
        copy = True

    if not copy:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    # Check out the latest commit or specified version
    if version:
        # Checkout a specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        repo.git.checkout(version)
    else:
        # Checkout the latest commit (HEAD)
        _logger.info(f"Checking out the latest commit...")
        repo.git.checkout("HEAD")

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")

    _logger.info(f"Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
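A hedged example; both paths and the version are illustrative, and the target directory must already exist because a sub-directory named after the version is created inside it.

from acmc import phen  # import path assumed

phen.copy("./workspace/phen", target_dir="./releases", version="1.0.3")  # illustrative values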
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concepts as {name: file_path} dictionary and a name set."""
    concepts_dict = {
        item["name"]: item["file"]["path"]
        for item in config_data["phenotype"]["concept_sets"]
    }
    name_set = set(concepts_dict.keys())
    return concepts_dict, name_set
Extracts concepts as {name: file_path} dictionary and a name set.
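A small worked example on a minimal, hypothetical configuration dictionary:

from acmc import phen  # import path assumed

config = {
    "phenotype": {
        "concept_sets": [
            {"name": "diabetes", "file": {"path": "diabetes.csv", "columns": {}}},
            {"name": "asthma", "file": {"path": "asthma.xlsx", "columns": {}}},
        ]
    }
}
concepts, names = phen.extract_concepts(config)
# concepts == {"diabetes": "diabetes.csv", "asthma": "asthma.xlsx"}
# names == {"diabetes", "asthma"}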
def diff_config(old_config: dict, new_config: dict) -> str:
    """Generates a report of the concept set differences between two phenotype configurations"""
    report = f"\n# Changes to phenotype configuration\n"
    report += f"This compares changes in the phenotype configuration, including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # Check added and removed names
    added_names = new_names - old_names  # Names that appear in new but not in old
    removed_names = old_names - new_names  # Names that were in old but not in new

    # find file path changes for unchanged names
    unchanged_names = old_names & new_names  # Names that exist in both
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # Find renamed concepts (same file, different name)
    renamed_concepts = []
    for removed in removed_names:
        old_path = old_concepts[removed]
        for added in added_names:
            new_path = new_concepts[added]
            if old_path == new_path:
                renamed_concepts.append((removed, added))

    # Remove renamed concepts from added and removed sets
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate config report
    if added_names:
        report += "## Added Concepts\n"
        for name in added_names:
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in removed_names:
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    if "values_changed" in file_diff:
        report += "## Updated File Paths\n"
        for name, change in file_diff["values_changed"].items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            clean_name = name.split("root['")[1].split("']")[0]
            report += (
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or file_diff.get("values_changed")
    ):
        report += "No changes in concept sets.\n"

    return report
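The sketch below compares two hypothetical in-memory configurations; because the concept set keeps its file path but changes its name, the report contains a "Renamed Concepts" section rather than an addition and a removal.

from acmc import phen  # import path assumed

old = {"phenotype": {"concept_sets": [{"name": "dm", "file": {"path": "diabetes.csv"}}]}}
new = {"phenotype": {"concept_sets": [{"name": "diabetes", "file": {"path": "diabetes.csv"}}]}}
print(phen.diff_config(old, new))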
def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
    """Generates a report of the differences between the translation files in two map directories"""
    old_output_files = [
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    new_output_files = [
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]

    # Convert the lists to sets for easy comparison
    old_output_set = set(old_output_files)
    new_output_set = set(new_output_files)

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = f"\n# Changes to available translations\n"
    report += f"This compares the coding translation files available.\n\n"
    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"

    # Compare common outputs between versions
    report += f"# Changes to concepts in translation files\n\n"
    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might differ from config.yml if the translations have not been run for the current config.\n\n"
    for file in common_outputs:
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old output: {str(old_output.resolve())}")
        _logger.debug(f"New output: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output)
        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
        report += f"- Added concepts {sorted_list}\n"

        # Check for changed concepts
        diff = df2 - df1  # diff in counts
        diff = diff[
            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
        ]  # get non-zero counts
        s = "\n"
        if len(diff.index) > 0:
            for concept, row in diff.iterrows():
                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
            report += f"- Changed concepts {s}\n\n"
        else:
            report += f"- Changed concepts []\n\n"

    return report
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
):
    """Compare the differences between two versions of a phenotype"""

    # validate phenotypes
    _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
    validate(str(old_phen_path.resolve()))
    _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
    validate(str(new_phen_path.resolve()))

    # get old and new config
    old_config_path = old_phen_path / CONFIG_FILE
    with old_config_path.open("r") as file:
        old_config = yaml.safe_load(file)
    new_config_path = new_phen_path / CONFIG_FILE
    with new_config_path.open("r") as file:
        new_config = yaml.safe_load(file)

    # write report heading
    report = f"# Phenotype Comparison Report\n"
    report += f"## Original phenotype\n"
    report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f" - {old_version}\n"
    report += f" - {str(old_phen_path.resolve())}\n"
    report += f"## Changed phenotype\n"
    report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n"
    report += f" - {new_version}\n"
    report += f" - {str(new_phen_path.resolve())}\n"

    # Step 1: check differences in the configuration files
    # Convert list of dicts into a dict: {name: file}
    report += diff_config(old_config, new_config)

    # Step 2: check differences between map files
    # List files from output directories
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(old_map_path, new_map_path)

    # write the report file
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    with report_path.open("w") as report_file:
        report_file.write(report)

    _logger.info(f"Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str):
    """Compares two versions of a phenotype and writes a markdown diff report into the changed phenotype directory"""

    # make tmp directory .acmc
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    t_path = old_phen_path / "config.yml"
    with t_path.open("r") as file:
        c = yaml.safe_load(file)

    try:
        # Create the directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # Create temporary directories
        changed_path = temp_dir / "changed"
        changed_path.mkdir(parents=True, exist_ok=True)
        old_path = temp_dir / "old"
        old_path.mkdir(parents=True, exist_ok=True)

        # checkout changed
        if version == "latest":
            _logger.debug(
                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
            changed_repo.git.checkout(version)

        # checkout old
        if old_version == "latest":
            _logger.debug(
                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
            old_repo.git.checkout(old_version)

        report_filename = f"{version}_{old_version}_diff.md"
        report_path = changed_phen_path / report_filename
        # diff old with new
        diff_phen(changed_path, version, old_path, old_version, report_path)

    finally:
        # clean up tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
            _logger.debug(f"Temporary directory removed: {temp_dir}")
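A hedged usage sketch; both directories are illustrative and must be valid phenotype git repos. With "latest" the working copies are compared directly, and the report is written into the changed phenotype directory as version_old-version_diff.md.

from acmc import phen  # import path assumed

phen.diff(
    phen_dir="./workspace/phen",       # changed phenotype
    version="latest",
    old_phen_dir="./releases/1.0.2",   # hypothetical copy of the old version
    old_version="latest",
)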