acmc.phen
phenotype.py module
This module provides functionality for managing phenotypes: initialising (init), forking (fork), validating (validate), mapping source concept codes to target coding systems (map), publishing versions (publish), exporting (export), copying (copy), and comparing (diff) phenotype repositories.
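For orientation, here is a minimal sketch of how the module's top-level functions might be called from Python. The workspace path is a placeholder, and "read2" is only an assumed example of a supported target code type; whichever type is used must be listed under the phenotype's map configuration.

```python
from acmc import phen

# initialise a new local phenotype repo with the standard structure
phen.init(phen_dir="./workspace/phen", remote_url=None)

# check the directory structure and config.yml against the schema
phen.validate("./workspace/phen")

# translate source concept codes to a target coding system
phen.map(
    phen_dir="./workspace/phen",
    target_code_type="read2",  # assumed example; must be listed under 'map' in config.yml
    not_translate=False,
    no_metadata=False,
    do_reverse_translate=False,
)

# commit, tag and optionally push the new version
phen.publish(
    "./workspace/phen",
    msg="Updated concept sets",
    remote_url=None,
    increment="patch",
)
```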
Module-level imports, constants and the config.yml schema (from phenotype.py):

```python
"""
phenotype.py module

This module provides functionality for managing phenotypes.
"""

import argparse
import pandas as pd
import numpy as np
import json
import os
import sqlite3
import sys
import shutil
import time
import git
import re
import logging
import requests
import yaml
import semver
from git import Repo
from cerberus import Validator  # type: ignore
from deepdiff import DeepDiff
from pathlib import Path
from urllib.parse import urlparse, urlunparse
from typing import Tuple, Set, Any
import acmc
from acmc import trud, omop, parse, util, logging_config as lc

# set up logging
_logger = lc.setup_logger()

pd.set_option("mode.chained_assignment", None)

PHEN_DIR = "phen"
"""Default phenotype directory name"""

DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR
"""Default phenotype directory path"""

CONCEPTS_DIR = "concepts"
"""Default concepts directory name"""

MAP_DIR = "map"
"""Default map directory name"""

CONCEPT_SET_DIR = "concept-sets"
"""Default concept set directory name"""

CSV_PATH = Path(CONCEPT_SET_DIR) / "csv"
"""Default CSV concept set directory path"""

OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop"
"""Default OMOP concept set directory path"""

DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]
"""List of default phenotype directories"""

CONFIG_FILE = "config.yml"
"""Default configuration filename"""

VOCAB_VERSION_FILE = "vocab_version.yml"
"""Default vocabulary version filename"""

SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"]
"""List of semantic version increment types"""

DEFAULT_VERSION_INC = "patch"
"""Default semantic version increment type"""

DEFAULT_GIT_BRANCH = "main"
"""Default phenotype repo branch name"""

SPLIT_COL_ACTION = "split_col"
"""Split column preprocessing action type"""

CODES_COL_ACTION = "codes_col"
"""Codes column preprocessing action type"""

DIVIDE_COL_ACTION = "divide_col"
"""Divide column preprocessing action type"""

COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]
"""List of column preprocessing action types"""

CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"]
"""List of supported source concept coding list file types"""

# config.yml schema
CONFIG_SCHEMA = {
    "phenotype": {
        "type": "dict",
        "required": True,
        "schema": {
            "version": {
                "type": "string",
                "required": True,
                "regex": r"^\d+\.\d+\.\d+$",  # Enforces 'N.N.N' format
            },
            "omop": {
                "type": "dict",
                "required": True,
                "schema": {
                    "vocabulary_id": {"type": "string", "required": True},
                    "vocabulary_name": {"type": "string", "required": True},
                    "vocabulary_reference": {
                        "type": "string",
                        "required": True,
                        "regex": r"^https?://.*",  # Ensures it's a URL
                    },
                },
            },
            "map": {
                "type": "list",
                "schema": {
                    "type": "string",
                    "allowed": list(
                        parse.SUPPORTED_CODE_TYPES
                    ),  # Ensure only predefined values are allowed
                },
            },
            "concept_sets": {
                "type": "list",
                "required": True,
                "schema": {
                    "type": "dict",
                    "schema": {
                        "name": {"type": "string", "required": True},
                        "files": {
                            "type": "list",
                            "required": True,
                            "schema": {
                                "type": "dict",
                                "schema": {
                                    "path": {"type": "string", "required": True},
                                    "columns": {"type": "dict", "required": True},
                                    "category": {
                                        "type": "string"
                                    },  # Optional but must be a string if present
                                    "actions": {
                                        "type": "dict",
                                        "schema": {
                                            "divide_col": {"type": "string"},
                                            "split_col": {"type": "string"},
                                            "codes_col": {"type": "string"},
                                        },
                                    },
                                },
                            },
                        },
                        "metadata": {"type": "dict", "required": False},
                    },
                },
            },
        },
    }
}
"""Phenotype config.yml schema definition"""
```
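For illustration, a minimal sketch of checking a configuration dictionary against CONFIG_SCHEMA with cerberus, which is what validate() does internally; the vocabulary values shown are placeholders.

```python
from cerberus import Validator

from acmc.phen import CONFIG_SCHEMA

config = {
    "phenotype": {
        "version": "0.0.1",
        "omop": {
            "vocabulary_id": "ACMC_EXAMPLE",  # placeholder
            "vocabulary_name": "Example phenotype",  # placeholder
            "vocabulary_reference": "https://example.org/phenotype",  # placeholder
        },
        "concept_sets": [],
    }
}

validator = Validator(CONFIG_SCHEMA)
if not validator.validate(config):
    # validator.errors is a nested dict describing each failing field
    print(validator.errors)
```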
Module attributes:

- PHEN_DIR ("phen"): Default phenotype directory name
- DEFAULT_PHEN_PATH (Path("./workspace") / PHEN_DIR): Default phenotype directory path
- CONCEPTS_DIR ("concepts"): Default concepts directory name
- MAP_DIR ("map"): Default map directory name
- CONCEPT_SET_DIR ("concept-sets"): Default concept set directory name
- CSV_PATH (Path(CONCEPT_SET_DIR) / "csv"): Default CSV concept set directory path
- OMOP_PATH (Path(CONCEPT_SET_DIR) / "omop"): Default OMOP concept set directory path
- DEFAULT_PHEN_DIR_LIST ([CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR]): List of default phenotype directories
- CONFIG_FILE ("config.yml"): Default configuration filename
- VOCAB_VERSION_FILE ("vocab_version.yml"): Default vocabulary version filename
- SEMANTIC_VERSION_TYPES (["major", "minor", "patch"]): List of semantic version increment types
- DEFAULT_VERSION_INC ("patch"): Default semantic version increment type
- DEFAULT_GIT_BRANCH ("main"): Default phenotype repo branch name
- SPLIT_COL_ACTION ("split_col"): Split column preprocessing action type
- CODES_COL_ACTION ("codes_col"): Codes column preprocessing action type
- DIVIDE_COL_ACTION ("divide_col"): Divide column preprocessing action type
- COL_ACTIONS ([SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION]): List of column preprocessing action types
- CODE_FILE_TYPES ([".xlsx", ".xls", ".csv"]): List of supported source concept coding list file types
- CONFIG_SCHEMA: Phenotype config.yml schema definition (see the schema in the source above)
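As a sketch, a config.yml that satisfies CONFIG_SCHEMA might look like the following; every name, path and column type here is an illustrative placeholder and must match the coding files and supported code types of a real phenotype.

```yaml
phenotype:
  version: "0.0.1"
  omop:
    vocabulary_id: "ACMC_EXAMPLE"
    vocabulary_name: "Example phenotype"
    vocabulary_reference: "https://example.org/phenotype"
  map:
    - "read2"                        # must be one of parse.SUPPORTED_CODE_TYPES
  concept_sets:
    - name: "EXAMPLE_CONCEPT_SET"
      metadata:
        author: "Example Author"
      files:
        - path: "example-codes.csv"  # relative to the concepts/ directory
          columns:
            read2: "Read Code"       # code type -> column name in the coding file
```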
```python
class PhenValidationException(Exception):
    """Custom exception raised when there are validation errors in the phenotype configuration file"""

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors
```
Custom exception raised when there are validation errors in the phenotype configuration file.
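A brief hedged sketch of how the exception carries its error list; the message and error strings are placeholders.

```python
errors = [
    "Coding file concepts/missing.csv does not exist",
    "Column type icd10 for file concepts/example-codes.csv is not supported",
]

try:
    raise PhenValidationException("Configuration file failed validation", errors)
except PhenValidationException as e:
    print(e)  # summary message
    for err in e.validation_errors or []:
        print(f" - {err}")  # individual validation errors
```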
```python
def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with the standard structure"""
    _logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initialised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = _construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            _logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            _logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        _logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phen's default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
        main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phenotype
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        _logger.debug("Phenotype configuration files already exist")
        return

    _logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        _create_empty_git_dir(phen_path / d)

    # create empty phen config file
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "translate": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
"""
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    _logger.info("Phenotype initialised successfully")
```
Initialises a phenotype directory as a git repo with the standard structure.
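A usage sketch; the workspace path is a placeholder, the remote URL is hypothetical, and cloning from a remote requires a personal access token exported as ACMC_GITHUB_PAT (or ACMC_GITLAB_PAT for GitLab), which _construct_git_url reads from the environment.

```python
from acmc import phen

# purely local phenotype repo with the standard directory structure
phen.init(phen_dir="./workspace/phen", remote_url=None)

# or initialise from a remote repo; a PAT must be set in the environment,
# e.g. export ACMC_GITHUB_PAT=<token> before running
phen.init(
    phen_dir="./workspace/phen",
    remote_url="https://github.com/example-org/example-phen.git",  # hypothetical URL
)
```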
```python
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory,
    and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there are any other problems with Git
    """
    _logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = _check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        _logger.info("Exiting, phenotype not initialised")
        return

    try:
        # Clone repo
        git_url = _construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yml' exists in the root directory
        config_path = phen_path / "config.yml"
        if not os.path.isfile(config_path):
            raise ValueError(
                "The forked repository is not a valid ACMC repo because 'config.yml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            _logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = _construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", "main")

        _logger.info(f"Repository forked successfully at {phen_path}")
        _logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            _logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}")
```
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin
Arguments:
- phen_dir (str): local directory path where the upstream repo is to be cloned
- upstream_url (str): url to the upstream repo
- upstream_version (str): version in the upstream repo to clone
- new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
- ValueError: if the specified version is not in the upstream repo
- ValueError: if the upstream repo is not a valid phenotype repo
- ValueError: if there are any other problems with Git
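A minimal usage sketch, assuming the module is importable as `acmc.phen` and using placeholder repository URLs and a placeholder version tag:

    from acmc import phen

    # Fork a published phenotype at a tagged version into a local workspace,
    # pointing the new copy at our own remote (all URLs here are hypothetical).
    phen.fork(
        phen_dir="./workspace/phen",
        upstream_url="https://github.com/example-org/example-phenotype.git",
        upstream_version="v1.2.0",
        new_origin_url="https://github.com/my-org/my-phenotype.git",
    )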
def validate(phen_dir: str):
    """Validates the phenotype directory is a git repo with standard structure"""
    _logger.info(f"Validating phenotype: {phen_dir}")
    phen_path = Path(phen_dir)
    if not phen_path.is_dir():
        raise NotADirectoryError(
            f"Error: '{str(phen_path.resolve())}' is not a directory"
        )

    config_path = phen_path / CONFIG_FILE
    if not config_path.is_file():
        raise FileNotFoundError(
            f"Error: phen configuration file '{config_path}' does not exist."
        )

    concepts_path = phen_path / CONCEPTS_DIR
    if not concepts_path.is_dir():
        raise FileNotFoundError(
            f"Error: source concepts directory {concepts_path} does not exist."
        )

    # Validate the directory is a git repo
    try:
        git.Repo(phen_path)
    except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
        raise Exception(f"Phen directory {phen_path} is not a git repo")

    # Load configuration file
    if config_path.suffix == ".yml":
        try:
            with config_path.open("r") as file:
                phenotype = yaml.safe_load(file)

            validator = Validator(CONFIG_SCHEMA)
            if validator.validate(phenotype):
                _logger.debug("YAML structure is valid.")
            else:
                _logger.error(f"YAML structure validation failed: {validator.errors}")
                raise Exception(f"YAML structure validation failed: {validator.errors}")
        except yaml.YAMLError as e:
            _logger.error(f"YAML syntax error: {e}")
            raise e
    else:
        raise Exception(
            f"Unsupported configuration filetype: {str(config_path.resolve())}"
        )

    # initialise
    validation_errors = []
    phenotype = phenotype["phenotype"]
    code_types = parse.CodeTypeParser().code_types

    # check the version number is of the format n.n.n
    match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
    if not match:
        validation_errors.append(
            f"Invalid version format in configuration file: {phenotype['version']}"
        )

    # create a list of all the concept set names defined in the concept set configuration
    concept_set_names = []
    for item in phenotype["concept_sets"]:
        if item["name"] in concept_set_names:
            validation_errors.append(
                f"Duplicate concept set defined in concept sets {item['name']}"
            )
        else:
            concept_set_names.append(item["name"])

    # check codes definition
    for files in phenotype["concept_sets"]:
        for item in files["files"]:
            # check the concept code file exists
            concept_code_file_path = concepts_path / item["path"]
            if not concept_code_file_path.exists():
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} does not exist"
                )

            # check the concept code file is not empty
            if concept_code_file_path.stat().st_size == 0:
                validation_errors.append(
                    f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
                )

            # check code file type is supported
            if concept_code_file_path.suffix not in CODE_FILE_TYPES:
                raise ValueError(
                    f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types"
                )

            # check columns specified are a supported medical coding type
            for column in item["columns"]:
                if column not in code_types:
                    validation_errors.append(
                        f"Column type {column} for file {concept_code_file_path} is not supported"
                    )

            # check the actions are supported
            if "actions" in item:
                for action in item["actions"]:
                    if action not in COL_ACTIONS:
                        validation_errors.append(f"Action {action} is not supported")

    if len(validation_errors) > 0:
        _logger.error(validation_errors)
        raise PhenValidationException(
            f"Configuration file {str(config_path.resolve())} failed validation",
            validation_errors,
        )

    _logger.info(f"Phenotype validated successfully")
Validates the phenotype directory is a git repo with standard structure
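A sketch of calling validate and handling a failed validation; it assumes `PhenValidationException` is importable from the same module, since that is what the function raises when the configuration checks fail:

    from acmc import phen

    try:
        phen.validate("./workspace/phen")
    except phen.PhenValidationException as e:
        # The exception message names the failing config file; the individual
        # validation errors are also logged before it is raised.
        print(f"Validation failed: {e}")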
def translate_codes(
    source_df: pd.DataFrame,
    target_code_type: str,
    concept_name: str,
    not_translate: bool,
    do_reverse_translate: bool,
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )
    # Convert codes to target type
    _logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if the target code type is the same as the source code type, no translation, just append source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            _logger.debug(
                f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating"
            )
        elif not not_translate:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            filename_reversed = f"{target_code_type}_to_{source_code_type}.parquet"
            map_path_reversed = trud.PROCESSED_PATH / filename_reversed

            # do the mapping if it exists
            if map_path.exists():
                codes = _translate_codes(map_path, source_df, source_code_type, codes)
            # otherwise do reverse mapping if enabled and it exists
            elif do_reverse_translate and map_path_reversed.exists():
                codes = _translate_codes(
                    map_path_reversed, source_df, source_code_type, codes, reverse=True
                )
            else:
                _logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add the concept set name to the output if there were any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        _logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
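A sketch of the expected input and output shapes. The code type names and codes below are hypothetical, and a real translation also requires the corresponding mapping parquet files to exist under trud.PROCESSED_PATH:

    import pandas as pd
    from acmc import phen

    # One column per source code type; each value is a source code.
    source_df = pd.DataFrame({"read2": ["C10..", "C10E."]})

    concept_set = phen.translate_codes(
        source_df,
        target_code_type="snomed",
        concept_name="DIABETES",
        not_translate=False,
        do_reverse_translate=False,
    )
    # Output columns: SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT, CONCEPT_SET
    print(concept_set.head())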
772def write_vocab_version(phen_path: Path): 773 # write the vocab version files 774 775 if not trud.VERSION_PATH.exists(): 776 raise FileNotFoundError( 777 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 778 ) 779 780 if not omop.VERSION_PATH.exists(): 781 raise FileNotFoundError( 782 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 783 ) 784 785 with trud.VERSION_PATH.open("r") as file: 786 trud_version = yaml.safe_load(file) 787 788 with omop.VERSION_PATH.open("r") as file: 789 omop_version = yaml.safe_load(file) 790 791 # Create the combined YAML structure 792 version_data = { 793 "versions": { 794 "acmc": acmc.__version__, 795 "trud": trud_version, 796 "omop": omop_version, 797 } 798 } 799 800 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 801 yaml.dump( 802 version_data, 803 file, 804 Dumper=util.QuotedDumper, 805 default_flow_style=False, 806 sort_keys=False, 807 default_style='"', 808 )
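A sketch of reading back the generated vocab_version.yml; the nested keys mirror the version_data structure written above, and the phenotype path is a placeholder:

    import yaml
    from pathlib import Path

    with (Path("./workspace/phen") / "vocab_version.yml").open("r") as f:
        versions = yaml.safe_load(f)["versions"]

    # acmc, trud and omop versions as recorded when the vocab file was written
    print(versions["acmc"], versions["trud"], versions["omop"])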
811def map( 812 phen_dir: str, 813 target_code_type: str, 814 not_translate: bool, 815 no_metadata: bool, 816 do_reverse_translate: bool, 817): 818 _logger.info(f"Processing phenotype: {phen_dir}") 819 820 # Validate configuration 821 validate(phen_dir) 822 823 # initialise paths 824 phen_path = Path(phen_dir) 825 config_path = phen_path / CONFIG_FILE 826 827 # load configuration 828 with config_path.open("r") as file: 829 config = yaml.safe_load(file) 830 phenotype = config["phenotype"] 831 832 if len(phenotype["map"]) == 0: 833 raise ValueError(f"No map codes defined in the phenotype configuration") 834 835 if target_code_type is not None and target_code_type not in phenotype["map"]: 836 raise ValueError( 837 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 838 ) 839 840 if target_code_type is not None: 841 _map_target_code_type( 842 phen_path, 843 phenotype, 844 target_code_type, 845 not_translate, 846 no_metadata, 847 do_reverse_translate, 848 ) 849 else: 850 for t in phenotype["map"]: 851 _map_target_code_type( 852 phen_path, 853 phenotype, 854 t, 855 not_translate, 856 no_metadata, 857 do_reverse_translate, 858 ) 859 860 _logger.info(f"Phenotype processed successfully")
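A sketch of running the mapping step for a single target code type; the code type value is hypothetical, and passing None instead maps to every type listed under 'map' in config.yml:

    from acmc import phen

    phen.map(
        phen_dir="./workspace/phen",
        target_code_type="snomed",
        not_translate=False,
        no_metadata=False,
        do_reverse_translate=False,
    )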
def add_metadata(
    codes: pd.DataFrame,
    metadata: dict,
    no_metadata: bool,
) -> pd.DataFrame:
    """Add concept set metadata, stored as a dictionary, to each concept row"""

    if not no_metadata:
        for meta_name, meta_value in metadata.items():
            codes[meta_name] = meta_value
            _logger.debug(
                f"Adding metadata for concept set: metadata name {meta_name}, metadata value {meta_value}"
            )

    return codes
Add concept set metadata, stored as a dictionary, to each concept row
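A small sketch of the behaviour, with a hypothetical metadata key; each metadata entry becomes a constant-valued column on every concept row:

    import pandas as pd
    from acmc import phen

    codes = pd.DataFrame({"CONCEPT": ["123", "456"]})
    annotated = phen.add_metadata(
        codes,
        metadata={"category": "example-category"},
        no_metadata=False,
    )
    print(annotated.columns.tolist())  # ['CONCEPT', 'category']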
1105def publish( 1106 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 1107): 1108 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 1109 1110 # Validate config 1111 validate(phen_dir) 1112 phen_path = Path(phen_dir) 1113 1114 # load git repo and set the branch 1115 repo = git.Repo(phen_path) 1116 if DEFAULT_GIT_BRANCH in repo.branches: 1117 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1118 main_branch.checkout() 1119 else: 1120 raise AttributeError( 1121 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1122 ) 1123 1124 # check if any changes to publish 1125 if not repo.is_dirty() and not repo.untracked_files: 1126 if remote_url is not None and "origin" not in repo.remotes: 1127 _logger.info(f"First publish to remote url {remote_url}") 1128 else: 1129 _logger.info("Nothing to publish, no changes to the repo") 1130 return 1131 1132 # get next version 1133 new_version_str = _generate_version_tag(repo, increment) 1134 _logger.info(f"New version: {new_version_str}") 1135 1136 # Write version in configuration file 1137 config_path = phen_path / CONFIG_FILE 1138 with config_path.open("r") as file: 1139 config = yaml.safe_load(file) 1140 1141 config["phenotype"]["version"] = new_version_str 1142 with open(config_path, "w") as file: 1143 yaml.dump( 1144 config, 1145 file, 1146 Dumper=util.QuotedDumper, 1147 default_flow_style=False, 1148 sort_keys=False, 1149 default_style='"', 1150 ) 1151 1152 # Add and commit changes to repo including version updates 1153 commit_message = f"Committing updates to phenotype {phen_path}" 1154 repo.git.add("--all") 1155 repo.index.commit(commit_message) 1156 1157 # Add tag to the repo 1158 repo.create_tag(new_version_str) 1159 1160 # push to origin if a remote repo 1161 if remote_url is not None and "origin" not in repo.remotes: 1162 git_url = _construct_git_url(remote_url) 1163 repo.create_remote("origin", git_url) 1164 1165 try: 1166 if "origin" in repo.remotes: 1167 _logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1168 origin = repo.remotes.origin 1169 _logger.info(f"Pushing main branch to remote repo") 1170 repo.git.push("--set-upstream", "origin", "main") 1171 _logger.info(f"Pushing version tags to remote git repo") 1172 origin.push(tags=True) 1173 _logger.debug("Changes pushed to 'origin'") 1174 else: 1175 _logger.debug("Remote 'origin' is not set") 1176 except Exception as e: 1177 tag_ref = repo.tags[new_version_str] 1178 repo.delete_tag(tag_ref) 1179 repo.git.reset("--soft", "HEAD~1") 1180 raise e 1181 1182 _logger.info(f"Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
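A sketch of publishing with a minor version bump; the remote URL is a placeholder and is only used when the repo has no origin remote set yet:

    from acmc import phen

    phen.publish(
        phen_dir="./workspace/phen",
        msg="Updated concept sets",
        remote_url="https://github.com/my-org/my-phenotype.git",
        increment="minor",  # one of "major", "minor", "patch"
    )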
1185def export(phen_dir: str, version: str): 1186 """Exports a phen repo at a specific tagged version into a target directory""" 1187 _logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1188 1189 # validate configuration 1190 validate(phen_dir) 1191 phen_path = Path(phen_dir) 1192 1193 # load configuration 1194 config_path = phen_path / CONFIG_FILE 1195 with config_path.open("r") as file: 1196 config = yaml.safe_load(file) 1197 1198 map_path = phen_path / MAP_DIR 1199 if not map_path.exists(): 1200 _logger.warning(f"Map path does not exist '{map_path}'") 1201 1202 export_path = phen_path / OMOP_PATH 1203 # check export directory exists and if not create it 1204 if not export_path.exists(): 1205 export_path.mkdir(parents=True) 1206 _logger.debug(f"OMOP export directory '{export_path}' created.") 1207 1208 # omop export db 1209 export_db_path = omop.export( 1210 map_path, 1211 export_path, 1212 config["phenotype"]["version"], 1213 config["phenotype"]["omop"], 1214 ) 1215 1216 # write to tables 1217 # export as csv 1218 _logger.info(f"Phenotype exported successfully")
Exports a phen repo at a specific tagged version into a target directory
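A sketch of exporting a mapped phenotype; the OMOP export is written under the concept-sets/omop directory inside the phenotype repo, and the path and version shown here are placeholders:

    from acmc import phen

    phen.export(phen_dir="./workspace/phen", version="0.1.0")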
1221def copy(phen_dir: str, target_dir: str, version: str): 1222 """Copys a phen repo at a specific tagged version into a target directory""" 1223 1224 # Validate 1225 validate(phen_dir) 1226 phen_path = Path(phen_dir) 1227 1228 # Check target directory exists 1229 target_path = Path(target_dir) 1230 if not target_path.exists(): 1231 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1232 1233 # Set copy directory 1234 copy_path = target_path / version 1235 _logger.info(f"Copying repo {phen_path} to {copy_path}") 1236 1237 if ( 1238 copy_path.exists() and copy_path.is_dir() 1239 ): # Check if it exists and is a directory 1240 copy = _check_delete_dir( 1241 copy_path, 1242 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ", 1243 ) 1244 else: 1245 copy = True 1246 1247 if not copy: 1248 _logger.info(f"Not copying the version {version}") 1249 return 1250 1251 _logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1252 repo = git.Repo.clone_from(phen_path, copy_path) 1253 1254 # Check out the latest commit or specified version 1255 if version: 1256 # Checkout a specific version (e.g., branch, tag, or commit hash) 1257 _logger.info(f"Checking out version {version}...") 1258 repo.git.checkout(version) 1259 else: 1260 # Checkout the latest commit (HEAD) 1261 _logger.info(f"Checking out the latest commit...") 1262 repo.git.checkout("HEAD") 1263 1264 _logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1265 1266 _logger.info(f"Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]:
    """Extracts concepts as {name: file_path} dictionary and a name set."""
    concepts_dict = {
        item["name"]: [file["path"] for file in item["files"]]
        for item in config_data["phenotype"]["concept_sets"]
    }
    name_set = set(concepts_dict.keys())
    return concepts_dict, name_set
Extracts concepts as {name: file_path} dictionary and a name set.
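A minimal sketch with a stripped-down configuration dictionary and hypothetical concept set names:

    from acmc import phen

    config = {
        "phenotype": {
            "concept_sets": [
                {"name": "DIABETES", "files": [{"path": "diabetes.csv"}]},
                {"name": "ASTHMA", "files": [{"path": "asthma.xlsx"}]},
            ]
        }
    }

    concepts, names = phen.extract_concepts(config)
    # concepts == {'DIABETES': ['diabetes.csv'], 'ASTHMA': ['asthma.xlsx']}
    # names == {'DIABETES', 'ASTHMA'}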
1291def diff_config(old_config: dict, new_config: dict) -> str: 1292 report = f"\n# Changes to phenotype configuration\n" 1293 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1294 1295 old_concepts, old_names = extract_concepts(old_config) 1296 new_concepts, new_names = extract_concepts(new_config) 1297 1298 # Check added and removed concept set names 1299 added_names = new_names - old_names # Names that appear in new but not in old 1300 removed_names = old_names - new_names # Names that were in old but not in new 1301 1302 # find file path changes for unchanged names 1303 unchanged_names = old_names & new_names # Names that exist in both 1304 file_diff = DeepDiff( 1305 {name: old_concepts[name] for name in unchanged_names}, 1306 {name: new_concepts[name] for name in unchanged_names}, 1307 ) 1308 1309 # Find renamed concepts (same file, different name) 1310 renamed_concepts = [] 1311 for removed in removed_names: 1312 old_path = old_concepts[removed] 1313 for added in added_names: 1314 new_path = new_concepts[added] 1315 if old_path == new_path: 1316 renamed_concepts.append((removed, added)) 1317 1318 # Remove renamed concepts from added and removed sets 1319 for old_name, new_name in renamed_concepts: 1320 added_names.discard(new_name) 1321 removed_names.discard(old_name) 1322 1323 # generate config report 1324 if added_names: 1325 report += "## Added Concepts\n" 1326 for name in added_names: 1327 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1328 report += "\n" 1329 1330 if removed_names: 1331 report += "## Removed Concepts\n" 1332 for name in removed_names: 1333 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1334 report += "\n" 1335 1336 if renamed_concepts: 1337 report += "## Renamed Concepts\n" 1338 for old_name, new_name in renamed_concepts: 1339 report += ( 1340 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1341 ) 1342 report += "\n" 1343 1344 if "values_changed" in file_diff: 1345 report += "## Updated File Paths\n" 1346 for name, change in file_diff["values_changed"].items(): 1347 old_file = change["old_value"] 1348 new_file = change["new_value"] 1349 clean_name = name.split("root['")[1].split("']")[0] 1350 report += ( 1351 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1352 ) 1353 report += "\n" 1354 1355 if not ( 1356 added_names 1357 or removed_names 1358 or renamed_concepts 1359 or file_diff.get("values_changed") 1360 ): 1361 report += "No changes in concept sets.\n" 1362 1363 return report
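A sketch of comparing two minimal configurations; the concept names and file paths are hypothetical. Because the file path is unchanged while the name differs, the change is reported under "Renamed Concepts":

    from acmc import phen

    old_config = {
        "phenotype": {
            "concept_sets": [{"name": "DIABETES", "files": [{"path": "dm.csv"}]}]
        }
    }
    new_config = {
        "phenotype": {
            "concept_sets": [{"name": "DIABETES_T2", "files": [{"path": "dm.csv"}]}]
        }
    }

    print(phen.diff_config(old_config, new_config))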
def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
    old_output_files = [
        file.name
        for file in old_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]
    new_output_files = [
        file.name
        for file in new_map_path.iterdir()
        if file.is_file() and not file.name.startswith(".")
    ]

    # Convert the lists to sets for easy comparison
    old_output_set = set(old_output_files)
    new_output_set = set(new_output_files)

    # Outputs that are in old_output_set but not in new_output_set (removed files)
    removed_outputs = old_output_set - new_output_set
    # Outputs that are in new_output_set but not in old_output_set (added files)
    added_outputs = new_output_set - old_output_set
    # Outputs that are the intersection of old_output_set and new_output_set
    common_outputs = old_output_set & new_output_set

    report = f"\n# Changes to available translations\n"
    report += f"This compares the coding translation files available.\n\n"
    report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
    report += f"- Added outputs: {sorted(list(added_outputs))}\n"
    report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"

    # Compare common outputs between versions
    report += f"# Changes to concepts in translation files\n\n"
    report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
    for file in common_outputs:
        old_output = old_map_path / file
        new_output = new_map_path / file

        _logger.debug(f"Old output: {str(old_output.resolve())}")
        _logger.debug(f"New output: {str(new_output.resolve())}")

        df1 = pd.read_csv(old_output)
        df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
        df2 = pd.read_csv(new_output)
        df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()

        # Check for added and removed concepts
        report += f"- File {file}\n"
        sorted_list = sorted(list(set(df1.index) - set(df2.index)))
        report += f"- Removed concepts {sorted_list}\n"
        sorted_list = sorted(list(set(df2.index) - set(df1.index)))
        report += f"- Added concepts {sorted_list}\n"

        # Check for changed concepts
        diff = df2 - df1  # diff in counts
        diff = diff[
            (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
        ]  # get non-zero counts
        s = "\n"
        if len(diff.index) > 0:
            for concept, row in diff.iterrows():
                s += "\t - {} {}\n".format(concept, row["CONCEPT"])
            report += f"- Changed concepts {s}\n\n"
        else:
            report += f"- Changed concepts []\n\n"

    return report
1433def diff_phen( 1434 new_phen_path: Path, 1435 new_version: str, 1436 old_phen_path: Path, 1437 old_version: str, 1438 report_path: Path, 1439 not_check_config: bool, 1440): 1441 """Compare the differences between two versions of a phenotype""" 1442 1443 # write report heading 1444 report = f"# Phenotype Comparison Report\n" 1445 1446 # Step 1: check differences configuration files 1447 if not not_check_config: 1448 # validate phenotypes 1449 _logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1450 validate(str(old_phen_path.resolve())) 1451 _logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1452 validate(str(new_phen_path.resolve())) 1453 1454 # get old and new config 1455 old_config_path = old_phen_path / CONFIG_FILE 1456 with old_config_path.open("r") as file: 1457 old_config = yaml.safe_load(file) 1458 new_config_path = new_phen_path / CONFIG_FILE 1459 with new_config_path.open("r") as file: 1460 new_config = yaml.safe_load(file) 1461 1462 # write report 1463 report += f"## Original phenotype\n" 1464 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1465 report += f" - {old_version}\n" 1466 report += f" - {str(old_phen_path.resolve())}\n" 1467 report += f"## Changed phenotype:\n" 1468 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1469 report += f" - {new_version}\n" 1470 report += f" - {str(new_phen_path.resolve())}\n" 1471 1472 # Convert list of dicts into a dict: {name: file} 1473 report += diff_config(old_config, new_config) 1474 1475 # Step 2: check differences between map files 1476 # List files from output directories 1477 old_map_path = old_phen_path / MAP_DIR 1478 new_map_path = new_phen_path / MAP_DIR 1479 report += diff_map_files(old_map_path, new_map_path) 1480 1481 # initialise report file 1482 _logger.debug(f"Writing to report file {str(report_path.resolve())}") 1483 report_file = open(report_path, "w") 1484 report_file.write(report) 1485 report_file.close() 1486 1487 _logger.info(f"Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
1490def diff( 1491 phen_dir: str, 1492 version: str, 1493 old_phen_dir: str, 1494 old_version: str, 1495 not_check_config: bool, 1496): 1497 # make tmp directory .acmc 1498 timestamp = time.strftime("%Y%m%d_%H%M%S") 1499 temp_dir = Path(f".acmc/diff_{timestamp}") 1500 1501 changed_phen_path = Path(phen_dir) 1502 if not changed_phen_path.exists(): 1503 raise ValueError( 1504 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1505 ) 1506 1507 old_phen_path = Path(old_phen_dir) 1508 if not old_phen_path.exists(): 1509 raise ValueError( 1510 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1511 ) 1512 1513 # t_path = old_phen_path / "config.yml" 1514 # with t_path.open("r") as file: 1515 # c = yaml.safe_load(file) 1516 1517 try: 1518 # Create the directory 1519 temp_dir.mkdir(parents=True, exist_ok=True) 1520 _logger.debug(f"Temporary directory created: {temp_dir}") 1521 1522 # Create temporary directories 1523 changed_path = temp_dir / "changed" 1524 changed_path.mkdir(parents=True, exist_ok=True) 1525 old_path = temp_dir / "old" 1526 old_path.mkdir(parents=True, exist_ok=True) 1527 1528 # checkout changed 1529 if version == "latest": 1530 _logger.debug( 1531 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1532 ) 1533 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1534 else: 1535 _logger.debug( 1536 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1537 ) 1538 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1539 changed_repo.git.checkout(version) 1540 1541 # checkout old 1542 if old_version == "latest": 1543 _logger.debug( 1544 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1545 ) 1546 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1547 else: 1548 _logger.debug( 1549 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1550 ) 1551 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1552 old_repo.git.checkout(old_version) 1553 1554 report_filename = f"{version}_{old_version}_diff.md" 1555 report_path = changed_phen_path / report_filename 1556 # diff old with new 1557 diff_phen( 1558 changed_path, version, old_path, old_version, report_path, not_check_config 1559 ) 1560 1561 finally: 1562 # clean up tmp directory 1563 if temp_dir.exists(): 1564 shutil.rmtree(temp_dir)
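A sketch of comparing the current working copy against an older released version; the paths and the old version tag are placeholders. The report is written as <version>_<old_version>_diff.md inside the changed phenotype directory:

    from acmc import phen

    phen.diff(
        phen_dir="./workspace/phen",
        version="latest",
        old_phen_dir="./workspace/phen-old",
        old_version="v0.1.0",
        not_check_config=False,
    )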