acmc.phen
phenotype.py module
This module provides functionality for managing phenotypes.
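A minimal sketch of calling the module directly from Python (the workspace path shown is the module default, DEFAULT_PHEN_PATH):

```python
from acmc import phen

# create a new local phenotype repo with the standard directory structure
phen.init(phen_dir="./workspace/phen", remote_url=None)

# check that the directory, config.yml and git repo are valid
phen.validate(phen_dir="./workspace/phen")
```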
1""" 2phenotype.py module 3 4This module provides functionality for managing phenotypes. 5""" 6 7import argparse 8import pandas as pd 9import numpy as np 10import json 11import os 12import sqlite3 13import sys 14import shutil 15import time 16import git 17import re 18import logging 19import requests 20import yaml 21import semver 22from git import Repo 23from cerberus import Validator # type: ignore 24from deepdiff import DeepDiff 25from pathlib import Path 26from urllib.parse import urlparse, urlunparse 27from typing import Tuple, Set, Any 28import acmc 29from acmc import trud, omop, parse, util, logging_config as lc 30 31# set up logging 32_logger = lc.setup_logger() 33 34pd.set_option("mode.chained_assignment", None) 35 36PHEN_DIR = "phen" 37"""Default phenotype directory name""" 38 39DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR 40"""Default phenotype directory path""" 41 42CONCEPTS_DIR = "concepts" 43"""Default concepts directory name""" 44 45MAP_DIR = "map" 46"""Default map directory name""" 47 48CONCEPT_SET_DIR = "concept-sets" 49"""Default concept set directory name""" 50 51CSV_PATH = Path(CONCEPT_SET_DIR) / "csv" 52"""Default CSV concept set directory path""" 53 54OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop" 55"""Default OMOP concept set directory path""" 56 57DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR] 58"""List of default phenotype directories""" 59 60CONFIG_FILE = "config.yml" 61"""Default configuration filename""" 62 63VOCAB_VERSION_FILE = "vocab_version.yml" 64"""Default vocabulary version filename""" 65 66SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"] 67"""List of semantic version increment types""" 68 69DEFAULT_VERSION_INC = "patch" 70"""Default semantic version increment type""" 71 72DEFAULT_GIT_BRANCH = "main" 73"""Default phenotype repo branch name""" 74 75SPLIT_COL_ACTION = "split_col" 76"""Split column preprocessing action type""" 77 78CODES_COL_ACTION = "codes_col" 79"""Codes column preprocessing action type""" 80 81DIVIDE_COL_ACTION = "divide_col" 82"""Divide column preprocessing action type""" 83 84COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] 85"""List of column preprocessing action types""" 86 87CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"] 88"""List of supported source concept coding list file types""" 89 90# config.yaml schema 91CONFIG_SCHEMA = { 92 "phenotype": { 93 "type": "dict", 94 "required": True, 95 "schema": { 96 "version": { 97 "type": "string", 98 "required": True, 99 "regex": r"^\d+\.\d+\.\d+$", # Enforces 'vN.N.N' format 100 }, 101 "omop": { 102 "type": "dict", 103 "required": True, 104 "schema": { 105 "vocabulary_id": {"type": "string", "required": True}, 106 "vocabulary_name": {"type": "string", "required": True}, 107 "vocabulary_reference": { 108 "type": "string", 109 "required": True, 110 "regex": r"^https?://.*", # Ensures it's a URL 111 }, 112 }, 113 }, 114 "map": { 115 "type": "list", 116 "schema": { 117 "type": "string", 118 "allowed": list( 119 parse.SUPPORTED_CODE_TYPES 120 ), # Ensure only predefined values are allowed 121 }, 122 }, 123 "concept_sets": { 124 "type": "list", 125 "required": True, 126 "schema": { 127 "type": "dict", 128 "schema": { 129 "name": {"type": "string", "required": True}, 130 "files": { 131 "type": "list", 132 "required": True, 133 "schema": { 134 "type": "dict", 135 "schema": { 136 "path": {"type": "string", "required": True}, 137 "columns": {"type": "dict", "required": True}, 138 "category": { 139 "type": "string" 140 }, # Optional but must be string if present 141 
"actions": { 142 "type": "dict", 143 "schema": { 144 "divide_col": {"type": "string"}, 145 "split_col": {"type": "string"}, 146 "codes_col": {"type": "string"}, 147 }, 148 }, 149 }, 150 }, 151 }, 152 "metadata": {"type": "dict", "required": False}, 153 }, 154 }, 155 }, 156 }, 157 } 158} 159"""Phenotype config.yml schema definition""" 160 161 162class PhenValidationException(Exception): 163 """Custom exception class raised when validation errors in phenotype configuration file""" 164 165 def __init__(self, message, validation_errors=None): 166 super().__init__(message) 167 self.validation_errors = validation_errors 168 169 170def _construct_git_url(remote_url: str): 171 """Constructs a git url for github or gitlab including a PAT token environment variable""" 172 # check the url 173 parsed_url = urlparse(remote_url) 174 175 # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd 176 # need to update this function if the URL scheme is different. 177 if "github.com" in parsed_url.netloc: 178 # get GitHub PAT from environment variable 179 auth = os.getenv("ACMC_GITHUB_PAT") 180 if not auth: 181 raise ValueError( 182 "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." 183 ) 184 else: 185 # get GitLab PAT from environment variable 186 auth = os.getenv("ACMC_GITLAB_PAT") 187 if not auth: 188 raise ValueError( 189 "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 190 ) 191 auth = f"oauth2:{auth}" 192 193 # Construct the new URL with credentials 194 new_netloc = f"{auth}@{parsed_url.netloc}" 195 return urlunparse( 196 ( 197 parsed_url.scheme, 198 new_netloc, 199 parsed_url.path, 200 parsed_url.params, 201 parsed_url.query, 202 parsed_url.fragment, 203 ) 204 ) 205 206 207def _create_empty_git_dir(path: Path): 208 """Creates a directory with a .gitkeep file so that it's tracked in git""" 209 path.mkdir(exist_ok=True) 210 keep_path = path / ".gitkeep" 211 keep_path.touch(exist_ok=True) 212 213 214def _check_delete_dir(path: Path, msg: str) -> bool: 215 """Checks on the command line if a user wants to delete a directory 216 217 Args: 218 path (Path): path of the directory to be deleted 219 msg (str): message to be displayed to the user 220 221 Returns: 222 Boolean: True if deleted 223 """ 224 deleted = False 225 226 user_input = input(f"{msg}").strip().lower() 227 if user_input in ["yes", "y"]: 228 shutil.rmtree(path) 229 deleted = True 230 else: 231 _logger.info("Directory was not deleted.") 232 233 return deleted 234 235 236def init(phen_dir: str, remote_url: str): 237 """Initial phenotype directory as git repo with standard structure""" 238 _logger.info(f"Initialising Phenotype in directory: {phen_dir}") 239 phen_path = Path(phen_dir) 240 241 # check if directory already exists and ask user if they want to recreate it 242 if ( 243 phen_path.exists() and phen_path.is_dir() 244 ): # Check if it exists and is a directory 245 configure = _check_delete_dir( 246 phen_path, 247 f"The phen directory already exists. Do you want to reinitialise? 
(yes/no): ", 248 ) 249 else: 250 configure = True 251 252 if not configure: 253 _logger.info(f"Exiting, phenotype not initiatised") 254 return 255 256 # Initialise repo from local or remote 257 repo: Repo 258 259 # if remote then clone the repo otherwise init a local repo 260 if remote_url != None: 261 # add PAT token to the URL 262 git_url = _construct_git_url(remote_url) 263 264 # clone the repo 265 git_cmd = git.cmd.Git() 266 git_cmd.clone(git_url, phen_path) 267 268 # open repo 269 repo = Repo(phen_path) 270 # check if there are any commits (new repo has no commits) 271 if ( 272 len(repo.branches) == 0 or repo.head.is_detached 273 ): # Handle detached HEAD (e.g., after init) 274 _logger.debug("The phen repository has no commits yet.") 275 commit_count = 0 276 else: 277 # Get the total number of commits in the default branch 278 commit_count = sum(1 for _ in repo.iter_commits()) 279 _logger.debug(f"Repo has previous commits: {commit_count}") 280 else: 281 # local repo, create the directories and init 282 phen_path.mkdir(parents=True, exist_ok=True) 283 _logger.debug(f"Phen directory '{phen_path}' has been created.") 284 repo = git.Repo.init(phen_path) 285 commit_count = 0 286 287 phen_path = phen_path.resolve() 288 # initialise empty repos 289 if commit_count == 0: 290 # create initial commit 291 initial_file_path = phen_path / "README.md" 292 with open(initial_file_path, "w") as file: 293 file.write( 294 "# Initial commit\nThis is the first commit in the phen repository.\n" 295 ) 296 repo.index.add([initial_file_path]) 297 repo.index.commit("Initial commit") 298 commit_count = 1 299 300 # Checkout the phens default branch, creating it if it does not exist 301 if DEFAULT_GIT_BRANCH in repo.branches: 302 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 303 main_branch.checkout() 304 else: 305 main_branch = repo.create_head(DEFAULT_GIT_BRANCH) 306 main_branch.checkout() 307 308 # if the phen path does not contain the config file then initialise the phen type 309 config_path = phen_path / CONFIG_FILE 310 if config_path.exists(): 311 _logger.debug(f"Phenotype configuration files already exist") 312 return 313 314 _logger.info("Creating phen directory structure and config files") 315 for d in DEFAULT_PHEN_DIR_LIST: 316 _create_empty_git_dir(phen_path / d) 317 318 # create empty phen config file 319 config = { 320 "phenotype": { 321 "version": "0.0.0", 322 "omop": { 323 "vocabulary_id": "", 324 "vocabulary_name": "", 325 "vocabulary_reference": "", 326 }, 327 "translate": [], 328 "concept_sets": [], 329 } 330 } 331 332 with open(phen_path / CONFIG_FILE, "w") as file: 333 yaml.dump( 334 config, 335 file, 336 Dumper=util.QuotedDumper, 337 default_flow_style=False, 338 sort_keys=False, 339 default_style='"', 340 ) 341 342 # add git ignore 343 ignore_content = """# Ignore SQLite database files 344*.db 345*.sqlite3 346 347# Ignore SQLite journal and metadata files 348*.db-journal 349*.sqlite3-journal 350 351# python 352.ipynb_checkpoints 353 """ 354 ignore_path = phen_path / ".gitignore" 355 with open(ignore_path, "w") as file: 356 file.write(ignore_content) 357 358 # add to git repo and commit 359 for d in DEFAULT_PHEN_DIR_LIST: 360 repo.git.add(phen_path / d) 361 repo.git.add(all=True) 362 repo.index.commit("initialised the phen git repo.") 363 364 _logger.info(f"Phenotype initialised successfully") 365 366 367def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str): 368 """Forks an upstream phenotype in a remote repo at a specific version to a local director, and 
optionally sets to a new remote origin" 369 370 Args: 371 phen_dir (str): local directory path where the upstream repo is to be cloned 372 upstream_url (str): url to the upstream repo 373 upstream_version (str): version in the upstream repo to clone 374 new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None. 375 376 Raises: 377 ValueError: if the specified version is not in the upstream repo 378 ValueError: if the upstream repo is not a valid phenotype repo 379 ValueError: if there's any other problems with Git 380 """ 381 _logger.info( 382 f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}" 383 ) 384 385 phen_path = Path(phen_dir) 386 # check if directory already exists and ask user if they want to recreate it 387 if ( 388 phen_path.exists() and phen_path.is_dir() 389 ): # Check if it exists and is a directory 390 configure = _check_delete_dir( 391 phen_path, 392 f"The phen directory already exists. Do you want to reinitialise? (yes/no): ", 393 ) 394 else: 395 configure = True 396 397 if not configure: 398 _logger.info(f"Exiting, phenotype not initiatised") 399 return 400 401 try: 402 # Clone repo 403 git_url = _construct_git_url(upstream_url) 404 repo = git.Repo.clone_from(git_url, phen_path) 405 406 # Fetch all branches and tags 407 repo.remotes.origin.fetch() 408 409 # Check if the version exists 410 available_refs = [ref.name.split("/")[-1] for ref in repo.references] 411 if upstream_version not in available_refs: 412 raise ValueError( 413 f"Version '{upstream_version}' not found in the repository: {upstream_url}." 414 ) 415 416 # Checkout the specified version 417 repo.git.checkout(upstream_version) 418 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 419 main_branch.checkout() 420 421 # Check if 'config.yml' exists in the root directory 422 config_path = phen_path / "config.yml" 423 if not os.path.isfile(config_path): 424 raise ValueError( 425 f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory." 
426 ) 427 428 # Validate the phenotype is compatible with the acmc tool 429 validate(str(phen_path.resolve())) 430 431 # Delete each tag locally 432 tags = repo.tags 433 for tag in tags: 434 repo.delete_tag(tag) 435 _logger.debug(f"Deleted tags from forked repo: {tag}") 436 437 # Add upstream remote 438 repo.create_remote("upstream", upstream_url) 439 remote = repo.remotes["origin"] 440 repo.delete_remote(remote) # Remove existing origin 441 442 # Optionally set a new origin remote 443 if new_origin_url: 444 git_url = _construct_git_url(new_origin_url) 445 repo.create_remote("origin", git_url) 446 repo.git.push("--set-upstream", "origin", "main") 447 448 _logger.info(f"Repository forked successfully at {phen_path}") 449 _logger.info(f"Upstream set to {upstream_url}") 450 if new_origin_url: 451 _logger.info(f"Origin set to {new_origin_url}") 452 453 except Exception as e: 454 if phen_path.exists(): 455 shutil.rmtree(phen_path) 456 raise ValueError(f"Error occurred during repository fork: {str(e)}") 457 458 459def validate(phen_dir: str): 460 """Validates the phenotype directory is a git repo with standard structure""" 461 _logger.info(f"Validating phenotype: {phen_dir}") 462 phen_path = Path(phen_dir) 463 if not phen_path.is_dir(): 464 raise NotADirectoryError( 465 f"Error: '{str(phen_path.resolve())}' is not a directory" 466 ) 467 468 config_path = phen_path / CONFIG_FILE 469 if not config_path.is_file(): 470 raise FileNotFoundError( 471 f"Error: phen configuration file '{config_path}' does not exist." 472 ) 473 474 concepts_path = phen_path / CONCEPTS_DIR 475 if not concepts_path.is_dir(): 476 raise FileNotFoundError( 477 f"Error: source concepts directory {concepts_path} does not exist." 478 ) 479 480 # Calidate the directory is a git repo 481 try: 482 git.Repo(phen_path) 483 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 484 raise Exception(f"Phen directory {phen_path} is not a git repo") 485 486 # Load configuration File 487 if config_path.suffix == ".yml": 488 try: 489 with config_path.open("r") as file: 490 phenotype = yaml.safe_load(file) 491 492 validator = Validator(CONFIG_SCHEMA) 493 if validator.validate(phenotype): 494 _logger.debug("YAML structure is valid.") 495 else: 496 _logger.error(f"YAML structure validation failed: {validator.errors}") 497 raise Exception(f"YAML structure validation failed: {validator.errors}") 498 except yaml.YAMLError as e: 499 _logger.error(f"YAML syntax error: {e}") 500 raise e 501 else: 502 raise Exception( 503 f"Unsupported configuration filetype: {str(config_path.resolve())}" 504 ) 505 506 # initiatise 507 validation_errors = [] 508 phenotype = phenotype["phenotype"] 509 code_types = parse.CodeTypeParser().code_types 510 511 # check the version number is of the format vn.n.n 512 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 513 if not match: 514 validation_errors.append( 515 f"Invalid version format in configuration file: {phenotype['version']}" 516 ) 517 518 # create a list of all the concept set names defined in the concept set configuration 519 concept_set_names = [] 520 for item in phenotype["concept_sets"]: 521 if item["name"] in concept_set_names: 522 validation_errors.append( 523 f"Duplicate concept set defined in concept sets {item['name'] }" 524 ) 525 else: 526 concept_set_names.append(item["name"]) 527 528 # check codes definition 529 for files in phenotype["concept_sets"]: 530 for item in files["files"]: 531 # check concepte code file exists 532 concept_code_file_path = concepts_path / item["path"] 
533 if not concept_code_file_path.exists(): 534 validation_errors.append( 535 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 536 ) 537 538 # check concepte code file is not empty 539 if concept_code_file_path.stat().st_size == 0: 540 validation_errors.append( 541 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 542 ) 543 544 # check code file type is supported 545 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 546 raise ValueError( 547 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 548 ) 549 550 # check columns specified are a supported medical coding type 551 for column in item["columns"]: 552 if column not in code_types: 553 validation_errors.append( 554 f"Column type {column} for file {concept_code_file_path} is not supported" 555 ) 556 557 # check the actions are supported 558 if "actions" in item: 559 for action in item["actions"]: 560 if action not in COL_ACTIONS: 561 validation_errors.append(f"Action {action} is not supported") 562 563 if len(validation_errors) > 0: 564 _logger.error(validation_errors) 565 raise PhenValidationException( 566 f"Configuration file {str(config_path.resolve())} failed validation", 567 validation_errors, 568 ) 569 570 _logger.info(f"Phenotype validated successfully") 571 572 573def _read_table_file(path: Path, excel_sheet: str = ""): 574 """ 575 Load Code List File 576 """ 577 578 path = path.resolve() 579 if path.suffix == ".csv": 580 df = pd.read_csv(path, dtype=str) 581 elif path.suffix == ".xlsx" or path.suffix == ".xls": 582 if excel_sheet != "": 583 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str) 584 else: 585 df = pd.read_excel(path, dtype=str) 586 elif path.suffix == ".dta": 587 df = pd.read_stata(path) 588 else: 589 raise ValueError( 590 f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types" 591 ) 592 593 return df 594 595 596def _process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame: 597 # Perform Structural Changes to file before preprocessing 598 _logger.debug("Processing file structural actions") 599 if ( 600 "actions" in concept_set 601 and "split_col" in concept_set["actions"] 602 and "codes_col" in concept_set["actions"] 603 ): 604 split_col = concept_set["actions"]["split_col"] 605 codes_col = concept_set["actions"]["codes_col"] 606 _logger.debug( 607 "Action: Splitting", 608 split_col, 609 "column into:", 610 df[split_col].unique(), 611 ) 612 codes = df[codes_col] 613 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode 614 oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes 615 oh[oh == False] = np.nan # replace 0s with None 616 df = pd.concat([df, oh], axis=1) # merge in new columns 617 618 return df 619 620 621def _preprocess_source_concepts( 622 df: pd.DataFrame, concept_set: dict, code_file_path: Path 623) -> Tuple[pd.DataFrame, list]: 624 """Perform QA Checks on columns individually and append to df""" 625 out = pd.DataFrame([]) # create output df to append to 626 code_errors = [] # list of errors from processing 627 628 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 629 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 630 631 # Preprocess codes 632 code_types = parse.CodeTypeParser().code_types 633 for code_type in concept_set["columns"]: 634 parser = code_types[code_type] 635 _logger.info(f"Processing {code_type} codes for {code_file_path}") 636 637 # get codes by column name 638 
source_col_name = concept_set["columns"][code_type] 639 codes = df[source_col_name].dropna() 640 codes = codes.astype(str) # convert to string 641 codes = codes.str.strip() # remove excess spaces 642 643 # process codes, validating them using parser and returning the errors 644 codes, errors = parser.process(codes, code_file_path) 645 if len(errors) > 0: 646 code_errors.extend(errors) 647 _logger.warning(f"Codes validation failed with {len(errors)} errors") 648 649 # add processed codes to df 650 new_col_name = f"{source_col_name}_SOURCE" 651 df = df.rename(columns={source_col_name: new_col_name}) 652 process_codes = pd.DataFrame({code_type: codes}).join(df) 653 out = pd.concat( 654 [out, process_codes], 655 ignore_index=True, 656 ) 657 658 _logger.debug(out.head()) 659 660 return out, code_errors 661 662 663# Translate Df with multiple codes into single code type Series 664def translate_codes( 665 source_df: pd.DataFrame, 666 target_code_type: str, 667 concept_name: str, 668 not_translate: bool, 669) -> pd.DataFrame: 670 """Translates each source code type the source coding list into a target type and returns all conversions as a concept set""" 671 672 # codes = pd.DataFrame([], dtype=str) 673 codes = pd.DataFrame( 674 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string" 675 ) 676 # Convert codes to target type 677 _logger.info(f"Converting to target code type {target_code_type}") 678 679 for source_code_type in source_df.columns: 680 # if target code type is the same as thet source code type, no translation, just appending source as target 681 if source_code_type == target_code_type: 682 copy_df = pd.DataFrame( 683 { 684 "SOURCE_CONCEPT": source_df[source_code_type], 685 "SOURCE_CONCEPT_TYPE": source_code_type, 686 "CONCEPT": source_df[source_code_type], 687 } 688 ) 689 codes = pd.concat([codes, copy_df]) 690 _logger.debug( 691 f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" 692 ) 693 elif not not_translate: 694 # get the translation filename using source to target code types 695 filename = f"{source_code_type}_to_{target_code_type}.parquet" 696 map_path = trud.PROCESSED_PATH / filename 697 698 # do the mapping if it exists 699 if map_path.exists(): 700 # get mapping 701 df_map = pd.read_parquet(map_path) 702 703 # do mapping 704 translated_df = pd.merge( 705 source_df[source_code_type], df_map, how="left" 706 ) 707 708 # normalise the output 709 translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"]) 710 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type 711 712 # add to list of codes 713 codes = pd.concat([codes, translated_df]) 714 715 else: 716 _logger.warning( 717 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" 718 ) 719 720 codes = codes.dropna() # delete NaNs 721 722 # added concept set type to output if any translations 723 if len(codes.index) > 0: 724 codes["CONCEPT_SET"] = concept_name 725 else: 726 _logger.debug(f"No codes converted with target code type {target_code_type}") 727 728 return codes 729 730 731def _write_code_errors(code_errors: list, code_errors_path: Path): 732 err_df = pd.DataFrame( 733 [ 734 { 735 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 736 "VOCABULARY": err.code_type, 737 "SOURCE": err.codes_file, 738 "CAUSE": err.message, 739 } 740 for err in code_errors 741 if err.mask is not None 742 ] 743 ) 744 745 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 746 
err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 747 err_df.to_csv(code_errors_path, index=False, mode="w") 748 749 750def write_vocab_version(phen_path: Path): 751 # write the vocab version files 752 753 if not trud.VERSION_PATH.exists(): 754 raise FileNotFoundError( 755 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 756 ) 757 758 if not omop.VERSION_PATH.exists(): 759 raise FileNotFoundError( 760 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 761 ) 762 763 with trud.VERSION_PATH.open("r") as file: 764 trud_version = yaml.safe_load(file) 765 766 with omop.VERSION_PATH.open("r") as file: 767 omop_version = yaml.safe_load(file) 768 769 # Create the combined YAML structure 770 version_data = { 771 "versions": { 772 "acmc": acmc.__version__, 773 "trud": trud_version, 774 "omop": omop_version, 775 } 776 } 777 778 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 779 yaml.dump( 780 version_data, 781 file, 782 Dumper=util.QuotedDumper, 783 default_flow_style=False, 784 sort_keys=False, 785 default_style='"', 786 ) 787 788 789def map(phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool): 790 _logger.info(f"Processing phenotype: {phen_dir}") 791 792 # Validate configuration 793 validate(phen_dir) 794 795 # initialise paths 796 phen_path = Path(phen_dir) 797 config_path = phen_path / CONFIG_FILE 798 799 # load configuration 800 with config_path.open("r") as file: 801 config = yaml.safe_load(file) 802 phenotype = config["phenotype"] 803 804 if len(phenotype["map"]) == 0: 805 raise ValueError(f"No map codes defined in the phenotype configuration") 806 807 if target_code_type is not None and target_code_type not in phenotype["map"]: 808 raise ValueError( 809 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 810 ) 811 812 if target_code_type is not None: 813 _map_target_code_type( 814 phen_path, phenotype, target_code_type, not_translate, no_metadata 815 ) 816 else: 817 for t in phenotype["map"]: 818 _map_target_code_type(phen_path, phenotype, t, not_translate, no_metadata) 819 820 _logger.info(f"Phenotype processed successfully") 821 822 823def _map_target_code_type( 824 phen_path: Path, 825 phenotype: dict, 826 target_code_type: str, 827 not_translate: bool, 828 no_metadata: bool, 829): 830 _logger.debug(f"Target coding format: {target_code_type}") 831 concepts_path = phen_path / CONCEPTS_DIR 832 # Create output dataframe 833 out = pd.DataFrame([]) 834 code_errors = [] 835 836 # Process each folder in codes section 837 for files in phenotype["concept_sets"]: 838 concept_set_name = files["name"] 839 if "metadata" in files: 840 concept_set_metadata = files["metadata"] 841 else: 842 concept_set_metadata = {} 843 for concept_set in files["files"]: 844 _logger.debug(f"--- {concept_set} ---") 845 846 # Load code file 847 codes_file_path = Path(concepts_path / concept_set["path"]) 848 df = _read_table_file(codes_file_path) 849 850 # process structural actions 851 df = _process_actions(df, concept_set) 852 853 # preprocessing and validate of source concepts 854 _logger.debug("Processing and validating source concept codes") 855 df, errors = _preprocess_source_concepts( 856 df, 857 concept_set, 858 codes_file_path, 859 ) 860 861 # create df with just the source code columns 862 source_column_names = list(concept_set["columns"].keys()) 863 source_df = df[source_column_names] 864 865 _logger.debug(source_df.columns) 866 
_logger.debug(source_df.head()) 867 868 _logger.debug( 869 f"Length of errors from _preprocess_source_concepts {len(errors)}" 870 ) 871 if len(errors) > 0: 872 code_errors.extend(errors) 873 _logger.debug(f" Length of code_errors {len(code_errors)}") 874 875 # Map source concepts codes to target codes 876 # if processing a source coding list with categorical data 877 if ( 878 "actions" in concept_set 879 and "divide_col" in concept_set["actions"] 880 and len(df) > 0 881 ): 882 divide_col = concept_set["actions"]["divide_col"] 883 _logger.debug(f"Action: Dividing Table by {divide_col}") 884 _logger.debug(f"column into: {df[divide_col].unique()}") 885 df_grp = df.groupby(divide_col) 886 for cat, grp in df_grp: 887 if cat == concept_set["category"]: 888 grp = grp.drop( 889 columns=[divide_col] 890 ) # delete categorical column 891 source_df = grp[source_column_names] 892 trans_out = translate_codes( 893 source_df, 894 target_code_type=target_code_type, 895 concept_name=concept_set_name, 896 not_translate=not_translate, 897 ) 898 trans_out = add_metadata( 899 codes=trans_out, 900 metadata=concept_set_metadata, 901 no_metadata=no_metadata, 902 ) 903 out = pd.concat([out, trans_out]) 904 else: 905 source_df = df[source_column_names] 906 trans_out = translate_codes( 907 source_df, 908 target_code_type=target_code_type, 909 concept_name=concept_set_name, 910 not_translate=not_translate, 911 ) 912 trans_out = add_metadata( 913 codes=trans_out, 914 metadata=concept_set_metadata, 915 no_metadata=no_metadata, 916 ) 917 out = pd.concat([out, trans_out]) 918 919 if len(code_errors) > 0: 920 _logger.error(f"The map processing has {len(code_errors)} errors") 921 error_path = phen_path / MAP_DIR / "errors" 922 error_path.mkdir(parents=True, exist_ok=True) 923 error_filename = f"{target_code_type}-code-errors.csv" 924 _write_code_errors(code_errors, error_path / error_filename) 925 926 # Check there is output from processing 927 if len(out.index) == 0: 928 _logger.error(f"No output after map processing") 929 raise Exception( 930 f"No output after map processing, check config {str(phen_path.resolve())}" 931 ) 932 933 # final processing 934 out = out.reset_index(drop=True) 935 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 936 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 937 938 # out_count = len(out.index) 939 # added metadata 940 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 941 # result_list = [] 942 # for files in phenotype["concept_sets"]: 943 # concept_set_name = files["name"] 944 # for concept_set in files["files"]: 945 # source_column_names = list(concept_set["columns"].keys()) 946 # for source_concept_type in source_column_names: 947 # # Filter output based on the current source_concept_type 948 # out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 949 # filtered_count = len(out_filtered_df.index) 950 951 # # Remove the source type columns except the current type will leave the metadata and the join 952 # remove_types = [ 953 # type for type in source_column_names if type != source_concept_type 954 # ] 955 # metadata_df = df.drop(columns=remove_types) 956 # metadata_df = metadata_df.rename( 957 # columns={source_concept_type: "SOURCE_CONCEPT"} 958 # ) 959 # metadata_df_count = len(metadata_df.index) 960 961 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 962 # result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 963 # result_count = 
len(result.index) 964 965 # _logger.debug( 966 # f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 967 # ) 968 969 # # Append the result to the result_list 970 # result_list.append(result) 971 972 # Concatenate all the results into a single DataFrame 973 # final_out = pd.concat(result_list, ignore_index=True) 974 # final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 975 # _logger.debug( 976 # f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 977 # ) 978 979 # Save output to map directory 980 output_filename = target_code_type + ".csv" 981 map_path = phen_path / MAP_DIR / output_filename 982 out.to_csv(map_path, index=False) 983 _logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 984 985 # save concept sets as separate files 986 concept_set_path = phen_path / CSV_PATH / target_code_type 987 988 # empty the concept-set directory except for hiddle files, e.g. .git 989 if concept_set_path.exists(): 990 for item in concept_set_path.iterdir(): 991 if not item.name.startswith("."): 992 item.unlink() 993 else: 994 concept_set_path.mkdir(parents=True, exist_ok=True) 995 996 # write each concept as a separate file 997 for name, concept in out.groupby("CONCEPT_SET"): 998 concept = concept.sort_values(by="CONCEPT") # sort rows 999 concept = concept.dropna(how="all", axis=1) # remove empty cols 1000 concept = concept.reindex( 1001 sorted(concept.columns), axis=1 1002 ) # sort cols alphabetically 1003 filename = f"{name}.csv" 1004 concept_path = concept_set_path / filename 1005 concept.to_csv(concept_path, index=False) 1006 1007 write_vocab_version(phen_path) 1008 1009 _logger.info(f"Phenotype processed target code type {target_code_type}") 1010 1011 1012# Add metadata dict to each row of Df codes 1013def add_metadata( 1014 codes: pd.DataFrame, 1015 metadata: dict, 1016 no_metadata: bool, 1017) -> pd.DataFrame: 1018 """Add concept set metadata, stored as a dictionary, to each concept row""" 1019 1020 if not no_metadata: 1021 for meta_name, meta_value in metadata.items(): 1022 codes[meta_name] = meta_value 1023 _logger.debug( 1024 f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}" 1025 ) 1026 1027 return codes 1028 1029 1030def _generate_version_tag( 1031 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 1032) -> str: 1033 # Get all valid semantic version tags 1034 versions = [] 1035 for tag in repo.tags: 1036 if tag.name.startswith("v"): 1037 tag_name = tag.name[1:] # Remove the first character 1038 else: 1039 tag_name = tag.name 1040 if semver.Version.is_valid(tag_name): 1041 versions.append(semver.Version.parse(tag_name)) 1042 1043 _logger.debug(f"Versions: {versions}") 1044 # Determine the next version 1045 if not versions: 1046 new_version = semver.Version(0, 0, 1) 1047 else: 1048 latest_version = max(versions) 1049 if increment == "major": 1050 new_version = latest_version.bump_major() 1051 elif increment == "minor": 1052 new_version = latest_version.bump_minor() 1053 else: 1054 new_version = latest_version.bump_patch() 1055 1056 # Create the new tag 1057 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 1058 1059 return new_version_str 1060 1061 1062def publish( 1063 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 1064): 1065 """Publishes updates to the phenotype by commiting all 
changes to the repo directory""" 1066 1067 # Validate config 1068 validate(phen_dir) 1069 phen_path = Path(phen_dir) 1070 1071 # load git repo and set the branch 1072 repo = git.Repo(phen_path) 1073 if DEFAULT_GIT_BRANCH in repo.branches: 1074 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1075 main_branch.checkout() 1076 else: 1077 raise AttributeError( 1078 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1079 ) 1080 1081 # check if any changes to publish 1082 if not repo.is_dirty() and not repo.untracked_files: 1083 if remote_url is not None and "origin" not in repo.remotes: 1084 _logger.info(f"First publish to remote url {remote_url}") 1085 else: 1086 _logger.info("Nothing to publish, no changes to the repo") 1087 return 1088 1089 # get next version 1090 new_version_str = _generate_version_tag(repo, increment) 1091 _logger.info(f"New version: {new_version_str}") 1092 1093 # Write version in configuration file 1094 config_path = phen_path / CONFIG_FILE 1095 with config_path.open("r") as file: 1096 config = yaml.safe_load(file) 1097 1098 config["phenotype"]["version"] = new_version_str 1099 with open(config_path, "w") as file: 1100 yaml.dump( 1101 config, 1102 file, 1103 Dumper=util.QuotedDumper, 1104 default_flow_style=False, 1105 sort_keys=False, 1106 default_style='"', 1107 ) 1108 1109 # Add and commit changes to repo including version updates 1110 commit_message = f"Committing updates to phenotype {phen_path}" 1111 repo.git.add("--all") 1112 repo.index.commit(commit_message) 1113 1114 # Add tag to the repo 1115 repo.create_tag(new_version_str) 1116 1117 # push to origin if a remote repo 1118 if remote_url is not None and "origin" not in repo.remotes: 1119 git_url = _construct_git_url(remote_url) 1120 repo.create_remote("origin", git_url) 1121 1122 try: 1123 if "origin" in repo.remotes: 1124 _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1125 origin = repo.remotes.origin 1126 _logger.info(f"Pushing main branch to remote repo") 1127 repo.git.push("--set-upstream", "origin", "main") 1128 _logger.info(f"Pushing version tags to remote git repo") 1129 origin.push(tags=True) 1130 _logger.debug("Changes pushed to 'origin'") 1131 else: 1132 _logger.debug("Remote 'origin' is not set") 1133 except Exception as e: 1134 tag_ref = repo.tags[new_version_str] 1135 repo.delete_tag(tag_ref) 1136 repo.git.reset("--soft", "HEAD~1") 1137 raise e 1138 1139 _logger.info(f"Phenotype published successfully") 1140 1141 1142def export(phen_dir: str, version: str): 1143 """Exports a phen repo at a specific tagged version into a target directory""" 1144 _logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1145 1146 # validate configuration 1147 validate(phen_dir) 1148 phen_path = Path(phen_dir) 1149 1150 # load configuration 1151 config_path = phen_path / CONFIG_FILE 1152 with config_path.open("r") as file: 1153 config = yaml.safe_load(file) 1154 1155 map_path = phen_path / MAP_DIR 1156 if not map_path.exists(): 1157 _logger.warning(f"Map path does not exist '{map_path}'") 1158 1159 export_path = phen_path / OMOP_PATH 1160 # check export directory exists and if not create it 1161 if not export_path.exists(): 1162 export_path.mkdir(parents=True) 1163 _logger.debug(f"OMOP export directory '{export_path}' created.") 1164 1165 # omop export db 1166 export_db_path = omop.export( 1167 map_path, 1168 export_path, 1169 config["phenotype"]["version"], 1170 config["phenotype"]["omop"], 1171 ) 1172 1173 # write to tables 1174 # export as csv 1175 
_logger.info(f"Phenotype exported successfully") 1176 1177 1178def copy(phen_dir: str, target_dir: str, version: str): 1179 """Copys a phen repo at a specific tagged version into a target directory""" 1180 1181 # Validate 1182 validate(phen_dir) 1183 phen_path = Path(phen_dir) 1184 1185 # Check target directory exists 1186 target_path = Path(target_dir) 1187 if not target_path.exists(): 1188 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1189 1190 # Set copy directory 1191 copy_path = target_path / version 1192 _logger.info(f"Copying repo {phen_path} to {copy_path}") 1193 1194 if ( 1195 copy_path.exists() and copy_path.is_dir() 1196 ): # Check if it exists and is a directory 1197 copy = _check_delete_dir( 1198 copy_path, 1199 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ", 1200 ) 1201 else: 1202 copy = True 1203 1204 if not copy: 1205 _logger.info(f"Not copying the version {version}") 1206 return 1207 1208 _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1209 repo = git.Repo.clone_from(phen_path, copy_path) 1210 1211 # Check out the latest commit or specified version 1212 if version: 1213 # Checkout a specific version (e.g., branch, tag, or commit hash) 1214 _logger.info(f"Checking out version {version}...") 1215 repo.git.checkout(version) 1216 else: 1217 # Checkout the latest commit (HEAD) 1218 _logger.info(f"Checking out the latest commit...") 1219 repo.git.checkout("HEAD") 1220 1221 _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1222 1223 _logger.info(f"Phenotype copied successfully") 1224 1225 1226# Convert concept_sets list into dictionaries 1227def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1228 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1229 concepts_dict = { 1230 item["name"]: [file["path"] for file in item["files"]] 1231 for item in config_data["phenotype"]["concept_sets"] 1232 } 1233 name_set = set(concepts_dict.keys()) 1234 return concepts_dict, name_set 1235 1236 1237def _extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1238 """ 1239 Extracts clean keys from a DeepDiff dictionary. 
1240 1241 :param diff: DeepDiff result dictionary 1242 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1243 :return: A set of clean key names 1244 """ 1245 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1246 1247 1248def diff_config(old_config: dict, new_config: dict) -> str: 1249 report = f"\n# Changes to phenotype configuration\n" 1250 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1251 1252 old_concepts, old_names = extract_concepts(old_config) 1253 new_concepts, new_names = extract_concepts(new_config) 1254 1255 # Check added and removed concept set names 1256 added_names = new_names - old_names # Names that appear in new but not in old 1257 removed_names = old_names - new_names # Names that were in old but not in new 1258 1259 # find file path changes for unchanged names 1260 unchanged_names = old_names & new_names # Names that exist in both 1261 file_diff = DeepDiff( 1262 {name: old_concepts[name] for name in unchanged_names}, 1263 {name: new_concepts[name] for name in unchanged_names}, 1264 ) 1265 1266 # Find renamed concepts (same file, different name) 1267 renamed_concepts = [] 1268 for removed in removed_names: 1269 old_path = old_concepts[removed] 1270 for added in added_names: 1271 new_path = new_concepts[added] 1272 if old_path == new_path: 1273 renamed_concepts.append((removed, added)) 1274 1275 # Remove renamed concepts from added and removed sets 1276 for old_name, new_name in renamed_concepts: 1277 added_names.discard(new_name) 1278 removed_names.discard(old_name) 1279 1280 # generate config report 1281 if added_names: 1282 report += "## Added Concepts\n" 1283 for name in added_names: 1284 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1285 report += "\n" 1286 1287 if removed_names: 1288 report += "## Removed Concepts\n" 1289 for name in removed_names: 1290 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1291 report += "\n" 1292 1293 if renamed_concepts: 1294 report += "## Renamed Concepts\n" 1295 for old_name, new_name in renamed_concepts: 1296 report += ( 1297 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1298 ) 1299 report += "\n" 1300 1301 if "values_changed" in file_diff: 1302 report += "## Updated File Paths\n" 1303 for name, change in file_diff["values_changed"].items(): 1304 old_file = change["old_value"] 1305 new_file = change["new_value"] 1306 clean_name = name.split("root['")[1].split("']")[0] 1307 report += ( 1308 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1309 ) 1310 report += "\n" 1311 1312 if not ( 1313 added_names 1314 or removed_names 1315 or renamed_concepts 1316 or file_diff.get("values_changed") 1317 ): 1318 report += "No changes in concept sets.\n" 1319 1320 return report 1321 1322 1323def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1324 old_output_files = [ 1325 file.name 1326 for file in old_map_path.iterdir() 1327 if file.is_file() and not file.name.startswith(".") 1328 ] 1329 new_output_files = [ 1330 file.name 1331 for file in new_map_path.iterdir() 1332 if file.is_file() and not file.name.startswith(".") 1333 ] 1334 1335 # Convert the lists to sets for easy comparison 1336 old_output_set = set(old_output_files) 1337 new_output_set = set(new_output_files) 1338 1339 # Outputs that are in old_output_set but not in new_output_set (removed files) 
1340 removed_outputs = old_output_set - new_output_set 1341 # Outputs that are in new_output_set but not in old_output_set (added files) 1342 added_outputs = new_output_set - old_output_set 1343 # Outputs that are the intersection of old_output_set and new_output_set 1344 common_outputs = old_output_set & new_output_set 1345 1346 report = f"\n# Changes to available translations\n" 1347 report += f"This compares the coding translations files available.\n\n" 1348 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1349 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1350 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1351 1352 # Step N: Compare common outputs between versions 1353 report += f"# Changes to concepts in translation files\n\n" 1354 report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1355 for file in common_outputs: 1356 old_output = old_map_path / file 1357 new_output = new_map_path / file 1358 1359 _logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1360 _logger.debug(f"New ouptput: {str(new_output.resolve())}") 1361 1362 df1 = pd.read_csv(old_output) 1363 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1364 df2 = pd.read_csv(new_output) 1365 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1366 1367 # Check for added and removed concepts 1368 report += f"- File {file}\n" 1369 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1370 report += f"- Removed concepts {sorted_list}\n" 1371 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1372 report += f"- Added concepts {sorted_list}\n" 1373 1374 # Check for changed concepts 1375 diff = df2 - df1 # diff in counts 1376 diff = diff[ 1377 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1378 ] # get non-zero counts 1379 s = "\n" 1380 if len(diff.index) > 0: 1381 for concept, row in diff.iterrows(): 1382 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1383 report += f"- Changed concepts {s}\n\n" 1384 else: 1385 report += f"- Changed concepts []\n\n" 1386 1387 return report 1388 1389 1390def diff_phen( 1391 new_phen_path: Path, 1392 new_version: str, 1393 old_phen_path: Path, 1394 old_version: str, 1395 report_path: Path, 1396 not_check_config: bool, 1397): 1398 """Compare the differences between two versions of a phenotype""" 1399 1400 # write report heading 1401 report = f"# Phenotype Comparison Report\n" 1402 1403 # Step 1: check differences configuration files 1404 if not not_check_config: 1405 # validate phenotypes 1406 _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1407 validate(str(old_phen_path.resolve())) 1408 _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1409 validate(str(new_phen_path.resolve())) 1410 1411 # get old and new config 1412 old_config_path = old_phen_path / CONFIG_FILE 1413 with old_config_path.open("r") as file: 1414 old_config = yaml.safe_load(file) 1415 new_config_path = new_phen_path / CONFIG_FILE 1416 with new_config_path.open("r") as file: 1417 new_config = yaml.safe_load(file) 1418 1419 # write report 1420 report += f"## Original phenotype\n" 1421 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1422 report += f" - {old_version}\n" 1423 report += f" - {str(old_phen_path.resolve())}\n" 1424 report += f"## Changed phenotype:\n" 1425 report += 
f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1426 report += f" - {new_version}\n" 1427 report += f" - {str(new_phen_path.resolve())}\n" 1428 1429 # Convert list of dicts into a dict: {name: file} 1430 report += diff_config(old_config, new_config) 1431 1432 # Step 2: check differences between map files 1433 # List files from output directories 1434 old_map_path = old_phen_path / MAP_DIR 1435 new_map_path = new_phen_path / MAP_DIR 1436 report += diff_map_files(old_map_path, new_map_path) 1437 1438 # initialise report file 1439 _logger.debug(f"Writing to report file {str(report_path.resolve())}") 1440 report_file = open(report_path, "w") 1441 report_file.write(report) 1442 report_file.close() 1443 1444 _logger.info(f"Phenotypes diff'd successfully") 1445 1446 1447def diff( 1448 phen_dir: str, 1449 version: str, 1450 old_phen_dir: str, 1451 old_version: str, 1452 not_check_config: bool, 1453): 1454 # make tmp directory .acmc 1455 timestamp = time.strftime("%Y%m%d_%H%M%S") 1456 temp_dir = Path(f".acmc/diff_{timestamp}") 1457 1458 changed_phen_path = Path(phen_dir) 1459 if not changed_phen_path.exists(): 1460 raise ValueError( 1461 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1462 ) 1463 1464 old_phen_path = Path(old_phen_dir) 1465 if not old_phen_path.exists(): 1466 raise ValueError( 1467 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1468 ) 1469 1470 # t_path = old_phen_path / "config.yml" 1471 # with t_path.open("r") as file: 1472 # c = yaml.safe_load(file) 1473 1474 try: 1475 # Create the directory 1476 temp_dir.mkdir(parents=True, exist_ok=True) 1477 _logger.debug(f"Temporary directory created: {temp_dir}") 1478 1479 # Create temporary directories 1480 changed_path = temp_dir / "changed" 1481 changed_path.mkdir(parents=True, exist_ok=True) 1482 old_path = temp_dir / "old" 1483 old_path.mkdir(parents=True, exist_ok=True) 1484 1485 # checkout changed 1486 if version == "latest": 1487 _logger.debug( 1488 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1489 ) 1490 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1491 else: 1492 _logger.debug( 1493 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1494 ) 1495 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1496 changed_repo.git.checkout(version) 1497 1498 # checkout old 1499 if old_version == "latest": 1500 _logger.debug( 1501 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1502 ) 1503 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1504 else: 1505 _logger.debug( 1506 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1507 ) 1508 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1509 old_repo.git.checkout(old_version) 1510 1511 report_filename = f"{version}_{old_version}_diff.md" 1512 report_path = changed_phen_path / report_filename 1513 # diff old with new 1514 diff_phen( 1515 changed_path, version, old_path, old_version, report_path, not_check_config 1516 ) 1517 1518 finally: 1519 # clean up tmp directory 1520 if temp_dir.exists(): 1521 shutil.rmtree(temp_dir)
The module defines the following module-level constants (default values shown in parentheses):

- `PHEN_DIR` ("phen"): default phenotype directory name
- `DEFAULT_PHEN_PATH` (Path("./workspace") / PHEN_DIR): default phenotype directory path
- `CONCEPTS_DIR` ("concepts"): default concepts directory name
- `MAP_DIR` ("map"): default map directory name
- `CONCEPT_SET_DIR` ("concept-sets"): default concept set directory name
- `CSV_PATH` (Path(CONCEPT_SET_DIR) / "csv"): default CSV concept set directory path
- `OMOP_PATH` (Path(CONCEPT_SET_DIR) / "omop"): default OMOP concept set directory path
- `DEFAULT_PHEN_DIR_LIST` ([CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]): list of default phenotype directories
- `CONFIG_FILE` ("config.yml"): default configuration filename
- `VOCAB_VERSION_FILE` ("vocab_version.yml"): default vocabulary version filename
- `SEMANTIC_VERSION_TYPES` (["major", "minor", "patch"]): list of semantic version increment types
- `DEFAULT_VERSION_INC` ("patch"): default semantic version increment type
- `DEFAULT_GIT_BRANCH` ("main"): default phenotype repo branch name
- `SPLIT_COL_ACTION` ("split_col"): split column preprocessing action type
- `CODES_COL_ACTION` ("codes_col"): codes column preprocessing action type
- `DIVIDE_COL_ACTION` ("divide_col"): divide column preprocessing action type
- `COL_ACTIONS` ([SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]): list of column preprocessing action types
- `CODE_FILE_TYPES` ([".xlsx", ".xls", ".csv"]): list of supported source concept coding list file types
- `CONFIG_SCHEMA`: Cerberus schema used to validate the structure of the phenotype config.yml file
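For illustration, a config.yml along the following lines passes the CONFIG_SCHEMA check. The concept set name, file path, column name and vocabulary details are made-up examples, and the map entry ("read2" here) is assumed to be one of the code types supported by acmc.parse:

```python
import yaml
from cerberus import Validator

from acmc import phen

# hypothetical phenotype configuration; the concept set, file, column and
# code type names below are examples only
config_yaml = """
phenotype:
  version: "0.0.1"
  omop:
    vocabulary_id: "ACMC_EXAMPLE"
    vocabulary_name: "Example phenotype"
    vocabulary_reference: "https://example.org/example-phenotype"
  map:
    - "read2"
  concept_sets:
    - name: "EXAMPLE_CONDITION"
      files:
        - path: "example_codes.csv"
          columns:
            read2: "code"
"""

validator = Validator(phen.CONFIG_SCHEMA)
assert validator.validate(yaml.safe_load(config_yaml)), validator.errors
```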
```python
class PhenValidationException(Exception):
    """Custom exception class raised when validation errors in phenotype configuration file"""

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors
```
Custom exception raised when validation errors are found in the phenotype configuration file
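validate (documented below) raises this exception with the individual problems attached, so a caller can report them. A minimal sketch:

```python
from acmc import phen
from acmc.phen import PhenValidationException

try:
    phen.validate("./workspace/phen")
except PhenValidationException as e:
    # validation_errors holds one message per failed check
    for error in e.validation_errors:
        print(error)
```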
```python
def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with the standard structure"""
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if the directory already exists and ask the user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info(f"Exiting, phenotype not initialised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phen's default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
        main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phenotype
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug(f"Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "translate": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
"""
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info(f"Phenotype initialised successfully")
```
Initialises a phenotype directory as a git repo with the standard structure
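When remote_url is given, init clones the existing repo instead of creating a new one, and authenticates with a personal access token read from an environment variable (ACMC_GITHUB_PAT for github.com URLs, ACMC_GITLAB_PAT otherwise). A sketch with an illustrative repository URL:

```python
import os

from acmc import phen

# a personal access token is required to clone from a remote repo;
# ACMC_GITHUB_PAT is used for github.com URLs, ACMC_GITLAB_PAT otherwise
os.environ["ACMC_GITHUB_PAT"] = "<personal-access-token>"

# clone an existing remote phenotype repo (URL is illustrative) into a local directory
phen.init(
    phen_dir="./workspace/phen",
    remote_url="https://github.com/example-org/example-phenotype.git",
)
```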
```python
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype from a remote repo at a specific version into a local directory, and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): URL of the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): URL of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there are any other problems with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if the directory already exists and ask the user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info(f"Exiting, phenotype not initialised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yml' exists in the root directory
        config_path = phen_path / "config.yml"
        if not os.path.isfile(config_path):
            raise ValueError(
                f"The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", "main")

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}")
```
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
Arguments:
- phen_dir (str): local directory path where the upstream repo is to be cloned
- upstream_url (str): url to the upstream repo
- upstream_version (str): version in the upstream repo to clone
- new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
- ValueError: if the specified version is not in the upstream repo
- ValueError: if the upstream repo is not a valid phenotype repo
- ValueError: if there are any other problems with Git
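A minimal usage sketch of fork, assuming the module is imported as acmc.phen; the repository URLs and local path below are illustrative placeholders, not real locations:

    from acmc import phen

    # Fork an upstream phenotype at a tagged version into a local workspace,
    # and point the fork at a new origin (pass None to leave no origin set).
    phen.fork(
        phen_dir="./workspace/phen",
        upstream_url="https://example.com/org/upstream-phenotype.git",
        upstream_version="v1.0.0",
        new_origin_url="https://example.com/me/my-phenotype.git",
    )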
def validate(phen_dir: str):
    """Validates the phenotype directory is a git repo with standard structure"""
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # Load configuration file
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n
    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
    if not match:
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for files in phenotype["concept_sets"]:
        for item in files["files"]:
            # check concept code file exists
            concept_code_file_path = concepts_path / item["path"]
            if not concept_code_file_path.exists():
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
                )

            # check concept code file is not empty (only if it exists)
            elif concept_code_file_path.stat().st_size == 0:
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
                )

            # check code file type is supported
            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
                raise ValueError(
                    f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
                )

            # check columns specified are a supported medical coding type
            for column in item["columns"]:
                if column not in code_types:
                    validation_errors.append(
                        f"Column type {column} for file {concept_code_file_path} is not supported"
                    )

            # check the actions are supported
            if "actions" in item:
                for action in item["actions"]:
                    if action not in COL_ACTIONS:
                        validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info(f"Phenotype validated successfully")
Validates the phenotype directory is a git repo with standard structure
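A short sketch of calling validate and catching the validation error; PhenValidationException is assumed to be defined in this module, since the validation code above raises it:

    from acmc import phen

    try:
        phen.validate("./workspace/phen")
    except phen.PhenValidationException as e:
        # The exception is constructed with a message plus the list of validation
        # errors; how these are exposed depends on the class definition elsewhere
        # in the module, so only the message is printed here.
        print(f"Phenotype failed validation: {e}")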
def translate_codes(
    source_df: pd.DataFrame,
    target_code_type: str,
    concept_name: str,
    not_translate: bool,
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    # codes = pd.DataFrame([], dtype=str)
    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if target code type is the same as the source code type, no translation, just appending source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
            )
        elif not not_translate:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            # do the mapping if it exists
            if map_path.exists():
                # get mapping
                df_map = pd.read_parquet(map_path)

                # do mapping
                translated_df = pd.merge(
                    source_df[source_code_type], df_map, how="left"
                )

                # normalise the output
                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

                # add to list of codes
                codes = pd.concat([codes, translated_df])

            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add concept set name to output if any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
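A hedged sketch of calling translate_codes directly, assuming TRUD mapping parquet files are already installed; the code type names and codes are placeholders rather than values taken from parse.SUPPORTED_CODE_TYPES:

    import pandas as pd
    from acmc import phen

    # One column per source code type; values are the source codes.
    source_df = pd.DataFrame({"read2": ["C10E.", "C10F."]})

    concept_set = phen.translate_codes(
        source_df,
        target_code_type="snomed",  # placeholder target code type
        concept_name="diabetes",
        not_translate=False,  # False: translate via the <source>_to_<target>.parquet maps
    )
    # Returned columns: SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT, CONCEPT_SET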
def write_vocab_version(phen_path: Path):
    # write the vocab version files

    if not trud.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed"
        )

    if not omop.VERSION_PATH.exists():
        raise FileNotFoundError(
            f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed"
        )

    with trud.VERSION_PATH.open("r") as file:
        trud_version = yaml.safe_load(file)

    with omop.VERSION_PATH.open("r") as file:
        omop_version = yaml.safe_load(file)

    # Create the combined YAML structure
    version_data = {
        "versions": {
            "acmc": acmc.__version__,
            "trud": trud_version,
            "omop": omop_version,
        }
    }

    with open(phen_path / VOCAB_VERSION_FILE, "w") as file:
        yaml.dump(
            version_data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )
def map(phen_dir: str, target_code_type: str, not_translate: bool, no_metadata: bool):
    _logger.info(f"Processing phenotype: {phen_dir}")

    # Validate configuration
    validate(phen_dir)

    # initialise paths
    phen_path = Path(phen_dir)
    config_path = phen_path / CONFIG_FILE

    # load configuration
    with config_path.open("r") as file:
        config = yaml.safe_load(file)
    phenotype = config["phenotype"]

    if len(phenotype["map"]) == 0:
        raise ValueError(f"No map codes defined in the phenotype configuration")

    if target_code_type is not None and target_code_type not in phenotype["map"]:
        raise ValueError(
            f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}"
        )

    if target_code_type is not None:
        _map_target_code_type(
            phen_path, phenotype, target_code_type, not_translate, no_metadata
        )
    else:
        for t in phenotype["map"]:
            _map_target_code_type(phen_path, phenotype, t, not_translate, no_metadata)

    _logger.info(f"Phenotype processed successfully")
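map has no docstring in the source above; a usage sketch, assuming the default workspace layout and that the target code type appears in the configuration's map list:

    from acmc import phen

    # Map all concept sets to one target code type; passing target_code_type=None
    # instead maps to every code type listed in config.yml.
    phen.map(
        phen_dir="./workspace/phen",
        target_code_type="snomed",  # placeholder, must be in the phenotype "map" list
        not_translate=False,
        no_metadata=False,
    )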
def add_metadata(
    codes: pd.DataFrame,
    metadata: dict,
    no_metadata: bool,
) -> pd.DataFrame:
    """Add concept set metadata, stored as a dictionary, to each concept row"""

    if not no_metadata:
        for meta_name, meta_value in metadata.items():
            codes[meta_name] = meta_value
            _logger.debug(
                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
            )

    return codes
Add concept set metadata, stored as a dictionary, to each concept row
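A small worked example of add_metadata; the metadata keys and values are illustrative:

    import pandas as pd
    from acmc import phen

    codes = pd.DataFrame({"CONCEPT": ["73211009"], "CONCEPT_SET": ["diabetes"]})
    metadata = {"category": "endocrine", "author": "example"}  # illustrative keys

    # Each metadata key becomes a constant-valued column; no_metadata=True skips this.
    codes = phen.add_metadata(codes, metadata, no_metadata=False)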
def publish(
    phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
):
    """Publishes updates to the phenotype by committing all changes to the repo directory"""

    # Validate config
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load git repo and set the branch
    repo = git.Repo(phen_path)
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        raise AttributeError(
            f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
        )

    # check if any changes to publish
    if not repo.is_dirty() and not repo.untracked_files:
        if remote_url is not None and "origin" not in repo.remotes:
            _logger.info(f"First publish to remote url {remote_url}")
        else:
            _logger.info("Nothing to publish, no changes to the repo")
            return

    # get next version
    new_version_str = _generate_version_tag(repo, increment)
    _logger.info(f"New version: {new_version_str}")

    # Write version in configuration file
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    config["phenotype"]["version"] = new_version_str
    with open(config_path, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Add and commit changes to repo including version updates
    commit_message = f"Committing updates to phenotype {phen_path}"
    repo.git.add("--all")
    repo.index.commit(commit_message)

    # Add tag to the repo
    repo.create_tag(new_version_str)

    # push to origin if a remote repo
    if remote_url is not None and "origin" not in repo.remotes:
        git_url = _construct_git_url(remote_url)
        repo.create_remote("origin", git_url)

    try:
        if "origin" in repo.remotes:
            _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
            origin = repo.remotes.origin
            _logger.info(f"Pushing main branch to remote repo")
            repo.git.push("--set-upstream", "origin", "main")
            _logger.info(f"Pushing version tags to remote git repo")
            origin.push(tags=True)
            _logger.debug("Changes pushed to 'origin'")
        else:
            _logger.debug("Remote 'origin' is not set")
    except Exception as e:
        tag_ref = repo.tags[new_version_str]
        repo.delete_tag(tag_ref)
        repo.git.reset("--soft", "HEAD~1")
        raise e

    _logger.info(f"Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
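A usage sketch of publish under the default patch increment; note that in the source shown above the msg argument is not referenced, the commit message is generated from the repo path:

    from acmc import phen

    # Commit all changes, bump the patch version in config.yml, tag the repo,
    # and push if an "origin" remote exists or remote_url is given.
    phen.publish(
        phen_dir="./workspace/phen",
        msg="Updated concept sets",
        remote_url=None,  # or a URL on first publish
        increment="patch",  # one of "major", "minor", "patch"
    )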
def export(phen_dir: str, version: str):
    """Exports a phen repo at a specific tagged version into a target directory"""
    _logger.info(f"Exporting phenotype {phen_dir} at version {version}")

    # validate configuration
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # load configuration
    config_path = phen_path / CONFIG_FILE
    with config_path.open("r") as file:
        config = yaml.safe_load(file)

    map_path = phen_path / MAP_DIR
    if not map_path.exists():
        _logger.warning(f"Map path does not exist '{map_path}'")

    export_path = phen_path / OMOP_PATH
    # check export directory exists and if not create it
    if not export_path.exists():
        export_path.mkdir(parents=True)
        _logger.debug(f"OMOP export directory '{export_path}' created.")

    # omop export db
    export_db_path = omop.export(
        map_path,
        export_path,
        config["phenotype"]["version"],
        config["phenotype"]["omop"],
    )

    # write to tables
    # export as csv
    _logger.info(f"Phenotype exported successfully")
Exports a phen repo at a specific tagged version into a target directory
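A minimal usage sketch of export, assuming mapping has already been run so that the map directory is populated:

    from acmc import phen

    # Write the OMOP export database into <phen_dir>/concept-sets/omop.
    phen.export(phen_dir="./workspace/phen", version="0.1.0")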
def copy(phen_dir: str, target_dir: str, version: str):
    """Copies a phen repo at a specific tagged version into a target directory"""

    # Validate
    validate(phen_dir)
    phen_path = Path(phen_dir)

    # Check target directory exists
    target_path = Path(target_dir)
    if not target_path.exists():
        raise FileNotFoundError(f"The target directory {target_path} does not exist")

    # Set copy directory
    copy_path = target_path / version
    _logger.info(f"Copying repo {phen_path} to {copy_path}")

    if (
        copy_path.exists() and copy_path.is_dir()
    ):  # Check if it exists and is a directory
        copy = _check_delete_dir(
            copy_path,
            f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
        )
    else:
        copy = True

    if not copy:
        _logger.info(f"Not copying the version {version}")
        return

    _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
    repo = git.Repo.clone_from(phen_path, copy_path)

    # Check out the latest commit or specified version
    if version:
        # Checkout a specific version (e.g., branch, tag, or commit hash)
        _logger.info(f"Checking out version {version}...")
        repo.git.checkout(version)
    else:
        # Checkout the latest commit (HEAD)
        _logger.info(f"Checking out the latest commit...")
        repo.git.checkout("HEAD")

    _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")

    _logger.info(f"Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
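A usage sketch of copy; the target directory must already exist, otherwise the function raises FileNotFoundError:

    from acmc import phen

    # Clone the phenotype repo into ./releases/0.1.0 and check out that version.
    phen.copy(phen_dir="./workspace/phen", target_dir="./releases", version="0.1.0")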
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concepts as a {name: list of file paths} dictionary and a name set."""
    concepts_dict = {
        item["name"]: [file["path"] for file in item["files"]]
        for item in config_data["phenotype"]["concept_sets"]
    }
    name_set = set(concepts_dict.keys())
    return concepts_dict, name_set
Extracts concepts as a {name: list of file paths} dictionary and a name set.
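A worked example of extract_concepts with an illustrative configuration dictionary shaped like config.yml:

    from acmc import phen

    config = {
        "phenotype": {
            "concept_sets": [
                {"name": "diabetes", "files": [{"path": "diabetes_read2.csv"}]},
                {"name": "asthma", "files": [{"path": "asthma_read2.csv"}]},
            ]
        }
    }

    concepts, names = phen.extract_concepts(config)
    # concepts == {"diabetes": ["diabetes_read2.csv"], "asthma": ["asthma_read2.csv"]}
    # names == {"diabetes", "asthma"}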
def diff_config(old_config: dict, new_config: dict) -> str:
    report = f"\n# Changes to phenotype configuration\n"
    report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n"

    old_concepts, old_names = extract_concepts(old_config)
    new_concepts, new_names = extract_concepts(new_config)

    # Check added and removed concept set names
    added_names = new_names - old_names  # Names that appear in new but not in old
    removed_names = old_names - new_names  # Names that were in old but not in new

    # find file path changes for unchanged names
    unchanged_names = old_names & new_names  # Names that exist in both
    file_diff = DeepDiff(
        {name: old_concepts[name] for name in unchanged_names},
        {name: new_concepts[name] for name in unchanged_names},
    )

    # Find renamed concepts (same file, different name)
    renamed_concepts = []
    for removed in removed_names:
        old_path = old_concepts[removed]
        for added in added_names:
            new_path = new_concepts[added]
            if old_path == new_path:
                renamed_concepts.append((removed, added))

    # Remove renamed concepts from added and removed sets
    for old_name, new_name in renamed_concepts:
        added_names.discard(new_name)
        removed_names.discard(old_name)

    # generate config report
    if added_names:
        report += "## Added Concepts\n"
        for name in added_names:
            report += f"- `{name}` (File: `{new_concepts[name]}`)\n"
        report += "\n"

    if removed_names:
        report += "## Removed Concepts\n"
        for name in removed_names:
            report += f"- `{name}` (File: `{old_concepts[name]}`)\n"
        report += "\n"

    if renamed_concepts:
        report += "## Renamed Concepts\n"
        for old_name, new_name in renamed_concepts:
            report += (
                f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n"
            )
        report += "\n"

    if "values_changed" in file_diff:
        report += "## Updated File Paths\n"
        for name, change in file_diff["values_changed"].items():
            old_file = change["old_value"]
            new_file = change["new_value"]
            clean_name = name.split("root['")[1].split("']")[0]
            report += (
                f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n"
            )
        report += "\n"

    if not (
        added_names
        or removed_names
        or renamed_concepts
        or file_diff.get("values_changed")
    ):
        report += "No changes in concept sets.\n"

    return report
def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
    old_output_files = [
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    new_output_files = [
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]

    # Convert the lists to sets for easy comparison
    old_output_set = set(old_output_files)
    new_output_set = set(new_output_files)

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = f"\n# Changes to available translations\n"
    report += f"This compares the coding translation files available.\n\n"
    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"

    # Compare common outputs between versions
    report += f"# Changes to concepts in translation files\n\n"
    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
    for file in common_outputs:
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old output: {str(old_output.resolve())}")
        _logger.debug(f"New output: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output)
        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
        report += f"- Added concepts {sorted_list}\n"

        # Check for changed concepts
        diff = df2 - df1  # diff in counts
        diff = diff[
            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
        ]  # get non-zero counts
        s = "\n"
        if len(diff.index) > 0:
            for concept, row in diff.iterrows():
                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
            report += f"- Changed concepts {s}\n\n"
        else:
            report += f"- Changed concepts []\n\n"

    return report
def diff_phen(
    new_phen_path: Path,
    new_version: str,
    old_phen_path: Path,
    old_version: str,
    report_path: Path,
    not_check_config: bool,
):
    """Compare the differences between two versions of a phenotype"""

    # write report heading
    report = f"# Phenotype Comparison Report\n"

    # Step 1: check differences in configuration files
    if not not_check_config:
        # validate phenotypes
        _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}")
        validate(str(old_phen_path.resolve()))
        _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}")
        validate(str(new_phen_path.resolve()))

        # get old and new config
        old_config_path = old_phen_path / CONFIG_FILE
        with old_config_path.open("r") as file:
            old_config = yaml.safe_load(file)
        new_config_path = new_phen_path / CONFIG_FILE
        with new_config_path.open("r") as file:
            new_config = yaml.safe_load(file)

        # write report
        report += f"## Original phenotype\n"
        report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f" - {old_version}\n"
        report += f" - {str(old_phen_path.resolve())}\n"
        report += f"## Changed phenotype:\n"
        report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n"
        report += f" - {new_version}\n"
        report += f" - {str(new_phen_path.resolve())}\n"

        # Convert list of dicts into a dict: {name: file}
        report += diff_config(old_config, new_config)

    # Step 2: check differences between map files
    # List files from output directories
    old_map_path = old_phen_path / MAP_DIR
    new_map_path = new_phen_path / MAP_DIR
    report += diff_map_files(old_map_path, new_map_path)

    # initialise report file
    _logger.debug(f"Writing to report file {str(report_path.resolve())}")
    report_file = open(report_path, "w")
    report_file.write(report)
    report_file.close()

    _logger.info(f"Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
def diff(
    phen_dir: str,
    version: str,
    old_phen_dir: str,
    old_version: str,
    not_check_config: bool,
):
    # make tmp directory .acmc
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    temp_dir = Path(f".acmc/diff_{timestamp}")

    changed_phen_path = Path(phen_dir)
    if not changed_phen_path.exists():
        raise ValueError(
            f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}"
        )

    old_phen_path = Path(old_phen_dir)
    if not old_phen_path.exists():
        raise ValueError(
            f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}"
        )

    # t_path = old_phen_path / "config.yml"
    # with t_path.open("r") as file:
    #     c = yaml.safe_load(file)

    try:
        # Create the directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Temporary directory created: {temp_dir}")

        # Create temporary directories
        changed_path = temp_dir / "changed"
        changed_path.mkdir(parents=True, exist_ok=True)
        old_path = temp_dir / "old"
        old_path.mkdir(parents=True, exist_ok=True)

        # checkout changed
        if version == "latest":
            _logger.debug(
                f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..."
            )
            changed_repo = git.Repo.clone_from(changed_phen_path, changed_path)
            changed_repo.git.checkout(version)

        # checkout old
        if old_version == "latest":
            _logger.debug(
                f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True)
        else:
            _logger.debug(
                f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..."
            )
            old_repo = git.Repo.clone_from(old_phen_dir, old_path)
            old_repo.git.checkout(old_version)

        report_filename = f"{version}_{old_version}_diff.md"
        report_path = changed_phen_path / report_filename
        # diff old with new
        diff_phen(
            changed_path, version, old_path, old_version, report_path, not_check_config
        )

    finally:
        # clean up tmp directory
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
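Finally, a usage sketch of diff with placeholder paths; the report is written to <phen_dir>/<version>_<old_version>_diff.md:

    from acmc import phen

    # Compare the current working copy against version 0.1.0 of an older checkout;
    # writes ./workspace/phen/latest_0.1.0_diff.md.
    phen.diff(
        phen_dir="./workspace/phen",
        version="latest",
        old_phen_dir="./workspace/phen-old",
        old_version="0.1.0",
        not_check_config=False,
    )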