acmc.phen
Phenotype Module
This module provides functionality for managing phenotypes.
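A minimal usage sketch of the module-level workflow, assuming the package is importable and using a hypothetical local workspace path; `init` and `validate` are documented below.

```python
from acmc import phen

# Create a new local phenotype repo with the standard directory structure
# (pass a git URL as remote_url to clone an existing repo instead).
phen.init("./workspace/phen", remote_url=None)

# Check the directory structure, config.yaml and git repo are valid.
phen.validate("./workspace/phen")
```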
1""" 2Phenotype Module 3================ 4This module provides functionality for managing phenotypes. 5""" 6 7import argparse 8import pandas as pd 9import numpy as np 10import json 11import os 12import sqlite3 13import sys 14import shutil 15import time 16import git 17import re 18import logging 19import requests 20import yaml 21import semver 22from git import Repo 23from cerberus import Validator 24from deepdiff import DeepDiff 25from pathlib import Path 26from urllib.parse import urlparse, urlunparse 27from typing import Tuple, Set, Any 28 29import acmc 30from acmc import trud, omop, parse, util 31 32# setup logging 33import acmc.logging_config as lc 34 35logger = lc.setup_logger() 36 37pd.set_option("mode.chained_assignment", None) 38 39PHEN_DIR = "phen" 40DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR 41 42CONCEPTS_DIR = "concepts" 43MAP_DIR = "map" 44CONCEPT_SET_DIR = "concept-sets" 45CSV_PATH = Path(CONCEPT_SET_DIR) / "csv" 46OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop" 47DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR] 48CONFIG_FILE = "config.yaml" 49VOCAB_VERSION_FILE = "vocab_version.yaml" 50SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"] 51DEFAULT_VERSION_INC = "patch" 52 53DEFAULT_GIT_BRANCH = "main" 54 55SPLIT_COL_ACTION = "split_col" 56CODES_COL_ACTION = "codes_col" 57DIVIDE_COL_ACTION = "divide_col" 58COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] 59 60CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"] 61SOURCE_COL_SUFFIX = "_acmc_source" 62TARGET_COL_SUFFIX = "_acmc_target" 63 64# config.yaml schema 65CONFIG_SCHEMA = { 66 "phenotype": { 67 "type": "dict", 68 "required": True, 69 "schema": { 70 "version": { 71 "type": "string", 72 "required": True, 73 "regex": r"^\d+\.\d+\.\d+$", # Enforces 'vN.N.N' format 74 }, 75 "omop": { 76 "type": "dict", 77 "required": True, 78 "schema": { 79 "vocabulary_id": {"type": "string", "required": True}, 80 "vocabulary_name": {"type": "string", "required": True}, 81 "vocabulary_reference": { 82 "type": "string", 83 "required": True, 84 "regex": r"^https?://.*", # Ensures it's a URL 85 }, 86 }, 87 }, 88 "map": { 89 "type": "list", 90 "schema": { 91 "type": "string", 92 "allowed": list( 93 parse.SUPPORTED_CODE_TYPES 94 ), # Ensure only predefined values are allowed 95 }, 96 }, 97 "concept_sets": { 98 "type": "list", 99 "required": True, 100 "schema": { 101 "type": "dict", 102 "schema": { 103 "name": {"type": "string", "required": True}, 104 "file": { 105 "type": "dict", 106 "required": False, 107 "schema": { 108 "path": {"type": "string", "required": True}, 109 "columns": {"type": "dict", "required": True}, 110 "category": { 111 "type": "string" 112 }, # Optional but must be string if present 113 "actions": { 114 "type": "dict", 115 "schema": {"divide_col": {"type": "string"}}, 116 }, 117 }, 118 }, 119 "metadata": {"type": "dict", "required": True}, 120 }, 121 }, 122 }, 123 }, 124 } 125} 126 127 128class PhenValidationException(Exception): 129 """Custom exception class raised when validation errors in phenotype configuration file""" 130 131 def __init__(self, message, validation_errors=None): 132 super().__init__(message) 133 self.validation_errors = validation_errors 134 135 136def construct_git_url(remote_url: str): 137 """Constructs a git url for github or gitlab including a PAT token environment variable""" 138 # check the url 139 parsed_url = urlparse(remote_url) 140 141 # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd 142 # need to update this function 
if the URL scheme is different. 143 if "github.com" in parsed_url.netloc: 144 # get GitHub PAT from environment variable 145 auth = os.getenv("ACMC_GITHUB_PAT") 146 if not auth: 147 raise ValueError( 148 "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." 149 ) 150 else: 151 # get GitLab PAT from environment variable 152 auth = os.getenv("ACMC_GITLAB_PAT") 153 if not auth: 154 raise ValueError( 155 "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 156 ) 157 auth = f"oauth2:{auth}" 158 159 # Construct the new URL with credentials 160 new_netloc = f"{auth}@{parsed_url.netloc}" 161 return urlunparse( 162 ( 163 parsed_url.scheme, 164 new_netloc, 165 parsed_url.path, 166 parsed_url.params, 167 parsed_url.query, 168 parsed_url.fragment, 169 ) 170 ) 171 172 173def create_empty_git_dir(path: Path): 174 """Creates a directory with a .gitkeep file so that it's tracked in git""" 175 path.mkdir(exist_ok=True) 176 keep_path = path / ".gitkeep" 177 keep_path.touch(exist_ok=True) 178 179 180def check_delete_dir(path: Path, msg: str) -> bool: 181 """Checks on the command line if a user wants to delete a directory 182 183 Args: 184 path (Path): path of the directory to be deleted 185 msg (str): message to be displayed to the user 186 187 Returns: 188 Boolean: True if deleted 189 """ 190 deleted = False 191 192 user_input = input(f"{msg}").strip().lower() 193 if user_input in ["yes", "y"]: 194 shutil.rmtree(path) 195 deleted = True 196 else: 197 logger.info("Directory was not deleted.") 198 199 return deleted 200 201 202def fork( 203 phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str = None 204): 205 """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin" 206 207 Args: 208 phen_dir (str): local directory path where the upstream repo is to be cloned 209 upstream_url (str): url to the upstream repo 210 upstream_version (str): version in the upstream repo to clone 211 new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None. 212 213 Raises: 214 ValueError: if the specified version is not in the upstream repo 215 ValueError: if the upstream repo is not a valid phenotype repo 216 ValueError: if there's any other problems with Git 217 """ 218 logger.info( 219 f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}" 220 ) 221 222 phen_path = Path(phen_dir) 223 # check if directory already exists and ask user if they want to recreate it 224 if ( 225 phen_path.exists() and phen_path.is_dir() 226 ): # Check if it exists and is a directory 227 configure = check_delete_dir( 228 phen_path, 229 f"The phen directory already exists. Do you want to reinitialise? (yes/no): ", 230 ) 231 else: 232 configure = True 233 234 if not configure: 235 logger.info(f"Exiting, phenotype not initiatised") 236 return 237 238 try: 239 # Clone repo 240 git_url = construct_git_url(upstream_url) 241 repo = git.Repo.clone_from(git_url, phen_path) 242 243 # Fetch all branches and tags 244 repo.remotes.origin.fetch() 245 246 # Check if the version exists 247 available_refs = [ref.name.split("/")[-1] for ref in repo.references] 248 if upstream_version not in available_refs: 249 raise ValueError( 250 f"Version '{upstream_version}' not found in the repository: {upstream_url}." 
251 ) 252 253 # Checkout the specified version 254 repo.git.checkout(upstream_version) 255 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 256 main_branch.checkout() 257 258 # Check if 'config.yaml' exists in the root directory 259 config_path = phen_path / "config.yaml" 260 if not os.path.isfile(config_path): 261 raise ValueError( 262 f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory." 263 ) 264 265 # Validate the phenotype is compatible with the acmc tool 266 validate(str(phen_path.resolve())) 267 268 # Delete each tag locally 269 tags = repo.tags 270 for tag in tags: 271 repo.delete_tag(tag) 272 logger.debug(f"Deleted tags from forked repo: {tag}") 273 274 # Add upstream remote 275 repo.create_remote("upstream", upstream_url) 276 remote = repo.remotes["origin"] 277 repo.delete_remote(remote) # Remove existing origin 278 279 # Optionally set a new origin remote 280 if new_origin_url: 281 git_url = construct_git_url(new_origin_url) 282 repo.create_remote("origin", git_url) 283 repo.git.push("--set-upstream", "origin", "main") 284 285 logger.info(f"Repository forked successfully at {phen_path}") 286 logger.info(f"Upstream set to {upstream_url}") 287 if new_origin_url: 288 logger.info(f"Origin set to {new_origin_url}") 289 290 except Exception as e: 291 if phen_path.exists(): 292 shutil.rmtree(phen_path) 293 raise ValueError(f"Error occurred during repository fork: {str(e)}") 294 295 296def init(phen_dir: str, remote_url: str): 297 """Initial phenotype directory as git repo with standard structure""" 298 logger.info(f"Initialising Phenotype in directory: {phen_dir}") 299 phen_path = Path(phen_dir) 300 301 # check if directory already exists and ask user if they want to recreate it 302 if ( 303 phen_path.exists() and phen_path.is_dir() 304 ): # Check if it exists and is a directory 305 configure = check_delete_dir( 306 phen_path, 307 f"The phen directory already exists. Do you want to reinitialise? 
(yes/no): ", 308 ) 309 else: 310 configure = True 311 312 if not configure: 313 logger.info(f"Exiting, phenotype not initiatised") 314 return 315 316 # Initialise repo from local or remote 317 repo: Repo 318 # if remote then clone the repo otherwise init a local repo 319 if remote_url != None: 320 # add PAT token to the URL 321 git_url = construct_git_url(remote_url) 322 323 # clone the repo 324 repo = git.cmd.Git() 325 repo.clone(git_url, phen_path) 326 327 # open repo 328 repo = Repo(phen_path) 329 # check if there are any commits (new repo has no commits) 330 if ( 331 len(repo.branches) == 0 or repo.head.is_detached 332 ): # Handle detached HEAD (e.g., after init) 333 logger.debug("The phen repository has no commits yet.") 334 commit_count = 0 335 else: 336 # Get the total number of commits in the default branch 337 commit_count = sum(1 for _ in repo.iter_commits()) 338 logger.debug(f"Repo has previous commits: {commit_count}") 339 else: 340 # local repo, create the directories and init 341 phen_path.mkdir(parents=True, exist_ok=True) 342 logger.debug(f"Phen directory '{phen_path}' has been created.") 343 repo = git.Repo.init(phen_path) 344 commit_count = 0 345 346 phen_path = phen_path.resolve() 347 # initialise empty repos 348 if commit_count == 0: 349 # create initial commit 350 initial_file_path = phen_path / "README.md" 351 with open(initial_file_path, "w") as file: 352 file.write( 353 "# Initial commit\nThis is the first commit in the phen repository.\n" 354 ) 355 repo.index.add([initial_file_path]) 356 repo.index.commit("Initial commit") 357 commit_count = 1 358 359 # Checkout the phens default branch, creating it if it does not exist 360 if DEFAULT_GIT_BRANCH in repo.branches: 361 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 362 main_branch.checkout() 363 else: 364 main_branch = repo.create_head(DEFAULT_GIT_BRANCH) 365 main_branch.checkout() 366 367 # if the phen path does not contain the config file then initialise the phen type 368 config_path = phen_path / CONFIG_FILE 369 if config_path.exists(): 370 logger.debug(f"Phenotype configuration files already exist") 371 return 372 373 logger.info("Creating phen directory structure and config files") 374 for d in DEFAULT_PHEN_DIR_LIST: 375 create_empty_git_dir(phen_path / d) 376 377 # create empty phen config file 378 config = { 379 "phenotype": { 380 "version": "0.0.0", 381 "omop": { 382 "vocabulary_id": "", 383 "vocabulary_name": "", 384 "vocabulary_reference": "", 385 }, 386 "translate": [], 387 "concept_sets": [], 388 } 389 } 390 391 with open(phen_path / CONFIG_FILE, "w") as file: 392 yaml.dump( 393 config, 394 file, 395 Dumper=util.QuotedDumper, 396 default_flow_style=False, 397 sort_keys=False, 398 default_style='"', 399 ) 400 401 # add git ignore 402 ignore_content = """# Ignore SQLite database files 403*.db 404*.sqlite3 405 406# Ignore SQLite journal and metadata files 407*.db-journal 408*.sqlite3-journal 409 410# python 411.ipynb_checkpoints 412 """ 413 ignore_path = phen_path / ".gitignore" 414 with open(ignore_path, "w") as file: 415 file.write(ignore_content) 416 417 # add to git repo and commit 418 for d in DEFAULT_PHEN_DIR_LIST: 419 repo.git.add(phen_path / d) 420 repo.git.add(all=True) 421 repo.index.commit("initialised the phen git repo.") 422 423 logger.info(f"Phenotype initialised successfully") 424 425 426def validate(phen_dir: str): 427 """Validates the phenotype directory is a git repo with standard structure""" 428 logger.info(f"Validating phenotype: {phen_dir}") 429 phen_path = Path(phen_dir) 430 if not 
phen_path.is_dir(): 431 raise NotADirectoryError( 432 f"Error: '{str(phen_path.resolve())}' is not a directory" 433 ) 434 435 config_path = phen_path / CONFIG_FILE 436 if not config_path.is_file(): 437 raise FileNotFoundError( 438 f"Error: phen configuration file '{config_path}' does not exist." 439 ) 440 441 concepts_path = phen_path / CONCEPTS_DIR 442 if not concepts_path.is_dir(): 443 raise FileNotFoundError( 444 f"Error: source concepts directory {concepts_path} does not exist." 445 ) 446 447 # Calidate the directory is a git repo 448 try: 449 git.Repo(phen_path) 450 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 451 raise Exception(f"Phen directory {phen_path} is not a git repo") 452 453 # Load configuration File 454 if config_path.suffix == ".yaml": 455 try: 456 with config_path.open("r") as file: 457 phenotype = yaml.safe_load(file) 458 459 validator = Validator(CONFIG_SCHEMA) 460 if validator.validate(phenotype): 461 logger.debug("YAML structure is valid.") 462 else: 463 logger.error(f"YAML structure validation failed: {validator.errors}") 464 raise Exception(f"YAML structure validation failed: {validator.errors}") 465 except yaml.YAMLError as e: 466 logger.error(f"YAML syntax error: {e}") 467 raise e 468 else: 469 raise Exception( 470 f"Unsupported configuration filetype: {str(config_path.resolve())}" 471 ) 472 473 # initiatise 474 validation_errors = [] 475 phenotype = phenotype["phenotype"] 476 code_types = parse.CodeTypeParser().code_types 477 478 # check the version number is of the format vn.n.n 479 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 480 if not match: 481 validation_errors.append( 482 f"Invalid version format in configuration file: {phenotype['version']}" 483 ) 484 485 # create a list of all the concept set names defined in the concept set configuration 486 concept_set_names = [] 487 for item in phenotype["concept_sets"]: 488 if item["name"] in concept_set_names: 489 validation_errors.append( 490 f"Duplicate concept set defined in concept sets {item['name'] }" 491 ) 492 else: 493 concept_set_names.append(item["name"]) 494 495 # check codes definition 496 for item in phenotype["concept_sets"]: 497 # check concepte code file exists 498 concept_code_file_path = concepts_path / item["file"]["path"] 499 if not concept_code_file_path.exists(): 500 validation_errors.append( 501 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 502 ) 503 504 # check concepte code file is not empty 505 if concept_code_file_path.stat().st_size == 0: 506 validation_errors.append( 507 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 508 ) 509 510 # check code file type is supported 511 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 512 raise ValueError( 513 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 514 ) 515 516 # check columns specified are a supported medical coding type 517 for column in item["file"]["columns"]: 518 if column not in code_types: 519 validation_errors.append( 520 f"Column type {column} for file {concept_code_file_path} is not supported" 521 ) 522 523 # check the actions are supported 524 if "actions" in item["file"]: 525 for action in item["file"]["actions"]: 526 if action not in COL_ACTIONS: 527 validation_errors.append(f"Action {action} is not supported") 528 529 if len(validation_errors) > 0: 530 logger.error(validation_errors) 531 raise PhenValidationException( 532 f"Configuration file {str(config_path.resolve())} failed 
validation", 533 validation_errors, 534 ) 535 536 logger.info(f"Phenotype validated successfully") 537 538 539def read_table_file(path: Path, excel_sheet: str = ""): 540 """ 541 Load Code List File 542 """ 543 544 path = path.resolve() 545 if path.suffix == ".csv": 546 df = pd.read_csv(path, dtype=str) 547 elif path.suffix == ".xlsx" or path.suffix == ".xls": 548 if excel_sheet != "": 549 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str) 550 else: 551 df = pd.read_excel(path, dtype=str) 552 elif path.suffix == ".dta": 553 df = pd.read_stata(path) 554 else: 555 raise ValueError( 556 f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types" 557 ) 558 559 return df 560 561 562def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame: 563 # Perform Structural Changes to file before preprocessing 564 logger.debug("Processing file structural actions") 565 if ( 566 "actions" in concept_set["file"] 567 and "split_col" in concept_set["file"]["actions"] 568 and "codes_col" in concept_set["file"]["actions"] 569 ): 570 split_col = concept_set["file"]["actions"]["split_col"] 571 codes_col = concept_set["file"]["actions"]["codes_col"] 572 logger.debug( 573 "Action: Splitting", 574 split_col, 575 "column into:", 576 df[split_col].unique(), 577 ) 578 codes = df[codes_col] 579 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode 580 oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes 581 oh[oh == False] = np.nan # replace 0s with None 582 df = pd.concat([df, oh], axis=1) # merge in new columns 583 584 return df 585 586 587# Perform QA Checks on columns individually and append to df 588def preprocess_source_concepts( 589 df: pd.DataFrame, concept_set: dict, code_file_path: Path 590) -> Tuple[pd.DataFrame, list]: 591 """Parses each column individually - Order and length will not be preserved!""" 592 out = pd.DataFrame([]) # create output df to append to 593 code_errors = [] # list of errors from processing 594 595 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 596 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 597 598 # Preprocess codes 599 code_types = parse.CodeTypeParser().code_types 600 for code_type in concept_set["file"]["columns"]: 601 parser = code_types[code_type] 602 logger.info(f"Processing {code_type} codes for {code_file_path}") 603 604 # get codes by column name 605 source_col_name = concept_set["file"]["columns"][code_type] 606 codes = df[source_col_name].dropna() 607 codes = codes.astype(str) # convert to string 608 codes = codes.str.strip() # remove excess spaces 609 610 # process codes, validating them using parser and returning the errors 611 codes, errors = parser.process(codes, code_file_path) 612 if len(errors) > 0: 613 code_errors.extend(errors) 614 logger.warning(f"Codes validation failed with {len(errors)} errors") 615 616 # add processed codes to df 617 new_col_name = f"{source_col_name}_SOURCE" 618 df = df.rename(columns={source_col_name: new_col_name}) 619 process_codes = pd.DataFrame({code_type: codes}).join(df) 620 out = pd.concat( 621 [out, process_codes], 622 ignore_index=True, 623 ) 624 625 logger.debug(out.head()) 626 627 return out, code_errors 628 629 630def get_code_type_from_col_name(col_name: str): 631 return col_name.split("_")[0] 632 633 634# Translate Df with multiple codes into single code type Series 635def translate_codes( 636 source_df: pd.DataFrame, target_code_type: str, concept_name: str 637) -> pd.DataFrame: 638 
"""Translates each source code type the source coding list into a target type and returns all conversions as a concept set""" 639 640 # codes = pd.DataFrame([], dtype=str) 641 codes = pd.DataFrame( 642 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string" 643 ) 644 # Convert codes to target type 645 logger.info(f"Converting to target code type {target_code_type}") 646 647 for source_code_type in source_df.columns: 648 649 # if target code type is the same as thet source code type, no translation, just appending source as target 650 if source_code_type == target_code_type: 651 copy_df = pd.DataFrame( 652 { 653 "SOURCE_CONCEPT": source_df[source_code_type], 654 "SOURCE_CONCEPT_TYPE": source_code_type, 655 "CONCEPT": source_df[source_code_type], 656 } 657 ) 658 codes = pd.concat([codes, copy_df]) 659 logger.debug( 660 f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" 661 ) 662 else: 663 # get the translation filename using source to target code types 664 filename = f"{source_code_type}_to_{target_code_type}.parquet" 665 map_path = trud.PROCESSED_PATH / filename 666 667 # do the mapping if it exists 668 if map_path.exists(): 669 # get mapping 670 df_map = pd.read_parquet(map_path) 671 672 # do mapping 673 translated_df = pd.merge( 674 source_df[source_code_type], df_map, how="left" 675 ) 676 677 # normalise the output 678 translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"] 679 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type 680 681 # add to list of codes 682 codes = pd.concat([codes, translated_df]) 683 684 else: 685 logger.warning( 686 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" 687 ) 688 689 codes = codes.dropna() # delete NaNs 690 691 # added concept set type to output if any translations 692 if len(codes.index) > 0: 693 codes["CONCEPT_SET"] = concept_name 694 else: 695 logger.debug(f"No codes converted with target code type {target_code_type}") 696 697 return codes 698 699 700def sql_row_exist( 701 conn: sqlite3.Connection, table: str, column: str, value: str 702) -> bool: 703 # Execute and check if a result exists 704 cur = conn.cursor() 705 query = f"SELECT 1 FROM {table} WHERE {column} = ? 
LIMIT 1;" 706 cur.execute(query, (value,)) 707 exists = cur.fetchone() is not None 708 709 return exists 710 711 712def write_code_errors(code_errors: list, code_errors_path: Path): 713 err_df = pd.DataFrame( 714 [ 715 { 716 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 717 "VOCABULARY": err.code_type, 718 "SOURCE": err.codes_file, 719 "CAUSE": err.message, 720 } 721 for err in code_errors 722 ] 723 ) 724 725 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 726 err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 727 err_df.to_csv(code_errors_path, index=False, mode="w") 728 729 730def write_vocab_version(phen_path: Path): 731 # write the vocab version files 732 733 if not trud.VERSION_PATH.exists(): 734 raise FileNotFoundError( 735 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 736 ) 737 738 if not omop.VERSION_PATH.exists(): 739 raise FileNotFoundError( 740 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 741 ) 742 743 with trud.VERSION_PATH.open("r") as file: 744 trud_version = yaml.safe_load(file) 745 746 with omop.VERSION_PATH.open("r") as file: 747 omop_version = yaml.safe_load(file) 748 749 # Create the combined YAML structure 750 version_data = { 751 "versions": { 752 "acmc": acmc.__version__, 753 "trud": trud_version, 754 "omop": omop_version, 755 } 756 } 757 758 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 759 yaml.dump( 760 version_data, 761 file, 762 Dumper=util.QuotedDumper, 763 default_flow_style=False, 764 sort_keys=False, 765 default_style='"', 766 ) 767 768 769def map(phen_dir: str, target_code_type: str): 770 logger.info(f"Processing phenotype: {phen_dir}") 771 772 # Validate configuration 773 validate(phen_dir) 774 775 # initialise paths 776 phen_path = Path(phen_dir) 777 config_path = phen_path / CONFIG_FILE 778 779 # load configuration 780 with config_path.open("r") as file: 781 config = yaml.safe_load(file) 782 phenotype = config["phenotype"] 783 784 if len(phenotype["map"]) == 0: 785 raise ValueError(f"No map codes defined in the phenotype configuration") 786 787 if target_code_type is not None and target_code_type not in phenotype["map"]: 788 raise ValueError( 789 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 790 ) 791 792 if target_code_type is not None: 793 map_target_code_type(phen_path, phenotype, target_code_type) 794 else: 795 for t in phenotype["map"]: 796 map_target_code_type(phen_path, phenotype, t) 797 798 logger.info(f"Phenotype processed successfully") 799 800 801def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 802 logger.debug(f"Target coding format: {target_code_type}") 803 concepts_path = phen_path / CONCEPTS_DIR 804 # Create output dataframe 805 out = pd.DataFrame([]) 806 code_errors = [] 807 808 # Process each folder in codes section 809 for concept_set in phenotype["concept_sets"]: 810 logger.debug(f"--- {concept_set['file']} ---") 811 812 # Load code file 813 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 814 df = read_table_file(codes_file_path) 815 816 # process structural actions 817 df = process_actions(df, concept_set) 818 819 # preprocessing and validate of source concepts 820 logger.debug("Processing and validating source concept codes") 821 df, errors = preprocess_source_concepts( 822 df, 823 concept_set, 824 codes_file_path, 825 ) 826 827 # create df with just the source code columns 828 source_column_names 
= list(concept_set["file"]["columns"].keys()) 829 source_df = df[source_column_names] 830 831 logger.debug(source_df.columns) 832 logger.debug(source_df.head()) 833 834 logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}") 835 if len(errors) > 0: 836 code_errors.extend(errors) 837 logger.debug(f" Length of code_errors {len(code_errors)}") 838 839 # Map source concepts codes to target codes 840 # if processing a source coding list with categorical data 841 if ( 842 "actions" in concept_set["file"] 843 and "divide_col" in concept_set["file"]["actions"] 844 and len(df) > 0 845 ): 846 divide_col = concept_set["file"]["actions"]["divide_col"] 847 logger.debug(f"Action: Dividing Table by {divide_col}") 848 logger.debug(f"column into: {df[divide_col].unique()}") 849 df_grp = df.groupby(divide_col) 850 for cat, grp in df_grp: 851 if cat == concept_set["file"]["category"]: 852 grp = grp.drop(columns=[divide_col]) # delete categorical column 853 source_df = grp[source_column_names] 854 trans_out = translate_codes( 855 source_df, 856 target_code_type=target_code_type, 857 concept_name=concept_set["name"], 858 ) 859 out = pd.concat([out, trans_out]) 860 else: 861 source_df = df[source_column_names] 862 trans_out = translate_codes( 863 source_df, 864 target_code_type=target_code_type, 865 concept_name=concept_set["name"], 866 ) 867 out = pd.concat([out, trans_out]) 868 869 if len(code_errors) > 0: 870 logger.error(f"The map processing has {len(code_errors)} errors") 871 error_path = phen_path / MAP_DIR / "errors" 872 error_path.mkdir(parents=True, exist_ok=True) 873 error_filename = f"{target_code_type}-code-errors.csv" 874 write_code_errors(code_errors, error_path / error_filename) 875 876 # Check there is output from processing 877 if len(out.index) == 0: 878 logger.error(f"No output after map processing") 879 raise Exception( 880 f"No output after map processing, check config {str(phen_path.resolve())}" 881 ) 882 883 # final processing 884 out = out.reset_index(drop=True) 885 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 886 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 887 888 out_count = len(out.index) 889 # added metadata 890 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 891 result_list = [] 892 source_column_names = list(concept_set["file"]["columns"].keys()) 893 for source_concept_type in source_column_names: 894 895 # Filter output based on the current source_concept_type 896 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 897 filtered_count = len(out_filtered_df.index) 898 899 # Remove the source type columns except the current type will leave the metadata and the join 900 remove_types = [ 901 type for type in source_column_names if type != source_concept_type 902 ] 903 metadata_df = df.drop(columns=remove_types) 904 metadata_df = metadata_df.rename( 905 columns={source_concept_type: "SOURCE_CONCEPT"} 906 ) 907 metadata_df_count = len(metadata_df.index) 908 909 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 910 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 911 result_count = len(result.index) 912 913 logger.debug( 914 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 915 ) 916 917 # Append the result to the result_list 918 result_list.append(result) 919 920 # Concatenate all the 
results into a single DataFrame 921 final_out = pd.concat(result_list, ignore_index=True) 922 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 923 logger.debug( 924 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 925 ) 926 927 # Save output to map directory 928 output_filename = target_code_type + ".csv" 929 map_path = phen_path / MAP_DIR / output_filename 930 final_out.to_csv(map_path, index=False) 931 logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 932 933 # save concept sets as separate files 934 concept_set_path = phen_path / CSV_PATH / target_code_type 935 936 # empty the concept-set directory except for hiddle files, e.g. .git 937 if concept_set_path.exists(): 938 for item in concept_set_path.iterdir(): 939 if not item.name.startswith("."): 940 item.unlink() 941 else: 942 concept_set_path.mkdir(parents=True, exist_ok=True) 943 944 # write each concept as a separate file 945 for name, concept in final_out.groupby("CONCEPT_SET"): 946 concept = concept.sort_values(by="CONCEPT") # sort rows 947 concept = concept.dropna(how="all", axis=1) # remove empty cols 948 concept = concept.reindex( 949 sorted(concept.columns), axis=1 950 ) # sort cols alphabetically 951 filename = f"{name}.csv" 952 concept_path = concept_set_path / filename 953 concept.to_csv(concept_path, index=False) 954 955 write_vocab_version(phen_path) 956 957 logger.info(f"Phenotype processed target code type {target_code_type}") 958 959 960def generate_version_tag( 961 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 962) -> str: 963 # Get all valid semantic version tags 964 versions = [] 965 for tag in repo.tags: 966 tag_name = ( 967 tag.name.lstrip("v") if use_v_prefix else tag.name 968 ) # Remove 'v' if needed 969 if semver.Version.is_valid(tag_name): 970 versions.append(semver.Version.parse(tag_name)) 971 972 # Determine the next version 973 if not versions: 974 new_version = semver.Version(0, 0, 1) 975 else: 976 latest_version = max(versions) 977 if increment == "major": 978 new_version = latest_version.bump_major() 979 elif increment == "minor": 980 new_version = latest_version.bump_minor() 981 else: 982 new_version = latest_version.bump_patch() 983 984 # Create the new tag 985 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 986 987 return new_version_str 988 989 990def publish( 991 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 992): 993 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 994 995 # Validate config 996 validate(phen_dir) 997 phen_path = Path(phen_dir) 998 999 # load git repo and set the branch 1000 repo = git.Repo(phen_path) 1001 if DEFAULT_GIT_BRANCH in repo.branches: 1002 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1003 main_branch.checkout() 1004 else: 1005 raise AttributeError( 1006 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1007 ) 1008 1009 # check if any changes to publish 1010 if not repo.is_dirty() and not repo.untracked_files: 1011 if remote_url is not None and "origin" not in repo.remotes: 1012 logger.info(f"First publish to remote url {remote_url}") 1013 else: 1014 logger.info("Nothing to publish, no changes to the repo") 1015 return 1016 1017 # get next version 1018 new_version_str = generate_version_tag(repo, increment) 1019 logger.info(f"New version: {new_version_str}") 1020 1021 # Write version in configuration file 1022 config_path = phen_path / 
CONFIG_FILE 1023 with config_path.open("r") as file: 1024 config = yaml.safe_load(file) 1025 1026 config["phenotype"]["version"] = new_version_str 1027 with open(config_path, "w") as file: 1028 yaml.dump( 1029 config, 1030 file, 1031 Dumper=util.QuotedDumper, 1032 default_flow_style=False, 1033 sort_keys=False, 1034 default_style='"', 1035 ) 1036 1037 # Add and commit changes to repo including version updates 1038 commit_message = f"Committing updates to phenotype {phen_path}" 1039 repo.git.add("--all") 1040 repo.index.commit(commit_message) 1041 1042 # Add tag to the repo 1043 repo.create_tag(new_version_str) 1044 1045 # push to origin if a remote repo 1046 if remote_url is not None and "origin" not in repo.remotes: 1047 git_url = construct_git_url(remote_url) 1048 repo.create_remote("origin", git_url) 1049 1050 try: 1051 if "origin" in repo.remotes: 1052 logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1053 origin = repo.remotes.origin 1054 logger.info(f"Pushing main branch to remote repo") 1055 repo.git.push("--set-upstream", "origin", "main") 1056 logger.info(f"Pushing version tags to remote git repo") 1057 origin.push(tags=True) 1058 logger.debug("Changes pushed to 'origin'") 1059 else: 1060 logger.debug("Remote 'origin' is not set") 1061 except Exception as e: 1062 tag_ref = repo.tags[new_version_str] 1063 repo.delete_tag(tag_ref) 1064 repo.git.reset("--soft", "HEAD~1") 1065 raise e 1066 1067 logger.info(f"Phenotype published successfully") 1068 1069 1070def export(phen_dir: str, version: str): 1071 """Exports a phen repo at a specific tagged version into a target directory""" 1072 logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1073 1074 # validate configuration 1075 validate(phen_dir) 1076 phen_path = Path(phen_dir) 1077 1078 # load configuration 1079 config_path = phen_path / CONFIG_FILE 1080 with config_path.open("r") as file: 1081 config = yaml.safe_load(file) 1082 1083 map_path = phen_path / MAP_DIR 1084 if not map_path.exists(): 1085 logger.warning(f"Map path does not exist '{map_path}'") 1086 1087 export_path = phen_path / OMOP_PATH 1088 # check export directory exists and if not create it 1089 if not export_path.exists(): 1090 export_path.mkdir(parents=True) 1091 logger.debug(f"OMOP export directory '{export_path}' created.") 1092 1093 # omop export db 1094 export_db_path = omop.export( 1095 map_path, 1096 export_path, 1097 config["phenotype"]["version"], 1098 config["phenotype"]["omop"], 1099 ) 1100 1101 # write to tables 1102 # export as csv 1103 logger.info(f"Phenotype exported successfully") 1104 1105 1106def copy(phen_dir: str, target_dir: str, version: str): 1107 """Copys a phen repo at a specific tagged version into a target directory""" 1108 1109 # Validate 1110 validate(phen_dir) 1111 phen_path = Path(phen_dir) 1112 1113 # Check target directory exists 1114 target_path = Path(target_dir) 1115 if not target_path.exists(): 1116 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1117 1118 # Set copy directory 1119 copy_path = target_path / version 1120 logger.info(f"Copying repo {phen_path} to {copy_path}") 1121 1122 if ( 1123 copy_path.exists() and copy_path.is_dir() 1124 ): # Check if it exists and is a directory 1125 copy = check_delete_dir( 1126 copy_path, 1127 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? 
(yes/no): ", 1128 ) 1129 else: 1130 copy = True 1131 1132 if not copy: 1133 logger.info(f"Not copying the version {version}") 1134 return 1135 1136 logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1137 repo = git.Repo.clone_from(phen_path, copy_path) 1138 1139 # Check out the latest commit or specified version 1140 if version: 1141 # Checkout a specific version (e.g., branch, tag, or commit hash) 1142 logger.info(f"Checking out version {version}...") 1143 repo.git.checkout(version) 1144 else: 1145 # Checkout the latest commit (HEAD) 1146 logger.info(f"Checking out the latest commit...") 1147 repo.git.checkout("HEAD") 1148 1149 logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1150 1151 logger.info(f"Phenotype copied successfully") 1152 1153 1154# Convert concept_sets list into dictionaries 1155def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1156 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1157 concepts_dict = { 1158 item["name"]: item["file"]["path"] 1159 for item in config_data["phenotype"]["concept_sets"] 1160 } 1161 name_set = set(concepts_dict.keys()) 1162 return concepts_dict, name_set 1163 1164 1165def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1166 """ 1167 Extracts clean keys from a DeepDiff dictionary. 1168 1169 :param diff: DeepDiff result dictionary 1170 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1171 :return: A set of clean key names 1172 """ 1173 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1174 1175 1176def diff_config(old_config: dict, new_config: dict) -> str: 1177 report = f"\n# Changes to phenotype configuration\n" 1178 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1179 1180 old_concepts, old_names = extract_concepts(old_config) 1181 new_concepts, new_names = extract_concepts(new_config) 1182 1183 # Check added and removed names 1184 added_names = new_names - old_names # Names that appear in new but not in old 1185 removed_names = old_names - new_names # Names that were in old but not in new 1186 1187 # find file path changes for unchanged names 1188 unchanged_names = old_names & new_names # Names that exist in both 1189 file_diff = DeepDiff( 1190 {name: old_concepts[name] for name in unchanged_names}, 1191 {name: new_concepts[name] for name in unchanged_names}, 1192 ) 1193 1194 # Find renamed concepts (same file, different name) 1195 renamed_concepts = [] 1196 for removed in removed_names: 1197 old_path = old_concepts[removed] 1198 for added in added_names: 1199 new_path = new_concepts[added] 1200 if old_path == new_path: 1201 renamed_concepts.append((removed, added)) 1202 1203 # Remove renamed concepts from added and removed sets 1204 for old_name, new_name in renamed_concepts: 1205 added_names.discard(new_name) 1206 removed_names.discard(old_name) 1207 1208 # generate config report 1209 if added_names: 1210 report += "## Added Concepts\n" 1211 for name in added_names: 1212 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1213 report += "\n" 1214 1215 if removed_names: 1216 report += "## Removed Concepts\n" 1217 for name in removed_names: 1218 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1219 report += "\n" 1220 1221 if renamed_concepts: 1222 report += "## Renamed Concepts\n" 1223 for old_name, new_name in 
renamed_concepts: 1224 report += ( 1225 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1226 ) 1227 report += "\n" 1228 1229 if "values_changed" in file_diff: 1230 report += "## Updated File Paths\n" 1231 for name, change in file_diff["values_changed"].items(): 1232 old_file = change["old_value"] 1233 new_file = change["new_value"] 1234 clean_name = name.split("root['")[1].split("']")[0] 1235 report += ( 1236 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1237 ) 1238 report += "\n" 1239 1240 if not ( 1241 added_names 1242 or removed_names 1243 or renamed_concepts 1244 or file_diff.get("values_changed") 1245 ): 1246 report += "No changes in concept sets.\n" 1247 1248 return report 1249 1250 1251def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1252 old_output_files = [ 1253 file.name 1254 for file in old_map_path.iterdir() 1255 if file.is_file() and not file.name.startswith(".") 1256 ] 1257 new_output_files = [ 1258 file.name 1259 for file in new_map_path.iterdir() 1260 if file.is_file() and not file.name.startswith(".") 1261 ] 1262 1263 # Convert the lists to sets for easy comparison 1264 old_output_set = set(old_output_files) 1265 new_output_set = set(new_output_files) 1266 1267 # Outputs that are in old_output_set but not in new_output_set (removed files) 1268 removed_outputs = old_output_set - new_output_set 1269 # Outputs that are in new_output_set but not in old_output_set (added files) 1270 added_outputs = new_output_set - old_output_set 1271 # Outputs that are the intersection of old_output_set and new_output_set 1272 common_outputs = old_output_set & new_output_set 1273 1274 report = f"\n# Changes to available translations\n" 1275 report += f"This compares the coding translations files available.\n\n" 1276 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1277 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1278 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1279 1280 # Step N: Compare common outputs between versions 1281 report += f"# Changes to concepts in translation files\n\n" 1282 report += f"This compares the added and removed concepts in each of the coding translation files. 
Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1283 for file in common_outputs: 1284 old_output = old_map_path / file 1285 new_output = new_map_path / file 1286 1287 logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1288 logger.debug(f"New ouptput: {str(new_output.resolve())}") 1289 1290 df1 = pd.read_csv(old_output) 1291 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1292 df2 = pd.read_csv(new_output) 1293 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1294 1295 # Check for added and removed concepts 1296 report += f"- File {file}\n" 1297 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1298 report += f"- Removed concepts {sorted_list}\n" 1299 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1300 report += f"- Added concepts {sorted_list}\n" 1301 1302 # Check for changed concepts 1303 diff = df2 - df1 # diff in counts 1304 diff = diff[ 1305 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1306 ] # get non-zero counts 1307 s = "\n" 1308 if len(diff.index) > 0: 1309 for concept, row in diff.iterrows(): 1310 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1311 report += f"- Changed concepts {s}\n\n" 1312 else: 1313 report += f"- Changed concepts []\n\n" 1314 1315 return report 1316 1317 1318def diff_phen( 1319 new_phen_path: Path, 1320 new_version: str, 1321 old_phen_path: Path, 1322 old_version: str, 1323 report_path: Path, 1324): 1325 """Compare the differences between two versions of a phenotype""" 1326 1327 # validate phenotypes 1328 logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1329 validate(str(old_phen_path.resolve())) 1330 logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1331 validate(str(new_phen_path.resolve())) 1332 1333 # get old and new config 1334 old_config_path = old_phen_path / CONFIG_FILE 1335 with old_config_path.open("r") as file: 1336 old_config = yaml.safe_load(file) 1337 new_config_path = new_phen_path / CONFIG_FILE 1338 with new_config_path.open("r") as file: 1339 new_config = yaml.safe_load(file) 1340 1341 # write report heading 1342 report = f"# Phenotype Comparison Report\n" 1343 report += f"## Original phenotype\n" 1344 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1345 report += f" - {old_version}\n" 1346 report += f" - {str(old_phen_path.resolve())}\n" 1347 report += f"## Changed phenotype:\n" 1348 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1349 report += f" - {new_version}\n" 1350 report += f" - {str(new_phen_path.resolve())}\n" 1351 1352 # Step 1: check differences configuration files 1353 # Convert list of dicts into a dict: {name: file} 1354 report += diff_config(old_config, new_config) 1355 1356 # Step 2: check differences between map files 1357 # List files from output directories 1358 old_map_path = old_phen_path / MAP_DIR 1359 new_map_path = new_phen_path / MAP_DIR 1360 report += diff_map_files(old_map_path, new_map_path) 1361 1362 # initialise report file 1363 logger.debug(f"Writing to report file {str(report_path.resolve())}") 1364 report_file = open(report_path, "w") 1365 report_file.write(report) 1366 report_file.close() 1367 1368 logger.info(f"Phenotypes diff'd successfully") 1369 1370 1371def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1372 # make tmp directory .acmc 1373 timestamp = time.strftime("%Y%m%d_%H%M%S") 1374 temp_dir = 
Path(f".acmc/diff_{timestamp}") 1375 1376 changed_phen_path = Path(phen_dir) 1377 if not changed_phen_path.exists(): 1378 raise ValueError( 1379 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1380 ) 1381 1382 old_phen_path = Path(old_phen_dir) 1383 if not old_phen_path.exists(): 1384 raise ValueError( 1385 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1386 ) 1387 1388 try: 1389 # Create the directory 1390 temp_dir.mkdir(parents=True, exist_ok=True) 1391 logger.debug(f"Temporary directory created: {temp_dir}") 1392 1393 # Create temporary directories 1394 changed_path = temp_dir / "changed" 1395 changed_path.mkdir(parents=True, exist_ok=True) 1396 old_path = temp_dir / "old" 1397 old_path.mkdir(parents=True, exist_ok=True) 1398 1399 # checkout changed 1400 if version == "latest": 1401 logger.debug( 1402 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1403 ) 1404 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1405 else: 1406 logger.debug( 1407 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1408 ) 1409 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1410 changed_repo.git.checkout(version) 1411 1412 # checkout old 1413 if old_version == "latest": 1414 logger.debug( 1415 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1416 ) 1417 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1418 else: 1419 logger.debug( 1420 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1421 ) 1422 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1423 old_repo.git.checkout(old_version) 1424 1425 report_filename = f"{version}_{old_version}_diff.md" 1426 report_path = changed_phen_path / report_filename 1427 # diff old with new 1428 diff_phen(changed_path, version, old_path, old_version, report_path) 1429 1430 finally: 1431 # clean up tmp directory 1432 if temp_dir.exists(): 1433 shutil.rmtree(temp_dir) 1434 print(f"Temporary directory removed: {temp_dir}")
```python
class PhenValidationException(Exception):
    """Custom exception raised when validation errors occur in the phenotype configuration file"""

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors
```
Custom exception raised when validation errors occur in the phenotype configuration file.
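For example, a caller might catch the exception around `validate` and inspect the individual errors; this is a sketch using a hypothetical workspace path.

```python
try:
    validate("./workspace/phen")
except PhenValidationException as e:
    # validation_errors holds the list of individual problems found
    for error in e.validation_errors or []:
        print(error)
```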
```python
def construct_git_url(remote_url: str):
    """Constructs a git URL for GitHub or GitLab, including a PAT token read from an environment variable"""
    # check the url
    parsed_url = urlparse(remote_url)

    # if github is in the URL, otherwise assume it's gitlab; to support others such as
    # codeberg we'd need to update this function if the URL scheme is different
    if "github.com" in parsed_url.netloc:
        # get GitHub PAT from environment variable
        auth = os.getenv("ACMC_GITHUB_PAT")
        if not auth:
            raise ValueError(
                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
            )
    else:
        # get GitLab PAT from environment variable
        auth = os.getenv("ACMC_GITLAB_PAT")
        if not auth:
            raise ValueError(
                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
            )
        auth = f"oauth2:{auth}"

    # Construct the new URL with credentials
    new_netloc = f"{auth}@{parsed_url.netloc}"
    return urlunparse(
        (
            parsed_url.scheme,
            new_netloc,
            parsed_url.path,
            parsed_url.params,
            parsed_url.query,
            parsed_url.fragment,
        )
    )
```
Constructs a git URL for GitHub or GitLab, including a PAT token read from an environment variable.
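For example, with a GitHub PAT exported in the environment the helper embeds the credential into the remote URL (hypothetical token and repository):

```python
# export ACMC_GITHUB_PAT=<token> in the shell before running
url = construct_git_url("https://github.com/example-org/example-phen.git")
# -> "https://<token>@github.com/example-org/example-phen.git"
```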
```python
def create_empty_git_dir(path: Path):
    """Creates a directory with a .gitkeep file so that it's tracked in git"""
    path.mkdir(exist_ok=True)
    keep_path = path / ".gitkeep"
    keep_path.touch(exist_ok=True)
```
Creates a directory with a .gitkeep file so that it's tracked in git
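A small illustration with a hypothetical path:

```python
from pathlib import Path

create_empty_git_dir(Path("./workspace/phen/concepts"))
# creates ./workspace/phen/concepts/.gitkeep so the empty directory is tracked in git
```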
```python
def check_delete_dir(path: Path, msg: str) -> bool:
    """Checks on the command line if a user wants to delete a directory

    Args:
        path (Path): path of the directory to be deleted
        msg (str): message to be displayed to the user

    Returns:
        Boolean: True if deleted
    """
    deleted = False

    user_input = input(f"{msg}").strip().lower()
    if user_input in ["yes", "y"]:
        shutil.rmtree(path)
        deleted = True
    else:
        logger.info("Directory was not deleted.")

    return deleted
```
Checks on the command line if a user wants to delete a directory
Args:
    path (Path): path of the directory to be deleted
    msg (str): message to be displayed to the user
Returns:
    Boolean: True if deleted
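Used interactively, for example (hypothetical path and prompt):

```python
from pathlib import Path

if check_delete_dir(Path("./workspace/phen"), "Delete ./workspace/phen? (yes/no): "):
    print("Directory deleted")
```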
```python
def fork(
    phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str = None
):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there are any other problems with Git
    """
    logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = check_delete_dir(
            phen_path,
            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        logger.info(f"Exiting, phenotype not initialised")
        return

    try:
        # Clone repo
        git_url = construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yaml' exists in the root directory
        config_path = phen_path / "config.yaml"
        if not os.path.isfile(config_path):
            raise ValueError(
                f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            logger.debug(f"Deleted tags from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", "main")

        logger.info(f"Repository forked successfully at {phen_path}")
        logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}")
```
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin.
Args:
    phen_dir (str): local directory path where the upstream repo is to be cloned
    upstream_url (str): url to the upstream repo
    upstream_version (str): version in the upstream repo to clone
    new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
    ValueError: if the specified version is not in the upstream repo
    ValueError: if the upstream repo is not a valid phenotype repo
    ValueError: if there are any other problems with Git
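A usage sketch with hypothetical repository URLs; the relevant PAT environment variable must be set for `construct_git_url` to succeed.

```python
fork(
    "./workspace/phen",
    upstream_url="https://gitlab.com/example-group/upstream-phen.git",
    upstream_version="1.0.0",
    new_origin_url="https://gitlab.com/example-group/my-phen.git",  # optional
)
```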
```python
def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with a standard structure"""
    logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = check_delete_dir(
            phen_path,
            f"The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        logger.info(f"Exiting, phenotype not initialised")
        return

    # Initialise repo from local or remote
    repo: Repo
    # if remote then clone the repo otherwise init a local repo
    if remote_url != None:
        # add PAT token to the URL
        git_url = construct_git_url(remote_url)

        # clone the repo
        repo = git.cmd.Git()
        repo.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phen's default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
        main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phenotype
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        logger.debug(f"Phenotype configuration files already exist")
        return

    logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        create_empty_git_dir(phen_path / d)

    # create empty phen config file
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "translate": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
    """
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    logger.info(f"Phenotype initialised successfully")
```
Initialise phenotype directory as a git repo with standard structure
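A minimal usage sketch (the workspace path is only an example; pass a GitHub or GitLab URL as remote_url to clone an existing phenotype repo instead of creating a local one):

from acmc import phen

# create a new local phenotype repo with the standard directory structure and config.yaml
phen.init(phen_dir="./workspace/phen", remote_url=None)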
427 def validate(phen_dir: str):
428 """Validates the phenotype directory is a git repo with standard structure"""
429 logger.info(f"Validating phenotype: {phen_dir}")
430 phen_path = Path(phen_dir)
431 if not phen_path.is_dir():
432 raise NotADirectoryError(
433 f"Error: '{str(phen_path.resolve())}' is not a directory"
434 )
435
436 config_path = phen_path / CONFIG_FILE
437 if not config_path.is_file():
438 raise FileNotFoundError(
439 f"Error: phen configuration file '{config_path}' does not exist."
440 )
441
442 concepts_path = phen_path / CONCEPTS_DIR
443 if not concepts_path.is_dir():
444 raise FileNotFoundError(
445 f"Error: source concepts directory {concepts_path} does not exist."
446 )
447
448 # Validate that the directory is a git repo
449 try:
450 git.Repo(phen_path)
451 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError):
452 raise Exception(f"Phen directory {phen_path} is not a git repo")
453
454 # Load configuration file
455 if config_path.suffix == ".yaml":
456 try:
457 with config_path.open("r") as file:
458 phenotype = yaml.safe_load(file)
459
460 validator = Validator(CONFIG_SCHEMA)
461 if validator.validate(phenotype):
462 logger.debug("YAML structure is valid.")
463 else:
464 logger.error(f"YAML structure validation failed: {validator.errors}")
465 raise Exception(f"YAML structure validation failed: {validator.errors}")
466 except yaml.YAMLError as e:
467 logger.error(f"YAML syntax error: {e}")
468 raise e
469 else:
470 raise Exception(
471 f"Unsupported configuration filetype: {str(config_path.resolve())}"
472 )
473
474 # initialise
475 validation_errors = []
476 phenotype = phenotype["phenotype"]
477 code_types = parse.CodeTypeParser().code_types
478
479 # check the version number is of the format n.n.n
480 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"])
481 if not match:
482 validation_errors.append(
483 f"Invalid version format in configuration file: {phenotype['version']}"
484 )
485
486 # create a list of all the concept set names defined in the concept set configuration
487 concept_set_names = []
488 for item in phenotype["concept_sets"]:
489 if item["name"] in concept_set_names:
490 validation_errors.append(
491 f"Duplicate concept set defined in concept sets {item['name']}"
492 )
493 else:
494 concept_set_names.append(item["name"])
495
496 # check codes definition
497 for item in phenotype["concept_sets"]:
498 # check concept code file exists
499 concept_code_file_path = concepts_path / item["file"]["path"]
500 if not concept_code_file_path.exists():
501 validation_errors.append(
502 f"Coding file {str(concept_code_file_path.resolve())} does not exist"
503 )
504
505 # check concept code file is not empty
506 if concept_code_file_path.exists() and concept_code_file_path.stat().st_size == 0:
507 validation_errors.append(
508 f"Coding file {str(concept_code_file_path.resolve())} is an empty file"
509 )
510
511 # check code file type is supported
512 if concept_code_file_path.suffix not in CODE_FILE_TYPES:
513 raise ValueError(
514 f"Unsupported filetype {concept_code_file_path.suffix}, only csv, xlsx and xls code file types are supported"
515 )
516
517 # check columns specified are a supported medical coding type
518 for column in item["file"]["columns"]:
519 if column not in code_types:
520 validation_errors.append(
521 f"Column type {column} for file {concept_code_file_path} is not supported"
522 )
523
524 # check the actions are supported
525 if "actions" in item["file"]:
526 for action in item["file"]["actions"]:
527 if action not in COL_ACTIONS:
528 validation_errors.append(f"Action {action} is not supported")
529
530 if len(validation_errors) > 0:
531 logger.error(validation_errors)
532 raise PhenValidationException(
533 f"Configuration file {str(config_path.resolve())} failed validation",
534 validation_errors,
535 )
536
537 logger.info(f"Phenotype validated successfully")
Validates the phenotype directory is a git repo with standard structure
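A hedged example of calling validate and inspecting the collected errors (the path is illustrative):

from acmc import phen

try:
    phen.validate("./workspace/phen")
except phen.PhenValidationException as e:
    # validation_errors holds the individual problems found in config.yaml and the code files
    for error in e.validation_errors:
        print(error)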
540 def read_table_file(path: Path, excel_sheet: str = ""):
541 """
542 Load Code List File
543 """
544
545 path = path.resolve()
546 if path.suffix == ".csv":
547 df = pd.read_csv(path, dtype=str)
548 elif path.suffix == ".xlsx" or path.suffix == ".xls":
549 if excel_sheet != "":
550 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
551 else:
552 df = pd.read_excel(path, dtype=str)
553 elif path.suffix == ".dta":
554 df = pd.read_stata(path)
555 else:
556 raise ValueError(
557 f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
558 )
559
560 return df
Load Code List File
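For illustration, loading a concept code list (the file name is hypothetical); csv and Excel files are read with dtype=str so codes keep any leading zeros:

from pathlib import Path
from acmc import phen

df = phen.read_table_file(Path("./workspace/phen/concepts/my_codes.csv"))
print(df.head())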
563 def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
564 # Perform structural changes to the file before preprocessing
565 logger.debug("Processing file structural actions")
566 if (
567 "actions" in concept_set["file"]
568 and "split_col" in concept_set["file"]["actions"]
569 and "codes_col" in concept_set["file"]["actions"]
570 ):
571 split_col = concept_set["file"]["actions"]["split_col"]
572 codes_col = concept_set["file"]["actions"]["codes_col"]
573 logger.debug(
574 "Action: Splitting %s column into: %s",
575 split_col,
576 df[split_col].unique(),
577 )
578
579 codes = df[codes_col]
580 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode
581 oh = oh.where((oh != True), codes, axis=0) # fill in True cells with codes
582 oh[oh == False] = np.nan # replace False cells with NaN
583 df = pd.concat([df, oh], axis=1) # merge in new columns
584
585 return df
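A self-contained sketch of what the split_col/codes_col action does to toy data (column names and codes are made up): each value in the split column becomes its own column holding that row's code, mirroring the get_dummies logic above:

import numpy as np
import pandas as pd

df = pd.DataFrame({"code": ["C10E.", "I10"], "system": ["read2", "icd10"]})
oh = pd.get_dummies(df["system"], dtype=bool)   # one column per category
oh = oh.where(oh != True, df["code"], axis=0)   # put the row's code where True
oh[oh == False] = np.nan                        # blank out the rest
print(pd.concat([df, oh], axis=1))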
589def preprocess_source_concepts( 590 df: pd.DataFrame, concept_set: dict, code_file_path: Path 591) -> Tuple[pd.DataFrame, list]: 592 """Parses each column individually - Order and length will not be preserved!""" 593 out = pd.DataFrame([]) # create output df to append to 594 code_errors = [] # list of errors from processing 595 596 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 597 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 598 599 # Preprocess codes 600 code_types = parse.CodeTypeParser().code_types 601 for code_type in concept_set["file"]["columns"]: 602 parser = code_types[code_type] 603 logger.info(f"Processing {code_type} codes for {code_file_path}") 604 605 # get codes by column name 606 source_col_name = concept_set["file"]["columns"][code_type] 607 codes = df[source_col_name].dropna() 608 codes = codes.astype(str) # convert to string 609 codes = codes.str.strip() # remove excess spaces 610 611 # process codes, validating them using parser and returning the errors 612 codes, errors = parser.process(codes, code_file_path) 613 if len(errors) > 0: 614 code_errors.extend(errors) 615 logger.warning(f"Codes validation failed with {len(errors)} errors") 616 617 # add processed codes to df 618 new_col_name = f"{source_col_name}_SOURCE" 619 df = df.rename(columns={source_col_name: new_col_name}) 620 process_codes = pd.DataFrame({code_type: codes}).join(df) 621 out = pd.concat( 622 [out, process_codes], 623 ignore_index=True, 624 ) 625 626 logger.debug(out.head()) 627 628 return out, code_errors
Parses each column individually - Order and length will not be preserved!
636 def translate_codes(
637 source_df: pd.DataFrame, target_code_type: str, concept_name: str
638 ) -> pd.DataFrame:
639 """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""
640
641 # codes = pd.DataFrame([], dtype=str)
642 codes = pd.DataFrame(
643 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
644 )
645 # Convert codes to target type
646 logger.info(f"Converting to target code type {target_code_type}")
647
648 for source_code_type in source_df.columns:
649
650 # if the target code type is the same as the source code type, no translation is needed, just append source as target
651 if source_code_type == target_code_type:
652 copy_df = pd.DataFrame(
653 {
654 "SOURCE_CONCEPT": source_df[source_code_type],
655 "SOURCE_CONCEPT_TYPE": source_code_type,
656 "CONCEPT": source_df[source_code_type],
657 }
658 )
659 codes = pd.concat([codes, copy_df])
660 logger.debug(
661 f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
662 )
663 else:
664 # get the translation filename using source to target code types
665 filename = f"{source_code_type}_to_{target_code_type}.parquet"
666 map_path = trud.PROCESSED_PATH / filename
667
668 # do the mapping if it exists
669 if map_path.exists():
670 # get mapping
671 df_map = pd.read_parquet(map_path)
672
673 # do mapping
674 translated_df = pd.merge(
675 source_df[source_code_type], df_map, how="left"
676 )
677
678 # normalise the output
679 translated_df.columns = ["SOURCE_CONCEPT", "CONCEPT"]
680 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type
681
682 # add to list of codes
683 codes = pd.concat([codes, translated_df])
684
685 else:
686 logger.warning(
687 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
688 )
689
690 codes = codes.dropna() # delete NaNs
691
692 # add the concept set name to the output if there were any translations
693 if len(codes.index) > 0:
694 codes["CONCEPT_SET"] = concept_name
695 else:
696 logger.debug(f"No codes converted with target code type {target_code_type}")
697
698 return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
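An illustrative call, assuming the TRUD mapping parquet files (for example read2_to_snomed.parquet) have already been built by the trud module; the code values and type names here are placeholders:

import pandas as pd
from acmc import phen

source_df = pd.DataFrame({"read2": ["C10E.", "C10F."]})
concept_set = phen.translate_codes(source_df, target_code_type="snomed", concept_name="diabetes")
print(concept_set[["SOURCE_CONCEPT", "CONCEPT", "CONCEPT_SET"]])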
701def sql_row_exist( 702 conn: sqlite3.Connection, table: str, column: str, value: str 703) -> bool: 704 # Execute and check if a result exists 705 cur = conn.cursor() 706 query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;" 707 cur.execute(query, (value,)) 708 exists = cur.fetchone() is not None 709 710 return exists
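A self-contained check against an in-memory SQLite database (table, column and values are made up):

import sqlite3
from acmc import phen

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE concepts (code TEXT)")
conn.execute("INSERT INTO concepts VALUES ('C10E.')")
print(phen.sql_row_exist(conn, "concepts", "code", "C10E."))  # True
print(phen.sql_row_exist(conn, "concepts", "code", "X999."))  # False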
713def write_code_errors(code_errors: list, code_errors_path: Path): 714 err_df = pd.DataFrame( 715 [ 716 { 717 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 718 "VOCABULARY": err.code_type, 719 "SOURCE": err.codes_file, 720 "CAUSE": err.message, 721 } 722 for err in code_errors 723 ] 724 ) 725 726 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 727 err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 728 err_df.to_csv(code_errors_path, index=False, mode="w")
731def write_vocab_version(phen_path: Path): 732 # write the vocab version files 733 734 if not trud.VERSION_PATH.exists(): 735 raise FileNotFoundError( 736 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 737 ) 738 739 if not omop.VERSION_PATH.exists(): 740 raise FileNotFoundError( 741 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 742 ) 743 744 with trud.VERSION_PATH.open("r") as file: 745 trud_version = yaml.safe_load(file) 746 747 with omop.VERSION_PATH.open("r") as file: 748 omop_version = yaml.safe_load(file) 749 750 # Create the combined YAML structure 751 version_data = { 752 "versions": { 753 "acmc": acmc.__version__, 754 "trud": trud_version, 755 "omop": omop_version, 756 } 757 } 758 759 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 760 yaml.dump( 761 version_data, 762 file, 763 Dumper=util.QuotedDumper, 764 default_flow_style=False, 765 sort_keys=False, 766 default_style='"', 767 )
770def map(phen_dir: str, target_code_type: str): 771 logger.info(f"Processing phenotype: {phen_dir}") 772 773 # Validate configuration 774 validate(phen_dir) 775 776 # initialise paths 777 phen_path = Path(phen_dir) 778 config_path = phen_path / CONFIG_FILE 779 780 # load configuration 781 with config_path.open("r") as file: 782 config = yaml.safe_load(file) 783 phenotype = config["phenotype"] 784 785 if len(phenotype["map"]) == 0: 786 raise ValueError(f"No map codes defined in the phenotype configuration") 787 788 if target_code_type is not None and target_code_type not in phenotype["map"]: 789 raise ValueError( 790 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 791 ) 792 793 if target_code_type is not None: 794 map_target_code_type(phen_path, phenotype, target_code_type) 795 else: 796 for t in phenotype["map"]: 797 map_target_code_type(phen_path, phenotype, t) 798 799 logger.info(f"Phenotype processed successfully")
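Typical usage (the path and code type are examples); passing None as the target maps to every code type listed under map in config.yaml:

from acmc import phen

# map all concept sets to one target vocabulary
phen.map("./workspace/phen", target_code_type="snomed")

# or map to every target listed in the configuration
phen.map("./workspace/phen", target_code_type=None)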
802def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 803 logger.debug(f"Target coding format: {target_code_type}") 804 concepts_path = phen_path / CONCEPTS_DIR 805 # Create output dataframe 806 out = pd.DataFrame([]) 807 code_errors = [] 808 809 # Process each folder in codes section 810 for concept_set in phenotype["concept_sets"]: 811 logger.debug(f"--- {concept_set['file']} ---") 812 813 # Load code file 814 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 815 df = read_table_file(codes_file_path) 816 817 # process structural actions 818 df = process_actions(df, concept_set) 819 820 # preprocessing and validate of source concepts 821 logger.debug("Processing and validating source concept codes") 822 df, errors = preprocess_source_concepts( 823 df, 824 concept_set, 825 codes_file_path, 826 ) 827 828 # create df with just the source code columns 829 source_column_names = list(concept_set["file"]["columns"].keys()) 830 source_df = df[source_column_names] 831 832 logger.debug(source_df.columns) 833 logger.debug(source_df.head()) 834 835 logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}") 836 if len(errors) > 0: 837 code_errors.extend(errors) 838 logger.debug(f" Length of code_errors {len(code_errors)}") 839 840 # Map source concepts codes to target codes 841 # if processing a source coding list with categorical data 842 if ( 843 "actions" in concept_set["file"] 844 and "divide_col" in concept_set["file"]["actions"] 845 and len(df) > 0 846 ): 847 divide_col = concept_set["file"]["actions"]["divide_col"] 848 logger.debug(f"Action: Dividing Table by {divide_col}") 849 logger.debug(f"column into: {df[divide_col].unique()}") 850 df_grp = df.groupby(divide_col) 851 for cat, grp in df_grp: 852 if cat == concept_set["file"]["category"]: 853 grp = grp.drop(columns=[divide_col]) # delete categorical column 854 source_df = grp[source_column_names] 855 trans_out = translate_codes( 856 source_df, 857 target_code_type=target_code_type, 858 concept_name=concept_set["name"], 859 ) 860 out = pd.concat([out, trans_out]) 861 else: 862 source_df = df[source_column_names] 863 trans_out = translate_codes( 864 source_df, 865 target_code_type=target_code_type, 866 concept_name=concept_set["name"], 867 ) 868 out = pd.concat([out, trans_out]) 869 870 if len(code_errors) > 0: 871 logger.error(f"The map processing has {len(code_errors)} errors") 872 error_path = phen_path / MAP_DIR / "errors" 873 error_path.mkdir(parents=True, exist_ok=True) 874 error_filename = f"{target_code_type}-code-errors.csv" 875 write_code_errors(code_errors, error_path / error_filename) 876 877 # Check there is output from processing 878 if len(out.index) == 0: 879 logger.error(f"No output after map processing") 880 raise Exception( 881 f"No output after map processing, check config {str(phen_path.resolve())}" 882 ) 883 884 # final processing 885 out = out.reset_index(drop=True) 886 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 887 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 888 889 out_count = len(out.index) 890 # added metadata 891 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 892 result_list = [] 893 source_column_names = list(concept_set["file"]["columns"].keys()) 894 for source_concept_type in source_column_names: 895 896 # Filter output based on the current source_concept_type 897 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 898 filtered_count = 
len(out_filtered_df.index)
899
900 # Remove the source type columns other than the current type, leaving the metadata for the join
901 remove_types = [
902 type for type in source_column_names if type != source_concept_type
903 ]
904 metadata_df = df.drop(columns=remove_types)
905 metadata_df = metadata_df.rename(
906 columns={source_concept_type: "SOURCE_CONCEPT"}
907 )
908 metadata_df_count = len(metadata_df.index)
909
910 # Perform a left join with metadata_df on SOURCE_CONCEPT to add the metadata
911 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT")
912 result_count = len(result.index)
913
914 logger.debug(
915 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}"
916 )
917
918 # Append the result to the result_list
919 result_list.append(result)
920
921 # Concatenate all the results into a single DataFrame
922 final_out = pd.concat(result_list, ignore_index=True)
923 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"])
924 logger.debug(
925 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}"
926 )
927
928 # Save output to map directory
929 output_filename = target_code_type + ".csv"
930 map_path = phen_path / MAP_DIR / output_filename
931 final_out.to_csv(map_path, index=False)
932 logger.info(f"Saved mapped concepts to {str(map_path.resolve())}")
933
934 # save concept sets as separate files
935 concept_set_path = phen_path / CSV_PATH / target_code_type
936
937 # empty the concept-set directory except for hidden files, e.g. .git
938 if concept_set_path.exists():
939 for item in concept_set_path.iterdir():
940 if not item.name.startswith("."):
941 item.unlink()
942 else:
943 concept_set_path.mkdir(parents=True, exist_ok=True)
944
945 # write each concept as a separate file
946 for name, concept in final_out.groupby("CONCEPT_SET"):
947 concept = concept.sort_values(by="CONCEPT") # sort rows
948 concept = concept.dropna(how="all", axis=1) # remove empty cols
949 concept = concept.reindex(
950 sorted(concept.columns), axis=1
951 ) # sort cols alphabetically
952 filename = f"{name}.csv"
953 concept_path = concept_set_path / filename
954 concept.to_csv(concept_path, index=False)
955
956 write_vocab_version(phen_path)
957
958 logger.info(f"Phenotype processed target code type {target_code_type}")
961def generate_version_tag( 962 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 963) -> str: 964 # Get all valid semantic version tags 965 versions = [] 966 for tag in repo.tags: 967 tag_name = ( 968 tag.name.lstrip("v") if use_v_prefix else tag.name 969 ) # Remove 'v' if needed 970 if semver.Version.is_valid(tag_name): 971 versions.append(semver.Version.parse(tag_name)) 972 973 # Determine the next version 974 if not versions: 975 new_version = semver.Version(0, 0, 1) 976 else: 977 latest_version = max(versions) 978 if increment == "major": 979 new_version = latest_version.bump_major() 980 elif increment == "minor": 981 new_version = latest_version.bump_minor() 982 else: 983 new_version = latest_version.bump_patch() 984 985 # Create the new tag 986 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 987 988 return new_version_str
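A quick sketch of the bump behaviour (the repo path is illustrative): with no valid semver tags the first version is 0.0.1, otherwise the latest tag is bumped at the requested level:

import git
from acmc import phen

repo = git.Repo("./workspace/phen")
print(phen.generate_version_tag(repo, increment="minor"))  # e.g. 0.1.0 if the latest tag is 0.0.3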
991 def publish(
992 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC
993 ):
994 """Publishes updates to the phenotype by committing all changes to the repo directory"""
995
996 # Validate config
997 validate(phen_dir)
998 phen_path = Path(phen_dir)
999
1000 # load git repo and set the branch
1001 repo = git.Repo(phen_path)
1002 if DEFAULT_GIT_BRANCH in repo.branches:
1003 main_branch = repo.heads[DEFAULT_GIT_BRANCH]
1004 main_branch.checkout()
1005 else:
1006 raise AttributeError(
1007 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}"
1008 )
1009
1010 # check if any changes to publish
1011 if not repo.is_dirty() and not repo.untracked_files:
1012 if remote_url is not None and "origin" not in repo.remotes:
1013 logger.info(f"First publish to remote url {remote_url}")
1014 else:
1015 logger.info("Nothing to publish, no changes to the repo")
1016 return
1017
1018 # get next version
1019 new_version_str = generate_version_tag(repo, increment)
1020 logger.info(f"New version: {new_version_str}")
1021
1022 # Write version in configuration file
1023 config_path = phen_path / CONFIG_FILE
1024 with config_path.open("r") as file:
1025 config = yaml.safe_load(file)
1026
1027 config["phenotype"]["version"] = new_version_str
1028 with open(config_path, "w") as file:
1029 yaml.dump(
1030 config,
1031 file,
1032 Dumper=util.QuotedDumper,
1033 default_flow_style=False,
1034 sort_keys=False,
1035 default_style='"',
1036 )
1037
1038 # Add and commit changes to repo including version updates
1039 commit_message = msg if msg else f"Committing updates to phenotype {phen_path}"
1040 repo.git.add("--all")
1041 repo.index.commit(commit_message)
1042
1043 # Add tag to the repo
1044 repo.create_tag(new_version_str)
1045
1046 # push to origin if a remote repo
1047 if remote_url is not None and "origin" not in repo.remotes:
1048 git_url = construct_git_url(remote_url)
1049 repo.create_remote("origin", git_url)
1050
1051 try:
1052 if "origin" in repo.remotes:
1053 logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}")
1054 origin = repo.remotes.origin
1055 logger.info(f"Pushing main branch to remote repo")
1056 repo.git.push("--set-upstream", "origin", "main")
1057 logger.info(f"Pushing version tags to remote git repo")
1058 origin.push(tags=True)
1059 logger.debug("Changes pushed to 'origin'")
1060 else:
1061 logger.debug("Remote 'origin' is not set")
1062 except Exception as e:
1063 tag_ref = repo.tags[new_version_str]
1064 repo.delete_tag(tag_ref)
1065 repo.git.reset("--soft", "HEAD~1")
1066 raise e
1067
1068 logger.info(f"Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
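An illustrative publish call (the remote URL is a placeholder, and the access-token environment variable expected by construct_git_url must be set before pushing):

from acmc import phen

phen.publish(
    phen_dir="./workspace/phen",
    msg="Add hypertension concept set",
    remote_url="https://github.com/example/phen-repo.git",
    increment="minor",
)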
1071def export(phen_dir: str, version: str): 1072 """Exports a phen repo at a specific tagged version into a target directory""" 1073 logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1074 1075 # validate configuration 1076 validate(phen_dir) 1077 phen_path = Path(phen_dir) 1078 1079 # load configuration 1080 config_path = phen_path / CONFIG_FILE 1081 with config_path.open("r") as file: 1082 config = yaml.safe_load(file) 1083 1084 map_path = phen_path / MAP_DIR 1085 if not map_path.exists(): 1086 logger.warning(f"Map path does not exist '{map_path}'") 1087 1088 export_path = phen_path / OMOP_PATH 1089 # check export directory exists and if not create it 1090 if not export_path.exists(): 1091 export_path.mkdir(parents=True) 1092 logger.debug(f"OMOP export directory '{export_path}' created.") 1093 1094 # omop export db 1095 export_db_path = omop.export( 1096 map_path, 1097 export_path, 1098 config["phenotype"]["version"], 1099 config["phenotype"]["omop"], 1100 ) 1101 1102 # write to tables 1103 # export as csv 1104 logger.info(f"Phenotype exported successfully")
Exports a phen repo at a specific tagged version into a target directory
1107 def copy(phen_dir: str, target_dir: str, version: str):
1108 """Copies a phen repo at a specific tagged version into a target directory"""
1109
1110 # Validate
1111 validate(phen_dir)
1112 phen_path = Path(phen_dir)
1113
1114 # Check target directory exists
1115 target_path = Path(target_dir)
1116 if not target_path.exists():
1117 raise FileNotFoundError(f"The target directory {target_path} does not exist")
1118
1119 # Set copy directory
1120 copy_path = target_path / version
1121 logger.info(f"Copying repo {phen_path} to {copy_path}")
1122
1123 if (
1124 copy_path.exists() and copy_path.is_dir()
1125 ): # Check if it exists and is a directory
1126 copy = check_delete_dir(
1127 copy_path,
1128 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ",
1129 )
1130 else:
1131 copy = True
1132
1133 if not copy:
1134 logger.info(f"Not copying the version {version}")
1135 return
1136
1137 logger.debug(f"Cloning repo from {phen_path} into {copy_path}...")
1138 repo = git.Repo.clone_from(phen_path, copy_path)
1139
1140 # Check out the latest commit or specified version
1141 if version:
1142 # Checkout a specific version (e.g., branch, tag, or commit hash)
1143 logger.info(f"Checking out version {version}...")
1144 repo.git.checkout(version)
1145 else:
1146 # Checkout the latest commit (HEAD)
1147 logger.info(f"Checking out the latest commit...")
1148 repo.git.checkout("HEAD")
1149
1150 logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}")
1151
1152 logger.info(f"Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
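An example of pinning a released version into another directory (paths and tag are placeholders):

from acmc import phen

phen.copy(phen_dir="./workspace/phen", target_dir="./releases", version="0.1.0")
# creates ./releases/0.1.0 containing a clone of the repo checked out at tag 0.1.0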
1156def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1157 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1158 concepts_dict = { 1159 item["name"]: item["file"]["path"] 1160 for item in config_data["phenotype"]["concept_sets"] 1161 } 1162 name_set = set(concepts_dict.keys()) 1163 return concepts_dict, name_set
Extracts concepts as {name: file_path} dictionary and a name set.
1166def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1167 """ 1168 Extracts clean keys from a DeepDiff dictionary. 1169 1170 :param diff: DeepDiff result dictionary 1171 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1172 :return: A set of clean key names 1173 """ 1174 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
Extracts clean keys from a DeepDiff dictionary.
:param diff: DeepDiff result dictionary
:param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
:return: A set of clean key names
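A toy illustration of pulling the added keys out of a DeepDiff result:

from deepdiff import DeepDiff
from acmc import phen

old = {"angina": "angina.csv"}
new = {"angina": "angina.csv", "asthma": "asthma.csv"}
changes = DeepDiff(old, new)
print(phen.extract_clean_deepdiff_keys(changes, "dictionary_item_added"))  # {'asthma'}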
1177def diff_config(old_config: dict, new_config: dict) -> str: 1178 report = f"\n# Changes to phenotype configuration\n" 1179 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1180 1181 old_concepts, old_names = extract_concepts(old_config) 1182 new_concepts, new_names = extract_concepts(new_config) 1183 1184 # Check added and removed names 1185 added_names = new_names - old_names # Names that appear in new but not in old 1186 removed_names = old_names - new_names # Names that were in old but not in new 1187 1188 # find file path changes for unchanged names 1189 unchanged_names = old_names & new_names # Names that exist in both 1190 file_diff = DeepDiff( 1191 {name: old_concepts[name] for name in unchanged_names}, 1192 {name: new_concepts[name] for name in unchanged_names}, 1193 ) 1194 1195 # Find renamed concepts (same file, different name) 1196 renamed_concepts = [] 1197 for removed in removed_names: 1198 old_path = old_concepts[removed] 1199 for added in added_names: 1200 new_path = new_concepts[added] 1201 if old_path == new_path: 1202 renamed_concepts.append((removed, added)) 1203 1204 # Remove renamed concepts from added and removed sets 1205 for old_name, new_name in renamed_concepts: 1206 added_names.discard(new_name) 1207 removed_names.discard(old_name) 1208 1209 # generate config report 1210 if added_names: 1211 report += "## Added Concepts\n" 1212 for name in added_names: 1213 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1214 report += "\n" 1215 1216 if removed_names: 1217 report += "## Removed Concepts\n" 1218 for name in removed_names: 1219 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1220 report += "\n" 1221 1222 if renamed_concepts: 1223 report += "## Renamed Concepts\n" 1224 for old_name, new_name in renamed_concepts: 1225 report += ( 1226 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1227 ) 1228 report += "\n" 1229 1230 if "values_changed" in file_diff: 1231 report += "## Updated File Paths\n" 1232 for name, change in file_diff["values_changed"].items(): 1233 old_file = change["old_value"] 1234 new_file = change["new_value"] 1235 clean_name = name.split("root['")[1].split("']")[0] 1236 report += ( 1237 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1238 ) 1239 report += "\n" 1240 1241 if not ( 1242 added_names 1243 or removed_names 1244 or renamed_concepts 1245 or file_diff.get("values_changed") 1246 ): 1247 report += "No changes in concept sets.\n" 1248 1249 return report
1252 def diff_map_files(old_map_path: Path, new_map_path: Path) -> str:
1253 old_output_files = [
1254 file.name
1255 for file in old_map_path.iterdir()
1256 if file.is_file() and not file.name.startswith(".")
1257 ]
1258 new_output_files = [
1259 file.name
1260 for file in new_map_path.iterdir()
1261 if file.is_file() and not file.name.startswith(".")
1262 ]
1263
1264 # Convert the lists to sets for easy comparison
1265 old_output_set = set(old_output_files)
1266 new_output_set = set(new_output_files)
1267
1268 # Outputs that are in old_output_set but not in new_output_set (removed files)
1269 removed_outputs = old_output_set - new_output_set
1270 # Outputs that are in new_output_set but not in old_output_set (added files)
1271 added_outputs = new_output_set - old_output_set
1272 # Outputs that are the intersection of old_output_set and new_output_set
1273 common_outputs = old_output_set & new_output_set
1274
1275 report = f"\n# Changes to available translations\n"
1276 report += f"This compares the coding translation files available.\n\n"
1277 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n"
1278 report += f"- Added outputs: {sorted(list(added_outputs))}\n"
1279 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n"
1280
1281 # Compare common outputs between versions
1282 report += f"# Changes to concepts in translation files\n\n"
1283 report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n"
1284 for file in common_outputs:
1285 old_output = old_map_path / file
1286 new_output = new_map_path / file
1287
1288 logger.debug(f"Old output: {str(old_output.resolve())}")
1289 logger.debug(f"New output: {str(new_output.resolve())}")
1290
1291 df1 = pd.read_csv(old_output)
1292 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1293 df2 = pd.read_csv(new_output)
1294 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count()
1295
1296 # Check for added and removed concepts
1297 report += f"- File {file}\n"
1298 sorted_list = sorted(list(set(df1.index) - set(df2.index)))
1299 report += f"- Removed concepts {sorted_list}\n"
1300 sorted_list = sorted(list(set(df2.index) - set(df1.index)))
1301 report += f"- Added concepts {sorted_list}\n"
1302
1303 # Check for changed concepts
1304 diff = df2 - df1 # diff in counts
1305 diff = diff[
1306 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna()
1307 ] # get non-zero counts
1308 s = "\n"
1309 if len(diff.index) > 0:
1310 for concept, row in diff.iterrows():
1311 s += "\t - {} {}\n".format(concept, row["CONCEPT"])
1312 report += f"- Changed concepts {s}\n\n"
1313 else:
1314 report += f"- Changed concepts []\n\n"
1315
1316 return report
1319def diff_phen( 1320 new_phen_path: Path, 1321 new_version: str, 1322 old_phen_path: Path, 1323 old_version: str, 1324 report_path: Path, 1325): 1326 """Compare the differences between two versions of a phenotype""" 1327 1328 # validate phenotypes 1329 logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1330 validate(str(old_phen_path.resolve())) 1331 logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1332 validate(str(new_phen_path.resolve())) 1333 1334 # get old and new config 1335 old_config_path = old_phen_path / CONFIG_FILE 1336 with old_config_path.open("r") as file: 1337 old_config = yaml.safe_load(file) 1338 new_config_path = new_phen_path / CONFIG_FILE 1339 with new_config_path.open("r") as file: 1340 new_config = yaml.safe_load(file) 1341 1342 # write report heading 1343 report = f"# Phenotype Comparison Report\n" 1344 report += f"## Original phenotype\n" 1345 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1346 report += f" - {old_version}\n" 1347 report += f" - {str(old_phen_path.resolve())}\n" 1348 report += f"## Changed phenotype:\n" 1349 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1350 report += f" - {new_version}\n" 1351 report += f" - {str(new_phen_path.resolve())}\n" 1352 1353 # Step 1: check differences configuration files 1354 # Convert list of dicts into a dict: {name: file} 1355 report += diff_config(old_config, new_config) 1356 1357 # Step 2: check differences between map files 1358 # List files from output directories 1359 old_map_path = old_phen_path / MAP_DIR 1360 new_map_path = new_phen_path / MAP_DIR 1361 report += diff_map_files(old_map_path, new_map_path) 1362 1363 # initialise report file 1364 logger.debug(f"Writing to report file {str(report_path.resolve())}") 1365 report_file = open(report_path, "w") 1366 report_file.write(report) 1367 report_file.close() 1368 1369 logger.info(f"Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
1372def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1373 # make tmp directory .acmc 1374 timestamp = time.strftime("%Y%m%d_%H%M%S") 1375 temp_dir = Path(f".acmc/diff_{timestamp}") 1376 1377 changed_phen_path = Path(phen_dir) 1378 if not changed_phen_path.exists(): 1379 raise ValueError( 1380 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1381 ) 1382 1383 old_phen_path = Path(old_phen_dir) 1384 if not old_phen_path.exists(): 1385 raise ValueError( 1386 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1387 ) 1388 1389 try: 1390 # Create the directory 1391 temp_dir.mkdir(parents=True, exist_ok=True) 1392 logger.debug(f"Temporary directory created: {temp_dir}") 1393 1394 # Create temporary directories 1395 changed_path = temp_dir / "changed" 1396 changed_path.mkdir(parents=True, exist_ok=True) 1397 old_path = temp_dir / "old" 1398 old_path.mkdir(parents=True, exist_ok=True) 1399 1400 # checkout changed 1401 if version == "latest": 1402 logger.debug( 1403 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1404 ) 1405 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1406 else: 1407 logger.debug( 1408 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1409 ) 1410 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1411 changed_repo.git.checkout(version) 1412 1413 # checkout old 1414 if old_version == "latest": 1415 logger.debug( 1416 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1417 ) 1418 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1419 else: 1420 logger.debug( 1421 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1422 ) 1423 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1424 old_repo.git.checkout(old_version) 1425 1426 report_filename = f"{version}_{old_version}_diff.md" 1427 report_path = changed_phen_path / report_filename 1428 # diff old with new 1429 diff_phen(changed_path, version, old_path, old_version, report_path) 1430 1431 finally: 1432 # clean up tmp directory 1433 if temp_dir.exists(): 1434 shutil.rmtree(temp_dir) 1435 print(f"Temporary directory removed: {temp_dir}")
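Comparing a working copy against an older released copy (paths and versions are placeholders); the report is written next to the changed phenotype as <version>_<old_version>_diff.md:

from acmc import phen

phen.diff(
    phen_dir="./workspace/phen",
    version="latest",
    old_phen_dir="./releases/0.1.0",
    old_version="0.1.0",
)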