acmc.phen
Phenotype Module
This module provides functionality for managing phenotypes: initialising and forking phenotype repositories, validating their configuration, mapping source concept codes to target coding vocabularies, and publishing, exporting, copying and comparing versioned phenotypes.
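A typical end-to-end workflow chains the public functions documented below. This is a minimal sketch, assuming a local workspace at ./workspace/phen, no remote repository, and a config.yaml that already lists at least one target code type under phenotype.map:

from acmc import phen

# initialise a new local phenotype repo with the standard directory structure
phen.init("./workspace/phen", remote_url=None)

# after editing config.yaml and adding code files under concepts/,
# check the repo structure and configuration
phen.validate("./workspace/phen")

# translate source codes to every target code type listed under phenotype.map
phen.map("./workspace/phen", target_code_type=None)

# commit, tag and (optionally) push a new patch version
phen.publish("./workspace/phen", "Initial concept sets", remote_url=None, increment="patch")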
1""" 2Phenotype Module 3================ 4This module provides functionality for managing phenotypes. 5""" 6 7import argparse 8import pandas as pd 9import numpy as np 10import json 11import os 12import sqlite3 13import sys 14import shutil 15import time 16import git 17import re 18import logging 19import requests 20import yaml 21import semver 22from git import Repo 23from cerberus import Validator # type: ignore 24from deepdiff import DeepDiff 25from pathlib import Path 26from urllib.parse import urlparse, urlunparse 27from typing import Tuple, Set, Any 28 29import acmc 30from acmc import trud, omop, parse, util 31 32# setup logging 33import acmc.logging_config as lc 34 35logger = lc.setup_logger() 36 37pd.set_option("mode.chained_assignment", None) 38 39PHEN_DIR = "phen" 40DEFAULT_PHEN_PATH = Path("./workspace") / PHEN_DIR 41 42CONCEPTS_DIR = "concepts" 43MAP_DIR = "map" 44CONCEPT_SET_DIR = "concept-sets" 45CSV_PATH = Path(CONCEPT_SET_DIR) / "csv" 46OMOP_PATH = Path(CONCEPT_SET_DIR) / "omop" 47DEFAULT_PHEN_DIR_LIST = [CONCEPTS_DIR, MAP_DIR, CONCEPT_SET_DIR] 48CONFIG_FILE = "config.yaml" 49VOCAB_VERSION_FILE = "vocab_version.yaml" 50SEMANTIC_VERSION_TYPES = ["major", "minor", "patch"] 51DEFAULT_VERSION_INC = "patch" 52 53DEFAULT_GIT_BRANCH = "main" 54 55SPLIT_COL_ACTION = "split_col" 56CODES_COL_ACTION = "codes_col" 57DIVIDE_COL_ACTION = "divide_col" 58COL_ACTIONS = [SPLIT_COL_ACTION, CODES_COL_ACTION, DIVIDE_COL_ACTION] 59 60CODE_FILE_TYPES = [".xlsx", ".xls", ".csv"] 61SOURCE_COL_SUFFIX = "_acmc_source" 62TARGET_COL_SUFFIX = "_acmc_target" 63 64# config.yaml schema 65CONFIG_SCHEMA = { 66 "phenotype": { 67 "type": "dict", 68 "required": True, 69 "schema": { 70 "version": { 71 "type": "string", 72 "required": True, 73 "regex": r"^\d+\.\d+\.\d+$", # Enforces 'vN.N.N' format 74 }, 75 "omop": { 76 "type": "dict", 77 "required": True, 78 "schema": { 79 "vocabulary_id": {"type": "string", "required": True}, 80 "vocabulary_name": {"type": "string", "required": True}, 81 "vocabulary_reference": { 82 "type": "string", 83 "required": True, 84 "regex": r"^https?://.*", # Ensures it's a URL 85 }, 86 }, 87 }, 88 "map": { 89 "type": "list", 90 "schema": { 91 "type": "string", 92 "allowed": list( 93 parse.SUPPORTED_CODE_TYPES 94 ), # Ensure only predefined values are allowed 95 }, 96 }, 97 "concept_sets": { 98 "type": "list", 99 "required": True, 100 "schema": { 101 "type": "dict", 102 "schema": { 103 "name": {"type": "string", "required": True}, 104 "file": { 105 "type": "dict", 106 "required": False, 107 "schema": { 108 "path": {"type": "string", "required": True}, 109 "columns": {"type": "dict", "required": True}, 110 "category": { 111 "type": "string" 112 }, # Optional but must be string if present 113 "actions": { 114 "type": "dict", 115 "schema": {"divide_col": {"type": "string"}}, 116 }, 117 }, 118 }, 119 "metadata": {"type": "dict", "required": True}, 120 }, 121 }, 122 }, 123 }, 124 } 125} 126 127 128class PhenValidationException(Exception): 129 """Custom exception class raised when validation errors in phenotype configuration file""" 130 131 def __init__(self, message, validation_errors=None): 132 super().__init__(message) 133 self.validation_errors = validation_errors 134 135 136def construct_git_url(remote_url: str): 137 """Constructs a git url for github or gitlab including a PAT token environment variable""" 138 # check the url 139 parsed_url = urlparse(remote_url) 140 141 # if github in the URL otherwise assume it's gitlab, if we want to use others such as codeberg we'd 142 # need to update 
this function if the URL scheme is different. 143 if "github.com" in parsed_url.netloc: 144 # get GitHub PAT from environment variable 145 auth = os.getenv("ACMC_GITHUB_PAT") 146 if not auth: 147 raise ValueError( 148 "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable." 149 ) 150 else: 151 # get GitLab PAT from environment variable 152 auth = os.getenv("ACMC_GITLAB_PAT") 153 if not auth: 154 raise ValueError( 155 "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable." 156 ) 157 auth = f"oauth2:{auth}" 158 159 # Construct the new URL with credentials 160 new_netloc = f"{auth}@{parsed_url.netloc}" 161 return urlunparse( 162 ( 163 parsed_url.scheme, 164 new_netloc, 165 parsed_url.path, 166 parsed_url.params, 167 parsed_url.query, 168 parsed_url.fragment, 169 ) 170 ) 171 172 173def create_empty_git_dir(path: Path): 174 """Creates a directory with a .gitkeep file so that it's tracked in git""" 175 path.mkdir(exist_ok=True) 176 keep_path = path / ".gitkeep" 177 keep_path.touch(exist_ok=True) 178 179 180def check_delete_dir(path: Path, msg: str) -> bool: 181 """Checks on the command line if a user wants to delete a directory 182 183 Args: 184 path (Path): path of the directory to be deleted 185 msg (str): message to be displayed to the user 186 187 Returns: 188 Boolean: True if deleted 189 """ 190 deleted = False 191 192 user_input = input(f"{msg}").strip().lower() 193 if user_input in ["yes", "y"]: 194 shutil.rmtree(path) 195 deleted = True 196 else: 197 logger.info("Directory was not deleted.") 198 199 return deleted 200 201 202def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str): 203 """Forks an upstream phenotype in a remote repo at a specific version to a local director, and optionally sets to a new remote origin" 204 205 Args: 206 phen_dir (str): local directory path where the upstream repo is to be cloned 207 upstream_url (str): url to the upstream repo 208 upstream_version (str): version in the upstream repo to clone 209 new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None. 210 211 Raises: 212 ValueError: if the specified version is not in the upstream repo 213 ValueError: if the upstream repo is not a valid phenotype repo 214 ValueError: if there's any other problems with Git 215 """ 216 logger.info( 217 f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}" 218 ) 219 220 phen_path = Path(phen_dir) 221 # check if directory already exists and ask user if they want to recreate it 222 if ( 223 phen_path.exists() and phen_path.is_dir() 224 ): # Check if it exists and is a directory 225 configure = check_delete_dir( 226 phen_path, 227 f"The phen directory already exists. Do you want to reinitialise? (yes/no): ", 228 ) 229 else: 230 configure = True 231 232 if not configure: 233 logger.info(f"Exiting, phenotype not initiatised") 234 return 235 236 try: 237 # Clone repo 238 git_url = construct_git_url(upstream_url) 239 repo = git.Repo.clone_from(git_url, phen_path) 240 241 # Fetch all branches and tags 242 repo.remotes.origin.fetch() 243 244 # Check if the version exists 245 available_refs = [ref.name.split("/")[-1] for ref in repo.references] 246 if upstream_version not in available_refs: 247 raise ValueError( 248 f"Version '{upstream_version}' not found in the repository: {upstream_url}." 
249 ) 250 251 # Checkout the specified version 252 repo.git.checkout(upstream_version) 253 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 254 main_branch.checkout() 255 256 # Check if 'config.yaml' exists in the root directory 257 config_path = phen_path / "config.yaml" 258 if not os.path.isfile(config_path): 259 raise ValueError( 260 f"The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory." 261 ) 262 263 # Validate the phenotype is compatible with the acmc tool 264 validate(str(phen_path.resolve())) 265 266 # Delete each tag locally 267 tags = repo.tags 268 for tag in tags: 269 repo.delete_tag(tag) 270 logger.debug(f"Deleted tags from forked repo: {tag}") 271 272 # Add upstream remote 273 repo.create_remote("upstream", upstream_url) 274 remote = repo.remotes["origin"] 275 repo.delete_remote(remote) # Remove existing origin 276 277 # Optionally set a new origin remote 278 if new_origin_url: 279 git_url = construct_git_url(new_origin_url) 280 repo.create_remote("origin", git_url) 281 repo.git.push("--set-upstream", "origin", "main") 282 283 logger.info(f"Repository forked successfully at {phen_path}") 284 logger.info(f"Upstream set to {upstream_url}") 285 if new_origin_url: 286 logger.info(f"Origin set to {new_origin_url}") 287 288 except Exception as e: 289 if phen_path.exists(): 290 shutil.rmtree(phen_path) 291 raise ValueError(f"Error occurred during repository fork: {str(e)}") 292 293 294def init(phen_dir: str, remote_url: str): 295 """Initial phenotype directory as git repo with standard structure""" 296 logger.info(f"Initialising Phenotype in directory: {phen_dir}") 297 phen_path = Path(phen_dir) 298 299 # check if directory already exists and ask user if they want to recreate it 300 if ( 301 phen_path.exists() and phen_path.is_dir() 302 ): # Check if it exists and is a directory 303 configure = check_delete_dir( 304 phen_path, 305 f"The phen directory already exists. Do you want to reinitialise? 
(yes/no): ", 306 ) 307 else: 308 configure = True 309 310 if not configure: 311 logger.info(f"Exiting, phenotype not initiatised") 312 return 313 314 # Initialise repo from local or remote 315 repo: Repo 316 317 # if remote then clone the repo otherwise init a local repo 318 if remote_url != None: 319 # add PAT token to the URL 320 git_url = construct_git_url(remote_url) 321 322 # clone the repo 323 git_cmd = git.cmd.Git() 324 git_cmd.clone(git_url, phen_path) 325 326 # open repo 327 repo = Repo(phen_path) 328 # check if there are any commits (new repo has no commits) 329 if ( 330 len(repo.branches) == 0 or repo.head.is_detached 331 ): # Handle detached HEAD (e.g., after init) 332 logger.debug("The phen repository has no commits yet.") 333 commit_count = 0 334 else: 335 # Get the total number of commits in the default branch 336 commit_count = sum(1 for _ in repo.iter_commits()) 337 logger.debug(f"Repo has previous commits: {commit_count}") 338 else: 339 # local repo, create the directories and init 340 phen_path.mkdir(parents=True, exist_ok=True) 341 logger.debug(f"Phen directory '{phen_path}' has been created.") 342 repo = git.Repo.init(phen_path) 343 commit_count = 0 344 345 phen_path = phen_path.resolve() 346 # initialise empty repos 347 if commit_count == 0: 348 # create initial commit 349 initial_file_path = phen_path / "README.md" 350 with open(initial_file_path, "w") as file: 351 file.write( 352 "# Initial commit\nThis is the first commit in the phen repository.\n" 353 ) 354 repo.index.add([initial_file_path]) 355 repo.index.commit("Initial commit") 356 commit_count = 1 357 358 # Checkout the phens default branch, creating it if it does not exist 359 if DEFAULT_GIT_BRANCH in repo.branches: 360 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 361 main_branch.checkout() 362 else: 363 main_branch = repo.create_head(DEFAULT_GIT_BRANCH) 364 main_branch.checkout() 365 366 # if the phen path does not contain the config file then initialise the phen type 367 config_path = phen_path / CONFIG_FILE 368 if config_path.exists(): 369 logger.debug(f"Phenotype configuration files already exist") 370 return 371 372 logger.info("Creating phen directory structure and config files") 373 for d in DEFAULT_PHEN_DIR_LIST: 374 create_empty_git_dir(phen_path / d) 375 376 # create empty phen config file 377 config = { 378 "phenotype": { 379 "version": "0.0.0", 380 "omop": { 381 "vocabulary_id": "", 382 "vocabulary_name": "", 383 "vocabulary_reference": "", 384 }, 385 "translate": [], 386 "concept_sets": [], 387 } 388 } 389 390 with open(phen_path / CONFIG_FILE, "w") as file: 391 yaml.dump( 392 config, 393 file, 394 Dumper=util.QuotedDumper, 395 default_flow_style=False, 396 sort_keys=False, 397 default_style='"', 398 ) 399 400 # add git ignore 401 ignore_content = """# Ignore SQLite database files 402*.db 403*.sqlite3 404 405# Ignore SQLite journal and metadata files 406*.db-journal 407*.sqlite3-journal 408 409# python 410.ipynb_checkpoints 411 """ 412 ignore_path = phen_path / ".gitignore" 413 with open(ignore_path, "w") as file: 414 file.write(ignore_content) 415 416 # add to git repo and commit 417 for d in DEFAULT_PHEN_DIR_LIST: 418 repo.git.add(phen_path / d) 419 repo.git.add(all=True) 420 repo.index.commit("initialised the phen git repo.") 421 422 logger.info(f"Phenotype initialised successfully") 423 424 425def validate(phen_dir: str): 426 """Validates the phenotype directory is a git repo with standard structure""" 427 logger.info(f"Validating phenotype: {phen_dir}") 428 phen_path = Path(phen_dir) 429 if 
not phen_path.is_dir(): 430 raise NotADirectoryError( 431 f"Error: '{str(phen_path.resolve())}' is not a directory" 432 ) 433 434 config_path = phen_path / CONFIG_FILE 435 if not config_path.is_file(): 436 raise FileNotFoundError( 437 f"Error: phen configuration file '{config_path}' does not exist." 438 ) 439 440 concepts_path = phen_path / CONCEPTS_DIR 441 if not concepts_path.is_dir(): 442 raise FileNotFoundError( 443 f"Error: source concepts directory {concepts_path} does not exist." 444 ) 445 446 # Calidate the directory is a git repo 447 try: 448 git.Repo(phen_path) 449 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 450 raise Exception(f"Phen directory {phen_path} is not a git repo") 451 452 # Load configuration File 453 if config_path.suffix == ".yaml": 454 try: 455 with config_path.open("r") as file: 456 phenotype = yaml.safe_load(file) 457 458 validator = Validator(CONFIG_SCHEMA) 459 if validator.validate(phenotype): 460 logger.debug("YAML structure is valid.") 461 else: 462 logger.error(f"YAML structure validation failed: {validator.errors}") 463 raise Exception(f"YAML structure validation failed: {validator.errors}") 464 except yaml.YAMLError as e: 465 logger.error(f"YAML syntax error: {e}") 466 raise e 467 else: 468 raise Exception( 469 f"Unsupported configuration filetype: {str(config_path.resolve())}" 470 ) 471 472 # initiatise 473 validation_errors = [] 474 phenotype = phenotype["phenotype"] 475 code_types = parse.CodeTypeParser().code_types 476 477 # check the version number is of the format vn.n.n 478 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 479 if not match: 480 validation_errors.append( 481 f"Invalid version format in configuration file: {phenotype['version']}" 482 ) 483 484 # create a list of all the concept set names defined in the concept set configuration 485 concept_set_names = [] 486 for item in phenotype["concept_sets"]: 487 if item["name"] in concept_set_names: 488 validation_errors.append( 489 f"Duplicate concept set defined in concept sets {item['name'] }" 490 ) 491 else: 492 concept_set_names.append(item["name"]) 493 494 # check codes definition 495 for item in phenotype["concept_sets"]: 496 # check concepte code file exists 497 concept_code_file_path = concepts_path / item["file"]["path"] 498 if not concept_code_file_path.exists(): 499 validation_errors.append( 500 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 501 ) 502 503 # check concepte code file is not empty 504 if concept_code_file_path.stat().st_size == 0: 505 validation_errors.append( 506 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 507 ) 508 509 # check code file type is supported 510 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 511 raise ValueError( 512 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 513 ) 514 515 # check columns specified are a supported medical coding type 516 for column in item["file"]["columns"]: 517 if column not in code_types: 518 validation_errors.append( 519 f"Column type {column} for file {concept_code_file_path} is not supported" 520 ) 521 522 # check the actions are supported 523 if "actions" in item["file"]: 524 for action in item["file"]["actions"]: 525 if action not in COL_ACTIONS: 526 validation_errors.append(f"Action {action} is not supported") 527 528 if len(validation_errors) > 0: 529 logger.error(validation_errors) 530 raise PhenValidationException( 531 f"Configuration file {str(config_path.resolve())} failed 
validation", 532 validation_errors, 533 ) 534 535 logger.info(f"Phenotype validated successfully") 536 537 538def read_table_file(path: Path, excel_sheet: str = ""): 539 """ 540 Load Code List File 541 """ 542 543 path = path.resolve() 544 if path.suffix == ".csv": 545 df = pd.read_csv(path, dtype=str) 546 elif path.suffix == ".xlsx" or path.suffix == ".xls": 547 if excel_sheet != "": 548 df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str) 549 else: 550 df = pd.read_excel(path, dtype=str) 551 elif path.suffix == ".dta": 552 df = pd.read_stata(path) 553 else: 554 raise ValueError( 555 f"Unsupported filetype {path.suffix}, only support{CODE_FILE_TYPES} code file types" 556 ) 557 558 return df 559 560 561def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame: 562 # Perform Structural Changes to file before preprocessing 563 logger.debug("Processing file structural actions") 564 if ( 565 "actions" in concept_set["file"] 566 and "split_col" in concept_set["file"]["actions"] 567 and "codes_col" in concept_set["file"]["actions"] 568 ): 569 split_col = concept_set["file"]["actions"]["split_col"] 570 codes_col = concept_set["file"]["actions"]["codes_col"] 571 logger.debug( 572 "Action: Splitting", 573 split_col, 574 "column into:", 575 df[split_col].unique(), 576 ) 577 codes = df[codes_col] 578 oh = pd.get_dummies(df[split_col], dtype=bool) # one hot encode 579 oh = oh.where((oh != True), codes, axis=0) # fill in 1s with codes 580 oh[oh == False] = np.nan # replace 0s with None 581 df = pd.concat([df, oh], axis=1) # merge in new columns 582 583 return df 584 585 586# Perform QA Checks on columns individually and append to df 587def preprocess_source_concepts( 588 df: pd.DataFrame, concept_set: dict, code_file_path: Path 589) -> Tuple[pd.DataFrame, list]: 590 """Parses each column individually - Order and length will not be preserved!""" 591 out = pd.DataFrame([]) # create output df to append to 592 code_errors = [] # list of errors from processing 593 594 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 595 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 596 597 # Preprocess codes 598 code_types = parse.CodeTypeParser().code_types 599 for code_type in concept_set["file"]["columns"]: 600 parser = code_types[code_type] 601 logger.info(f"Processing {code_type} codes for {code_file_path}") 602 603 # get codes by column name 604 source_col_name = concept_set["file"]["columns"][code_type] 605 codes = df[source_col_name].dropna() 606 codes = codes.astype(str) # convert to string 607 codes = codes.str.strip() # remove excess spaces 608 609 # process codes, validating them using parser and returning the errors 610 codes, errors = parser.process(codes, code_file_path) 611 if len(errors) > 0: 612 code_errors.extend(errors) 613 logger.warning(f"Codes validation failed with {len(errors)} errors") 614 615 # add processed codes to df 616 new_col_name = f"{source_col_name}_SOURCE" 617 df = df.rename(columns={source_col_name: new_col_name}) 618 process_codes = pd.DataFrame({code_type: codes}).join(df) 619 out = pd.concat( 620 [out, process_codes], 621 ignore_index=True, 622 ) 623 624 logger.debug(out.head()) 625 626 return out, code_errors 627 628 629def get_code_type_from_col_name(col_name: str): 630 return col_name.split("_")[0] 631 632 633# Translate Df with multiple codes into single code type Series 634def translate_codes( 635 source_df: pd.DataFrame, target_code_type: str, concept_name: str 636) -> pd.DataFrame: 637 
"""Translates each source code type the source coding list into a target type and returns all conversions as a concept set""" 638 639 # codes = pd.DataFrame([], dtype=str) 640 codes = pd.DataFrame( 641 columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string" 642 ) 643 # Convert codes to target type 644 logger.info(f"Converting to target code type {target_code_type}") 645 646 for source_code_type in source_df.columns: 647 # if target code type is the same as thet source code type, no translation, just appending source as target 648 if source_code_type == target_code_type: 649 copy_df = pd.DataFrame( 650 { 651 "SOURCE_CONCEPT": source_df[source_code_type], 652 "SOURCE_CONCEPT_TYPE": source_code_type, 653 "CONCEPT": source_df[source_code_type], 654 } 655 ) 656 codes = pd.concat([codes, copy_df]) 657 logger.debug( 658 f"Target code type {target_code_type} is the same as source code type {len(source_df)}, copying codes rather than translating" 659 ) 660 else: 661 # get the translation filename using source to target code types 662 filename = f"{source_code_type}_to_{target_code_type}.parquet" 663 map_path = trud.PROCESSED_PATH / filename 664 665 # do the mapping if it exists 666 if map_path.exists(): 667 # get mapping 668 df_map = pd.read_parquet(map_path) 669 670 # do mapping 671 translated_df = pd.merge( 672 source_df[source_code_type], df_map, how="left" 673 ) 674 675 # normalise the output 676 translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"]) 677 translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type 678 679 # add to list of codes 680 codes = pd.concat([codes, translated_df]) 681 682 else: 683 logger.warning( 684 f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist" 685 ) 686 687 codes = codes.dropna() # delete NaNs 688 689 # added concept set type to output if any translations 690 if len(codes.index) > 0: 691 codes["CONCEPT_SET"] = concept_name 692 else: 693 logger.debug(f"No codes converted with target code type {target_code_type}") 694 695 return codes 696 697 698def sql_row_exist( 699 conn: sqlite3.Connection, table: str, column: str, value: str 700) -> bool: 701 # Execute and check if a result exists 702 cur = conn.cursor() 703 query = f"SELECT 1 FROM {table} WHERE {column} = ? 
LIMIT 1;" 704 cur.execute(query, (value,)) 705 exists = cur.fetchone() is not None 706 707 return exists 708 709 710def write_code_errors(code_errors: list, code_errors_path: Path): 711 err_df = pd.DataFrame( 712 [ 713 { 714 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 715 "VOCABULARY": err.code_type, 716 "SOURCE": err.codes_file, 717 "CAUSE": err.message, 718 } 719 for err in code_errors 720 ] 721 ) 722 723 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 724 err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 725 err_df.to_csv(code_errors_path, index=False, mode="w") 726 727 728def write_vocab_version(phen_path: Path): 729 # write the vocab version files 730 731 if not trud.VERSION_PATH.exists(): 732 raise FileNotFoundError( 733 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 734 ) 735 736 if not omop.VERSION_PATH.exists(): 737 raise FileNotFoundError( 738 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 739 ) 740 741 with trud.VERSION_PATH.open("r") as file: 742 trud_version = yaml.safe_load(file) 743 744 with omop.VERSION_PATH.open("r") as file: 745 omop_version = yaml.safe_load(file) 746 747 # Create the combined YAML structure 748 version_data = { 749 "versions": { 750 "acmc": acmc.__version__, 751 "trud": trud_version, 752 "omop": omop_version, 753 } 754 } 755 756 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 757 yaml.dump( 758 version_data, 759 file, 760 Dumper=util.QuotedDumper, 761 default_flow_style=False, 762 sort_keys=False, 763 default_style='"', 764 ) 765 766 767def map(phen_dir: str, target_code_type: str): 768 logger.info(f"Processing phenotype: {phen_dir}") 769 770 # Validate configuration 771 validate(phen_dir) 772 773 # initialise paths 774 phen_path = Path(phen_dir) 775 config_path = phen_path / CONFIG_FILE 776 777 # load configuration 778 with config_path.open("r") as file: 779 config = yaml.safe_load(file) 780 phenotype = config["phenotype"] 781 782 if len(phenotype["map"]) == 0: 783 raise ValueError(f"No map codes defined in the phenotype configuration") 784 785 if target_code_type is not None and target_code_type not in phenotype["map"]: 786 raise ValueError( 787 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 788 ) 789 790 if target_code_type is not None: 791 map_target_code_type(phen_path, phenotype, target_code_type) 792 else: 793 for t in phenotype["map"]: 794 map_target_code_type(phen_path, phenotype, t) 795 796 logger.info(f"Phenotype processed successfully") 797 798 799def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 800 logger.debug(f"Target coding format: {target_code_type}") 801 concepts_path = phen_path / CONCEPTS_DIR 802 # Create output dataframe 803 out = pd.DataFrame([]) 804 code_errors = [] 805 806 # Process each folder in codes section 807 for concept_set in phenotype["concept_sets"]: 808 logger.debug(f"--- {concept_set['file']} ---") 809 810 # Load code file 811 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 812 df = read_table_file(codes_file_path) 813 814 # process structural actions 815 df = process_actions(df, concept_set) 816 817 # preprocessing and validate of source concepts 818 logger.debug("Processing and validating source concept codes") 819 df, errors = preprocess_source_concepts( 820 df, 821 concept_set, 822 codes_file_path, 823 ) 824 825 # create df with just the source code columns 826 source_column_names 
= list(concept_set["file"]["columns"].keys()) 827 source_df = df[source_column_names] 828 829 logger.debug(source_df.columns) 830 logger.debug(source_df.head()) 831 832 logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}") 833 if len(errors) > 0: 834 code_errors.extend(errors) 835 logger.debug(f" Length of code_errors {len(code_errors)}") 836 837 # Map source concepts codes to target codes 838 # if processing a source coding list with categorical data 839 if ( 840 "actions" in concept_set["file"] 841 and "divide_col" in concept_set["file"]["actions"] 842 and len(df) > 0 843 ): 844 divide_col = concept_set["file"]["actions"]["divide_col"] 845 logger.debug(f"Action: Dividing Table by {divide_col}") 846 logger.debug(f"column into: {df[divide_col].unique()}") 847 df_grp = df.groupby(divide_col) 848 for cat, grp in df_grp: 849 if cat == concept_set["file"]["category"]: 850 grp = grp.drop(columns=[divide_col]) # delete categorical column 851 source_df = grp[source_column_names] 852 trans_out = translate_codes( 853 source_df, 854 target_code_type=target_code_type, 855 concept_name=concept_set["name"], 856 ) 857 out = pd.concat([out, trans_out]) 858 else: 859 source_df = df[source_column_names] 860 trans_out = translate_codes( 861 source_df, 862 target_code_type=target_code_type, 863 concept_name=concept_set["name"], 864 ) 865 out = pd.concat([out, trans_out]) 866 867 if len(code_errors) > 0: 868 logger.error(f"The map processing has {len(code_errors)} errors") 869 error_path = phen_path / MAP_DIR / "errors" 870 error_path.mkdir(parents=True, exist_ok=True) 871 error_filename = f"{target_code_type}-code-errors.csv" 872 write_code_errors(code_errors, error_path / error_filename) 873 874 # Check there is output from processing 875 if len(out.index) == 0: 876 logger.error(f"No output after map processing") 877 raise Exception( 878 f"No output after map processing, check config {str(phen_path.resolve())}" 879 ) 880 881 # final processing 882 out = out.reset_index(drop=True) 883 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 884 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 885 886 out_count = len(out.index) 887 # added metadata 888 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 889 result_list = [] 890 source_column_names = list(concept_set["file"]["columns"].keys()) 891 for source_concept_type in source_column_names: 892 # Filter output based on the current source_concept_type 893 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 894 filtered_count = len(out_filtered_df.index) 895 896 # Remove the source type columns except the current type will leave the metadata and the join 897 remove_types = [ 898 type for type in source_column_names if type != source_concept_type 899 ] 900 metadata_df = df.drop(columns=remove_types) 901 metadata_df = metadata_df.rename( 902 columns={source_concept_type: "SOURCE_CONCEPT"} 903 ) 904 metadata_df_count = len(metadata_df.index) 905 906 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 907 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 908 result_count = len(result.index) 909 910 logger.debug( 911 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 912 ) 913 914 # Append the result to the result_list 915 result_list.append(result) 916 917 # Concatenate all the results 
into a single DataFrame 918 final_out = pd.concat(result_list, ignore_index=True) 919 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 920 logger.debug( 921 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 922 ) 923 924 # Save output to map directory 925 output_filename = target_code_type + ".csv" 926 map_path = phen_path / MAP_DIR / output_filename 927 final_out.to_csv(map_path, index=False) 928 logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 929 930 # save concept sets as separate files 931 concept_set_path = phen_path / CSV_PATH / target_code_type 932 933 # empty the concept-set directory except for hiddle files, e.g. .git 934 if concept_set_path.exists(): 935 for item in concept_set_path.iterdir(): 936 if not item.name.startswith("."): 937 item.unlink() 938 else: 939 concept_set_path.mkdir(parents=True, exist_ok=True) 940 941 # write each concept as a separate file 942 for name, concept in final_out.groupby("CONCEPT_SET"): 943 concept = concept.sort_values(by="CONCEPT") # sort rows 944 concept = concept.dropna(how="all", axis=1) # remove empty cols 945 concept = concept.reindex( 946 sorted(concept.columns), axis=1 947 ) # sort cols alphabetically 948 filename = f"{name}.csv" 949 concept_path = concept_set_path / filename 950 concept.to_csv(concept_path, index=False) 951 952 write_vocab_version(phen_path) 953 954 logger.info(f"Phenotype processed target code type {target_code_type}") 955 956 957def generate_version_tag( 958 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 959) -> str: 960 # Get all valid semantic version tags 961 versions = [] 962 for tag in repo.tags: 963 tag_name = ( 964 tag.name.lstrip("v") if use_v_prefix else tag.name 965 ) # Remove 'v' if needed 966 if semver.Version.is_valid(tag_name): 967 versions.append(semver.Version.parse(tag_name)) 968 969 # Determine the next version 970 if not versions: 971 new_version = semver.Version(0, 0, 1) 972 else: 973 latest_version = max(versions) 974 if increment == "major": 975 new_version = latest_version.bump_major() 976 elif increment == "minor": 977 new_version = latest_version.bump_minor() 978 else: 979 new_version = latest_version.bump_patch() 980 981 # Create the new tag 982 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 983 984 return new_version_str 985 986 987def publish( 988 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 989): 990 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 991 992 # Validate config 993 validate(phen_dir) 994 phen_path = Path(phen_dir) 995 996 # load git repo and set the branch 997 repo = git.Repo(phen_path) 998 if DEFAULT_GIT_BRANCH in repo.branches: 999 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1000 main_branch.checkout() 1001 else: 1002 raise AttributeError( 1003 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1004 ) 1005 1006 # check if any changes to publish 1007 if not repo.is_dirty() and not repo.untracked_files: 1008 if remote_url is not None and "origin" not in repo.remotes: 1009 logger.info(f"First publish to remote url {remote_url}") 1010 else: 1011 logger.info("Nothing to publish, no changes to the repo") 1012 return 1013 1014 # get next version 1015 new_version_str = generate_version_tag(repo, increment) 1016 logger.info(f"New version: {new_version_str}") 1017 1018 # Write version in configuration file 1019 config_path = phen_path / CONFIG_FILE 
1020 with config_path.open("r") as file: 1021 config = yaml.safe_load(file) 1022 1023 config["phenotype"]["version"] = new_version_str 1024 with open(config_path, "w") as file: 1025 yaml.dump( 1026 config, 1027 file, 1028 Dumper=util.QuotedDumper, 1029 default_flow_style=False, 1030 sort_keys=False, 1031 default_style='"', 1032 ) 1033 1034 # Add and commit changes to repo including version updates 1035 commit_message = f"Committing updates to phenotype {phen_path}" 1036 repo.git.add("--all") 1037 repo.index.commit(commit_message) 1038 1039 # Add tag to the repo 1040 repo.create_tag(new_version_str) 1041 1042 # push to origin if a remote repo 1043 if remote_url is not None and "origin" not in repo.remotes: 1044 git_url = construct_git_url(remote_url) 1045 repo.create_remote("origin", git_url) 1046 1047 try: 1048 if "origin" in repo.remotes: 1049 logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1050 origin = repo.remotes.origin 1051 logger.info(f"Pushing main branch to remote repo") 1052 repo.git.push("--set-upstream", "origin", "main") 1053 logger.info(f"Pushing version tags to remote git repo") 1054 origin.push(tags=True) 1055 logger.debug("Changes pushed to 'origin'") 1056 else: 1057 logger.debug("Remote 'origin' is not set") 1058 except Exception as e: 1059 tag_ref = repo.tags[new_version_str] 1060 repo.delete_tag(tag_ref) 1061 repo.git.reset("--soft", "HEAD~1") 1062 raise e 1063 1064 logger.info(f"Phenotype published successfully") 1065 1066 1067def export(phen_dir: str, version: str): 1068 """Exports a phen repo at a specific tagged version into a target directory""" 1069 logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1070 1071 # validate configuration 1072 validate(phen_dir) 1073 phen_path = Path(phen_dir) 1074 1075 # load configuration 1076 config_path = phen_path / CONFIG_FILE 1077 with config_path.open("r") as file: 1078 config = yaml.safe_load(file) 1079 1080 map_path = phen_path / MAP_DIR 1081 if not map_path.exists(): 1082 logger.warning(f"Map path does not exist '{map_path}'") 1083 1084 export_path = phen_path / OMOP_PATH 1085 # check export directory exists and if not create it 1086 if not export_path.exists(): 1087 export_path.mkdir(parents=True) 1088 logger.debug(f"OMOP export directory '{export_path}' created.") 1089 1090 # omop export db 1091 export_db_path = omop.export( 1092 map_path, 1093 export_path, 1094 config["phenotype"]["version"], 1095 config["phenotype"]["omop"], 1096 ) 1097 1098 # write to tables 1099 # export as csv 1100 logger.info(f"Phenotype exported successfully") 1101 1102 1103def copy(phen_dir: str, target_dir: str, version: str): 1104 """Copys a phen repo at a specific tagged version into a target directory""" 1105 1106 # Validate 1107 validate(phen_dir) 1108 phen_path = Path(phen_dir) 1109 1110 # Check target directory exists 1111 target_path = Path(target_dir) 1112 if not target_path.exists(): 1113 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1114 1115 # Set copy directory 1116 copy_path = target_path / version 1117 logger.info(f"Copying repo {phen_path} to {copy_path}") 1118 1119 if ( 1120 copy_path.exists() and copy_path.is_dir() 1121 ): # Check if it exists and is a directory 1122 copy = check_delete_dir( 1123 copy_path, 1124 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? 
(yes/no): ", 1125 ) 1126 else: 1127 copy = True 1128 1129 if not copy: 1130 logger.info(f"Not copying the version {version}") 1131 return 1132 1133 logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1134 repo = git.Repo.clone_from(phen_path, copy_path) 1135 1136 # Check out the latest commit or specified version 1137 if version: 1138 # Checkout a specific version (e.g., branch, tag, or commit hash) 1139 logger.info(f"Checking out version {version}...") 1140 repo.git.checkout(version) 1141 else: 1142 # Checkout the latest commit (HEAD) 1143 logger.info(f"Checking out the latest commit...") 1144 repo.git.checkout("HEAD") 1145 1146 logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1147 1148 logger.info(f"Phenotype copied successfully") 1149 1150 1151# Convert concept_sets list into dictionaries 1152def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1153 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1154 concepts_dict = { 1155 item["name"]: item["file"]["path"] 1156 for item in config_data["phenotype"]["concept_sets"] 1157 } 1158 name_set = set(concepts_dict.keys()) 1159 return concepts_dict, name_set 1160 1161 1162def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1163 """ 1164 Extracts clean keys from a DeepDiff dictionary. 1165 1166 :param diff: DeepDiff result dictionary 1167 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1168 :return: A set of clean key names 1169 """ 1170 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])} 1171 1172 1173def diff_config(old_config: dict, new_config: dict) -> str: 1174 report = f"\n# Changes to phenotype configuration\n" 1175 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1176 1177 old_concepts, old_names = extract_concepts(old_config) 1178 new_concepts, new_names = extract_concepts(new_config) 1179 1180 # Check added and removed names 1181 added_names = new_names - old_names # Names that appear in new but not in old 1182 removed_names = old_names - new_names # Names that were in old but not in new 1183 1184 # find file path changes for unchanged names 1185 unchanged_names = old_names & new_names # Names that exist in both 1186 file_diff = DeepDiff( 1187 {name: old_concepts[name] for name in unchanged_names}, 1188 {name: new_concepts[name] for name in unchanged_names}, 1189 ) 1190 1191 # Find renamed concepts (same file, different name) 1192 renamed_concepts = [] 1193 for removed in removed_names: 1194 old_path = old_concepts[removed] 1195 for added in added_names: 1196 new_path = new_concepts[added] 1197 if old_path == new_path: 1198 renamed_concepts.append((removed, added)) 1199 1200 # Remove renamed concepts from added and removed sets 1201 for old_name, new_name in renamed_concepts: 1202 added_names.discard(new_name) 1203 removed_names.discard(old_name) 1204 1205 # generate config report 1206 if added_names: 1207 report += "## Added Concepts\n" 1208 for name in added_names: 1209 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1210 report += "\n" 1211 1212 if removed_names: 1213 report += "## Removed Concepts\n" 1214 for name in removed_names: 1215 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1216 report += "\n" 1217 1218 if renamed_concepts: 1219 report += "## Renamed Concepts\n" 1220 for old_name, new_name in 
renamed_concepts: 1221 report += ( 1222 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1223 ) 1224 report += "\n" 1225 1226 if "values_changed" in file_diff: 1227 report += "## Updated File Paths\n" 1228 for name, change in file_diff["values_changed"].items(): 1229 old_file = change["old_value"] 1230 new_file = change["new_value"] 1231 clean_name = name.split("root['")[1].split("']")[0] 1232 report += ( 1233 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1234 ) 1235 report += "\n" 1236 1237 if not ( 1238 added_names 1239 or removed_names 1240 or renamed_concepts 1241 or file_diff.get("values_changed") 1242 ): 1243 report += "No changes in concept sets.\n" 1244 1245 return report 1246 1247 1248def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1249 old_output_files = [ 1250 file.name 1251 for file in old_map_path.iterdir() 1252 if file.is_file() and not file.name.startswith(".") 1253 ] 1254 new_output_files = [ 1255 file.name 1256 for file in new_map_path.iterdir() 1257 if file.is_file() and not file.name.startswith(".") 1258 ] 1259 1260 # Convert the lists to sets for easy comparison 1261 old_output_set = set(old_output_files) 1262 new_output_set = set(new_output_files) 1263 1264 # Outputs that are in old_output_set but not in new_output_set (removed files) 1265 removed_outputs = old_output_set - new_output_set 1266 # Outputs that are in new_output_set but not in old_output_set (added files) 1267 added_outputs = new_output_set - old_output_set 1268 # Outputs that are the intersection of old_output_set and new_output_set 1269 common_outputs = old_output_set & new_output_set 1270 1271 report = f"\n# Changes to available translations\n" 1272 report += f"This compares the coding translations files available.\n\n" 1273 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1274 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1275 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1276 1277 # Step N: Compare common outputs between versions 1278 report += f"# Changes to concepts in translation files\n\n" 1279 report += f"This compares the added and removed concepts in each of the coding translation files. 
Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1280 for file in common_outputs: 1281 old_output = old_map_path / file 1282 new_output = new_map_path / file 1283 1284 logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1285 logger.debug(f"New ouptput: {str(new_output.resolve())}") 1286 1287 df1 = pd.read_csv(old_output) 1288 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1289 df2 = pd.read_csv(new_output) 1290 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1291 1292 # Check for added and removed concepts 1293 report += f"- File {file}\n" 1294 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1295 report += f"- Removed concepts {sorted_list}\n" 1296 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1297 report += f"- Added concepts {sorted_list}\n" 1298 1299 # Check for changed concepts 1300 diff = df2 - df1 # diff in counts 1301 diff = diff[ 1302 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1303 ] # get non-zero counts 1304 s = "\n" 1305 if len(diff.index) > 0: 1306 for concept, row in diff.iterrows(): 1307 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1308 report += f"- Changed concepts {s}\n\n" 1309 else: 1310 report += f"- Changed concepts []\n\n" 1311 1312 return report 1313 1314 1315def diff_phen( 1316 new_phen_path: Path, 1317 new_version: str, 1318 old_phen_path: Path, 1319 old_version: str, 1320 report_path: Path, 1321): 1322 """Compare the differences between two versions of a phenotype""" 1323 1324 # validate phenotypes 1325 logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1326 validate(str(old_phen_path.resolve())) 1327 logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1328 validate(str(new_phen_path.resolve())) 1329 1330 # get old and new config 1331 old_config_path = old_phen_path / CONFIG_FILE 1332 with old_config_path.open("r") as file: 1333 old_config = yaml.safe_load(file) 1334 new_config_path = new_phen_path / CONFIG_FILE 1335 with new_config_path.open("r") as file: 1336 new_config = yaml.safe_load(file) 1337 1338 # write report heading 1339 report = f"# Phenotype Comparison Report\n" 1340 report += f"## Original phenotype\n" 1341 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1342 report += f" - {old_version}\n" 1343 report += f" - {str(old_phen_path.resolve())}\n" 1344 report += f"## Changed phenotype:\n" 1345 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1346 report += f" - {new_version}\n" 1347 report += f" - {str(new_phen_path.resolve())}\n" 1348 1349 # Step 1: check differences configuration files 1350 # Convert list of dicts into a dict: {name: file} 1351 report += diff_config(old_config, new_config) 1352 1353 # Step 2: check differences between map files 1354 # List files from output directories 1355 old_map_path = old_phen_path / MAP_DIR 1356 new_map_path = new_phen_path / MAP_DIR 1357 report += diff_map_files(old_map_path, new_map_path) 1358 1359 # initialise report file 1360 logger.debug(f"Writing to report file {str(report_path.resolve())}") 1361 report_file = open(report_path, "w") 1362 report_file.write(report) 1363 report_file.close() 1364 1365 logger.info(f"Phenotypes diff'd successfully") 1366 1367 1368def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1369 # make tmp directory .acmc 1370 timestamp = time.strftime("%Y%m%d_%H%M%S") 1371 temp_dir = 
Path(f".acmc/diff_{timestamp}") 1372 1373 changed_phen_path = Path(phen_dir) 1374 if not changed_phen_path.exists(): 1375 raise ValueError( 1376 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1377 ) 1378 1379 old_phen_path = Path(old_phen_dir) 1380 if not old_phen_path.exists(): 1381 raise ValueError( 1382 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1383 ) 1384 1385 try: 1386 # Create the directory 1387 temp_dir.mkdir(parents=True, exist_ok=True) 1388 logger.debug(f"Temporary directory created: {temp_dir}") 1389 1390 # Create temporary directories 1391 changed_path = temp_dir / "changed" 1392 changed_path.mkdir(parents=True, exist_ok=True) 1393 old_path = temp_dir / "old" 1394 old_path.mkdir(parents=True, exist_ok=True) 1395 1396 # checkout changed 1397 if version == "latest": 1398 logger.debug( 1399 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1400 ) 1401 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1402 else: 1403 logger.debug( 1404 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1405 ) 1406 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1407 changed_repo.git.checkout(version) 1408 1409 # checkout old 1410 if old_version == "latest": 1411 logger.debug( 1412 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1413 ) 1414 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1415 else: 1416 logger.debug( 1417 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1418 ) 1419 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1420 old_repo.git.checkout(old_version) 1421 1422 report_filename = f"{version}_{old_version}_diff.md" 1423 report_path = changed_phen_path / report_filename 1424 # diff old with new 1425 diff_phen(changed_path, version, old_path, old_version, report_path) 1426 1427 finally: 1428 # clean up tmp directory 1429 if temp_dir.exists(): 1430 shutil.rmtree(temp_dir) 1431 print(f"Temporary directory removed: {temp_dir}")
class PhenValidationException(Exception):
    """Custom exception raised when there are validation errors in the phenotype configuration file"""

    def __init__(self, message, validation_errors=None):
        super().__init__(message)
        self.validation_errors = validation_errors
Custom exception raised when there are validation errors in the phenotype configuration file.
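A short sketch of how the exception carries its error list; the message and error text are illustrative only:

from acmc.phen import PhenValidationException

errors = ["Duplicate concept set defined in concept sets DIABETES"]
try:
    raise PhenValidationException("Configuration file failed validation", errors)
except PhenValidationException as e:
    print(e)                    # the summary message
    print(e.validation_errors)  # the individual problems found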
def construct_git_url(remote_url: str):
    """Constructs a git URL for GitHub or GitLab, including a PAT token read from an environment variable"""
    # check the url
    parsed_url = urlparse(remote_url)

    # if GitHub is in the URL, otherwise assume it's GitLab; to support others
    # such as Codeberg this function would need updating if the URL scheme differs
    if "github.com" in parsed_url.netloc:
        # get GitHub PAT from environment variable
        auth = os.getenv("ACMC_GITHUB_PAT")
        if not auth:
            raise ValueError(
                "GitHub PAT not found. Set the ACMC_GITHUB_PAT environment variable."
            )
    else:
        # get GitLab PAT from environment variable
        auth = os.getenv("ACMC_GITLAB_PAT")
        if not auth:
            raise ValueError(
                "GitLab PAT not found. Set the ACMC_GITLAB_PAT environment variable."
            )
        auth = f"oauth2:{auth}"

    # Construct the new URL with credentials
    new_netloc = f"{auth}@{parsed_url.netloc}"
    return urlunparse(
        (
            parsed_url.scheme,
            new_netloc,
            parsed_url.path,
            parsed_url.params,
            parsed_url.query,
            parsed_url.fragment,
        )
    )
Constructs a git URL for GitHub or GitLab, including a PAT token read from an environment variable.
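A sketch of the resulting URL shape for a hypothetical GitLab project, assuming the ACMC_GITLAB_PAT environment variable has been exported beforehand:

from acmc import phen

# assumes the token is already set in the shell, e.g.:
#   export ACMC_GITLAB_PAT=<personal-access-token>
url = phen.construct_git_url("https://gitlab.com/example-org/example-phen.git")

# the token is embedded as credentials, roughly:
#   https://oauth2:<personal-access-token>@gitlab.com/example-org/example-phen.git
print(url)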
def create_empty_git_dir(path: Path):
    """Creates a directory with a .gitkeep file so that it's tracked in git"""
    path.mkdir(exist_ok=True)
    keep_path = path / ".gitkeep"
    keep_path.touch(exist_ok=True)
Creates a directory with a .gitkeep file so that it's tracked in git
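For illustration, creating a tracked-but-empty concepts directory inside an existing phenotype path; note that the parent directory must already exist because mkdir is called without parents=True:

from pathlib import Path
from acmc import phen

# creates ./workspace/phen/concepts and an empty .gitkeep inside it,
# so git will track the otherwise empty directory
phen.create_empty_git_dir(Path("./workspace/phen") / "concepts")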
def check_delete_dir(path: Path, msg: str) -> bool:
    """Checks on the command line if a user wants to delete a directory

    Args:
        path (Path): path of the directory to be deleted
        msg (str): message to be displayed to the user

    Returns:
        Boolean: True if deleted
    """
    deleted = False

    user_input = input(f"{msg}").strip().lower()
    if user_input in ["yes", "y"]:
        shutil.rmtree(path)
        deleted = True
    else:
        logger.info("Directory was not deleted.")

    return deleted
Checks on the command line if a user wants to delete a directory
Args:
    path (Path): path of the directory to be deleted
    msg (str): message to be displayed to the user
Returns:
    Boolean: True if deleted
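A sketch of the prompt-and-delete pattern this helper supports; the path and prompt text are placeholders:

from pathlib import Path
from acmc import phen

target = Path("./workspace/phen")
if target.exists() and target.is_dir():
    # asks on stdin; answering "yes" or "y" removes the directory tree
    deleted = phen.check_delete_dir(
        target, f"Delete {target.resolve()}? (yes/no): "
    )
    if deleted:
        print("Directory removed; it can now be recreated from scratch.")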
def fork(phen_dir: str, upstream_url: str, upstream_version: str, new_origin_url: str):
    """Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin

    Args:
        phen_dir (str): local directory path where the upstream repo is to be cloned
        upstream_url (str): url to the upstream repo
        upstream_version (str): version in the upstream repo to clone
        new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.

    Raises:
        ValueError: if the specified version is not in the upstream repo
        ValueError: if the upstream repo is not a valid phenotype repo
        ValueError: if there are any other problems with Git
    """
    logger.info(
        f"Forking upstream repo {upstream_url} {upstream_version} into directory: {phen_dir}"
    )

    phen_path = Path(phen_dir)
    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        logger.info("Exiting, phenotype not initialised")
        return

    try:
        # Clone repo
        git_url = construct_git_url(upstream_url)
        repo = git.Repo.clone_from(git_url, phen_path)

        # Fetch all branches and tags
        repo.remotes.origin.fetch()

        # Check if the version exists
        available_refs = [ref.name.split("/")[-1] for ref in repo.references]
        if upstream_version not in available_refs:
            raise ValueError(
                f"Version '{upstream_version}' not found in the repository: {upstream_url}."
            )

        # Checkout the specified version
        repo.git.checkout(upstream_version)
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()

        # Check if 'config.yaml' exists in the root directory
        config_path = phen_path / "config.yaml"
        if not os.path.isfile(config_path):
            raise ValueError(
                "The forked repository is not a valid ACMC repo because 'config.yaml' is missing in the root directory."
            )

        # Validate the phenotype is compatible with the acmc tool
        validate(str(phen_path.resolve()))

        # Delete each tag locally
        tags = repo.tags
        for tag in tags:
            repo.delete_tag(tag)
            logger.debug(f"Deleted tag from forked repo: {tag}")

        # Add upstream remote
        repo.create_remote("upstream", upstream_url)
        remote = repo.remotes["origin"]
        repo.delete_remote(remote)  # Remove existing origin

        # Optionally set a new origin remote
        if new_origin_url:
            git_url = construct_git_url(new_origin_url)
            repo.create_remote("origin", git_url)
            repo.git.push("--set-upstream", "origin", "main")

        logger.info(f"Repository forked successfully at {phen_path}")
        logger.info(f"Upstream set to {upstream_url}")
        if new_origin_url:
            logger.info(f"Origin set to {new_origin_url}")

    except Exception as e:
        if phen_path.exists():
            shutil.rmtree(phen_path)
        raise ValueError(f"Error occurred during repository fork: {str(e)}")
Forks an upstream phenotype in a remote repo at a specific version to a local directory, and optionally sets a new remote origin.
Args:
    phen_dir (str): local directory path where the upstream repo is to be cloned
    upstream_url (str): url to the upstream repo
    upstream_version (str): version in the upstream repo to clone
    new_origin_url (str, optional): url of the remote repo to set as the new origin. Defaults to None.
Raises:
    ValueError: if the specified version is not in the upstream repo
    ValueError: if the upstream repo is not a valid phenotype repo
    ValueError: if there are any other problems with Git
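A sketch of forking a published phenotype; the upstream URL, new origin URL and version tag are placeholders, and the matching ACMC_GITLAB_PAT or ACMC_GITHUB_PAT environment variable must be set for construct_git_url to succeed:

from acmc import phen

# clone version v1.0.0 of a hypothetical upstream phenotype into ./workspace/phen,
# then point origin at a new (empty) repository owned by the caller
phen.fork(
    "./workspace/phen",
    upstream_url="https://gitlab.com/example-org/upstream-phen.git",
    upstream_version="v1.0.0",
    new_origin_url="https://gitlab.com/example-user/my-phen.git",
)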
def init(phen_dir: str, remote_url: str):
    """Initialises a phenotype directory as a git repo with standard structure"""
    logger.info(f"Initialising Phenotype in directory: {phen_dir}")
    phen_path = Path(phen_dir)

    # check if directory already exists and ask user if they want to recreate it
    if (
        phen_path.exists() and phen_path.is_dir()
    ):  # Check if it exists and is a directory
        configure = check_delete_dir(
            phen_path,
            "The phen directory already exists. Do you want to reinitialise? (yes/no): ",
        )
    else:
        configure = True

    if not configure:
        logger.info("Exiting, phenotype not initialised")
        return

    # Initialise repo from local or remote
    repo: Repo

    # if remote then clone the repo otherwise init a local repo
    if remote_url is not None:
        # add PAT token to the URL
        git_url = construct_git_url(remote_url)

        # clone the repo
        git_cmd = git.cmd.Git()
        git_cmd.clone(git_url, phen_path)

        # open repo
        repo = Repo(phen_path)
        # check if there are any commits (new repo has no commits)
        if (
            len(repo.branches) == 0 or repo.head.is_detached
        ):  # Handle detached HEAD (e.g., after init)
            logger.debug("The phen repository has no commits yet.")
            commit_count = 0
        else:
            # Get the total number of commits in the default branch
            commit_count = sum(1 for _ in repo.iter_commits())
            logger.debug(f"Repo has previous commits: {commit_count}")
    else:
        # local repo, create the directories and init
        phen_path.mkdir(parents=True, exist_ok=True)
        logger.debug(f"Phen directory '{phen_path}' has been created.")
        repo = git.Repo.init(phen_path)
        commit_count = 0

    phen_path = phen_path.resolve()
    # initialise empty repos
    if commit_count == 0:
        # create initial commit
        initial_file_path = phen_path / "README.md"
        with open(initial_file_path, "w") as file:
            file.write(
                "# Initial commit\nThis is the first commit in the phen repository.\n"
            )
        repo.index.add([initial_file_path])
        repo.index.commit("Initial commit")
        commit_count = 1

    # Checkout the phen's default branch, creating it if it does not exist
    if DEFAULT_GIT_BRANCH in repo.branches:
        main_branch = repo.heads[DEFAULT_GIT_BRANCH]
        main_branch.checkout()
    else:
        main_branch = repo.create_head(DEFAULT_GIT_BRANCH)
        main_branch.checkout()

    # if the phen path does not contain the config file then initialise the phenotype
    config_path = phen_path / CONFIG_FILE
    if config_path.exists():
        logger.debug("Phenotype configuration files already exist")
        return

    logger.info("Creating phen directory structure and config files")
    for d in DEFAULT_PHEN_DIR_LIST:
        create_empty_git_dir(phen_path / d)

    # create empty phen config file
    config = {
        "phenotype": {
            "version": "0.0.0",
            "omop": {
                "vocabulary_id": "",
                "vocabulary_name": "",
                "vocabulary_reference": "",
            },
            "translate": [],
            "concept_sets": [],
        }
    }

    with open(phen_path / CONFIG_FILE, "w") as file:
        yaml.dump(
            config,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # add git ignore
    ignore_content = """# Ignore SQLite database files
*.db
*.sqlite3

# Ignore SQLite journal and metadata files
*.db-journal
*.sqlite3-journal

# python
.ipynb_checkpoints
"""
    ignore_path = phen_path / ".gitignore"
    with open(ignore_path, "w") as file:
        file.write(ignore_content)

    # add to git repo and commit
    for d in DEFAULT_PHEN_DIR_LIST:
        repo.git.add(phen_path / d)
    repo.git.add(all=True)
    repo.index.commit("initialised the phen git repo.")

    logger.info("Phenotype initialised successfully")
Initialise a phenotype directory as a git repo with the standard structure
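A minimal usage sketch, assuming the module is imported as acmc.phen; the workspace path is illustrative, and passing a git URL as remote_url clones an existing phenotype repo instead of creating a new one:

from acmc import phen

# create a new local phenotype repo with the standard directory structure and a default config.yaml
phen.init(phen_dir="./workspace/phen", remote_url=None)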
426def validate(phen_dir: str): 427 """Validates the phenotype directory is a git repo with standard structure""" 428 logger.info(f"Validating phenotype: {phen_dir}") 429 phen_path = Path(phen_dir) 430 if not phen_path.is_dir(): 431 raise NotADirectoryError( 432 f"Error: '{str(phen_path.resolve())}' is not a directory" 433 ) 434 435 config_path = phen_path / CONFIG_FILE 436 if not config_path.is_file(): 437 raise FileNotFoundError( 438 f"Error: phen configuration file '{config_path}' does not exist." 439 ) 440 441 concepts_path = phen_path / CONCEPTS_DIR 442 if not concepts_path.is_dir(): 443 raise FileNotFoundError( 444 f"Error: source concepts directory {concepts_path} does not exist." 445 ) 446 447 # Calidate the directory is a git repo 448 try: 449 git.Repo(phen_path) 450 except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): 451 raise Exception(f"Phen directory {phen_path} is not a git repo") 452 453 # Load configuration File 454 if config_path.suffix == ".yaml": 455 try: 456 with config_path.open("r") as file: 457 phenotype = yaml.safe_load(file) 458 459 validator = Validator(CONFIG_SCHEMA) 460 if validator.validate(phenotype): 461 logger.debug("YAML structure is valid.") 462 else: 463 logger.error(f"YAML structure validation failed: {validator.errors}") 464 raise Exception(f"YAML structure validation failed: {validator.errors}") 465 except yaml.YAMLError as e: 466 logger.error(f"YAML syntax error: {e}") 467 raise e 468 else: 469 raise Exception( 470 f"Unsupported configuration filetype: {str(config_path.resolve())}" 471 ) 472 473 # initiatise 474 validation_errors = [] 475 phenotype = phenotype["phenotype"] 476 code_types = parse.CodeTypeParser().code_types 477 478 # check the version number is of the format vn.n.n 479 match = re.match(r"(\d+\.\d+\.\d+)", phenotype["version"]) 480 if not match: 481 validation_errors.append( 482 f"Invalid version format in configuration file: {phenotype['version']}" 483 ) 484 485 # create a list of all the concept set names defined in the concept set configuration 486 concept_set_names = [] 487 for item in phenotype["concept_sets"]: 488 if item["name"] in concept_set_names: 489 validation_errors.append( 490 f"Duplicate concept set defined in concept sets {item['name'] }" 491 ) 492 else: 493 concept_set_names.append(item["name"]) 494 495 # check codes definition 496 for item in phenotype["concept_sets"]: 497 # check concepte code file exists 498 concept_code_file_path = concepts_path / item["file"]["path"] 499 if not concept_code_file_path.exists(): 500 validation_errors.append( 501 f"Coding file {str(concept_code_file_path.resolve())} does not exist" 502 ) 503 504 # check concepte code file is not empty 505 if concept_code_file_path.stat().st_size == 0: 506 validation_errors.append( 507 f"Coding file {str(concept_code_file_path.resolve())} is an empty file" 508 ) 509 510 # check code file type is supported 511 if concept_code_file_path.suffix not in CODE_FILE_TYPES: 512 raise ValueError( 513 f"Unsupported filetype {concept_code_file_path.suffix}, only support csv, xlsx, xls code file types" 514 ) 515 516 # check columns specified are a supported medical coding type 517 for column in item["file"]["columns"]: 518 if column not in code_types: 519 validation_errors.append( 520 f"Column type {column} for file {concept_code_file_path} is not supported" 521 ) 522 523 # check the actions are supported 524 if "actions" in item["file"]: 525 for action in item["file"]["actions"]: 526 if action not in COL_ACTIONS: 527 
validation_errors.append(f"Action {action} is not supported") 528 529 if len(validation_errors) > 0: 530 logger.error(validation_errors) 531 raise PhenValidationException( 532 f"Configuration file {str(config_path.resolve())} failed validation", 533 validation_errors, 534 ) 535 536 logger.info(f"Phenotype validated successfully")
Validates the phenotype directory is a git repo with standard structure
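For illustration, a config.yaml along these lines should satisfy the schema and the checks above. All names, paths and the "read2" code type are assumptions for the example; the column keys must be code types supported by the parse module, and the referenced file must exist under the concepts directory:

phenotype:
  version: "0.0.1"
  omop:
    vocabulary_id: "EXAMPLE_VOCAB"
    vocabulary_name: "Example phenotype vocabulary"
    vocabulary_reference: "https://example.org/phenotype"
  map:
    - "read2"
  concept_sets:
    - name: "EXAMPLE_CONCEPT_SET"
      file:
        path: "example_codes.csv"
        columns:
          read2: "code"
      metadata: {}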
def read_table_file(path: Path, excel_sheet: str = ""):
    """
    Load Code List File
    """

    path = path.resolve()
    if path.suffix == ".csv":
        df = pd.read_csv(path, dtype=str)
    elif path.suffix == ".xlsx" or path.suffix == ".xls":
        if excel_sheet != "":
            df = pd.read_excel(path, sheet_name=excel_sheet, dtype=str)
        else:
            df = pd.read_excel(path, dtype=str)
    elif path.suffix == ".dta":
        df = pd.read_stata(path)
    else:
        raise ValueError(
            f"Unsupported filetype {path.suffix}, only {CODE_FILE_TYPES} code file types are supported"
        )

    return df
Load Code List File
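A quick usage sketch (file names and the sheet name are illustrative):

from pathlib import Path
from acmc.phen import read_table_file

df = read_table_file(Path("concepts/example_codes.csv"))
# Excel files can target a specific sheet
df = read_table_file(Path("concepts/example_codes.xlsx"), excel_sheet="codes")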
def process_actions(df: pd.DataFrame, concept_set: dict) -> pd.DataFrame:
    # Perform structural changes to the file before preprocessing
    logger.debug("Processing file structural actions")
    if (
        "actions" in concept_set["file"]
        and "split_col" in concept_set["file"]["actions"]
        and "codes_col" in concept_set["file"]["actions"]
    ):
        split_col = concept_set["file"]["actions"]["split_col"]
        codes_col = concept_set["file"]["actions"]["codes_col"]
        logger.debug(
            f"Action: Splitting {split_col} column into: {df[split_col].unique()}"
        )
        codes = df[codes_col]
        oh = pd.get_dummies(df[split_col], dtype=bool)  # one hot encode
        oh = oh.where((oh != True), codes, axis=0)  # fill in 1s with codes
        oh[oh == False] = np.nan  # replace 0s with None
        df = pd.concat([df, oh], axis=1)  # merge in new columns

    return df
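To illustrate the split_col/codes_col action, a toy sketch (column names and codes invented): each distinct value of the split column becomes a new column that holds the row's code where that value applied and NaN elsewhere:

import pandas as pd
from acmc.phen import process_actions

df = pd.DataFrame({"code": ["H33..", "J45.9"], "code_type": ["read2", "icd10"]})
concept_set = {"file": {"actions": {"split_col": "code_type", "codes_col": "code"}}}
df = process_actions(df, concept_set)
# df now also has "read2" and "icd10" columns containing the matching codes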
588def preprocess_source_concepts( 589 df: pd.DataFrame, concept_set: dict, code_file_path: Path 590) -> Tuple[pd.DataFrame, list]: 591 """Parses each column individually - Order and length will not be preserved!""" 592 out = pd.DataFrame([]) # create output df to append to 593 code_errors = [] # list of errors from processing 594 595 # remove unnamed columns due to extra commas, missing headers, or incorrect parsing 596 df = df.drop(columns=[col for col in df.columns if "Unnamed" in col]) 597 598 # Preprocess codes 599 code_types = parse.CodeTypeParser().code_types 600 for code_type in concept_set["file"]["columns"]: 601 parser = code_types[code_type] 602 logger.info(f"Processing {code_type} codes for {code_file_path}") 603 604 # get codes by column name 605 source_col_name = concept_set["file"]["columns"][code_type] 606 codes = df[source_col_name].dropna() 607 codes = codes.astype(str) # convert to string 608 codes = codes.str.strip() # remove excess spaces 609 610 # process codes, validating them using parser and returning the errors 611 codes, errors = parser.process(codes, code_file_path) 612 if len(errors) > 0: 613 code_errors.extend(errors) 614 logger.warning(f"Codes validation failed with {len(errors)} errors") 615 616 # add processed codes to df 617 new_col_name = f"{source_col_name}_SOURCE" 618 df = df.rename(columns={source_col_name: new_col_name}) 619 process_codes = pd.DataFrame({code_type: codes}).join(df) 620 out = pd.concat( 621 [out, process_codes], 622 ignore_index=True, 623 ) 624 625 logger.debug(out.head()) 626 627 return out, code_errors
Parses each column individually - Order and length will not be preserved!
def translate_codes(
    source_df: pd.DataFrame, target_code_type: str, concept_name: str
) -> pd.DataFrame:
    """Translates each source code type in the source coding list into a target type and returns all conversions as a concept set"""

    codes = pd.DataFrame(
        columns=["SOURCE_CONCEPT", "SOURCE_CONCEPT_TYPE", "CONCEPT"], dtype="string"
    )

    # Convert codes to target type
    logger.info(f"Converting to target code type {target_code_type}")

    for source_code_type in source_df.columns:
        # if the target code type is the same as the source code type, no translation is needed, just append source as target
        if source_code_type == target_code_type:
            copy_df = pd.DataFrame(
                {
                    "SOURCE_CONCEPT": source_df[source_code_type],
                    "SOURCE_CONCEPT_TYPE": source_code_type,
                    "CONCEPT": source_df[source_code_type],
                }
            )
            codes = pd.concat([codes, copy_df])
            logger.debug(
                f"Target code type {target_code_type} is the same as the source code type, copying {len(source_df)} codes rather than translating"
            )
        else:
            # get the translation filename using source to target code types
            filename = f"{source_code_type}_to_{target_code_type}.parquet"
            map_path = trud.PROCESSED_PATH / filename

            # do the mapping if it exists
            if map_path.exists():
                # get mapping
                df_map = pd.read_parquet(map_path)

                # do mapping
                translated_df = pd.merge(
                    source_df[source_code_type], df_map, how="left"
                )

                # normalise the output
                translated_df.columns = pd.Index(["SOURCE_CONCEPT", "CONCEPT"])
                translated_df["SOURCE_CONCEPT_TYPE"] = source_code_type

                # add to list of codes
                codes = pd.concat([codes, translated_df])

            else:
                logger.warning(
                    f"No mapping from {source_code_type} to {target_code_type}, file {str(map_path.resolve())} does not exist"
                )

    codes = codes.dropna()  # delete NaNs

    # add the concept set name to the output if there were any translations
    if len(codes.index) > 0:
        codes["CONCEPT_SET"] = concept_name
    else:
        logger.debug(f"No codes converted with target code type {target_code_type}")

    return codes
Translates each source code type in the source coding list into a target type and returns all conversions as a concept set
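A usage sketch; the code type names and codes are illustrative, and translation only happens if the corresponding <source>_to_<target>.parquet mapping file exists under trud.PROCESSED_PATH:

import pandas as pd
from acmc.phen import translate_codes

source_df = pd.DataFrame({"read2": ["H33..", "H330."]})
codes = translate_codes(source_df, target_code_type="snomed", concept_name="EXAMPLE_CONCEPT_SET")
# output columns: SOURCE_CONCEPT, SOURCE_CONCEPT_TYPE, CONCEPT, CONCEPT_SET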
699def sql_row_exist( 700 conn: sqlite3.Connection, table: str, column: str, value: str 701) -> bool: 702 # Execute and check if a result exists 703 cur = conn.cursor() 704 query = f"SELECT 1 FROM {table} WHERE {column} = ? LIMIT 1;" 705 cur.execute(query, (value,)) 706 exists = cur.fetchone() is not None 707 708 return exists
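A minimal sketch; the database path, table and column names are illustrative:

import sqlite3
from acmc.phen import sql_row_exist

conn = sqlite3.connect("example.db")
found = sql_row_exist(conn, table="CONCEPT", column="concept_code", value="H33..")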
711def write_code_errors(code_errors: list, code_errors_path: Path): 712 err_df = pd.DataFrame( 713 [ 714 { 715 "CONCEPT": ", ".join(err.codes[~err.mask].tolist()), 716 "VOCABULARY": err.code_type, 717 "SOURCE": err.codes_file, 718 "CAUSE": err.message, 719 } 720 for err in code_errors 721 ] 722 ) 723 724 err_df = err_df.drop_duplicates() # Remove Duplicates from Error file 725 err_df = err_df.sort_values(by=["SOURCE", "VOCABULARY", "CONCEPT"]) 726 err_df.to_csv(code_errors_path, index=False, mode="w")
729def write_vocab_version(phen_path: Path): 730 # write the vocab version files 731 732 if not trud.VERSION_PATH.exists(): 733 raise FileNotFoundError( 734 f"TRUD version path {trud.VERSION_PATH} does not exist, please check TRUD is installed" 735 ) 736 737 if not omop.VERSION_PATH.exists(): 738 raise FileNotFoundError( 739 f"OMOP version path {omop.VERSION_PATH} does not exist, please check OMOP is installed" 740 ) 741 742 with trud.VERSION_PATH.open("r") as file: 743 trud_version = yaml.safe_load(file) 744 745 with omop.VERSION_PATH.open("r") as file: 746 omop_version = yaml.safe_load(file) 747 748 # Create the combined YAML structure 749 version_data = { 750 "versions": { 751 "acmc": acmc.__version__, 752 "trud": trud_version, 753 "omop": omop_version, 754 } 755 } 756 757 with open(phen_path / VOCAB_VERSION_FILE, "w") as file: 758 yaml.dump( 759 version_data, 760 file, 761 Dumper=util.QuotedDumper, 762 default_flow_style=False, 763 sort_keys=False, 764 default_style='"', 765 )
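The resulting vocab_version.yaml has roughly this shape; the acmc value is illustrative, and the trud/omop entries are whatever the installed TRUD and OMOP version files contain:

versions:
  acmc: "0.1.0"   # installed acmc package version (illustrative)
  trud: ...       # contents of the TRUD version file
  omop: ...       # contents of the OMOP version file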
768def map(phen_dir: str, target_code_type: str): 769 logger.info(f"Processing phenotype: {phen_dir}") 770 771 # Validate configuration 772 validate(phen_dir) 773 774 # initialise paths 775 phen_path = Path(phen_dir) 776 config_path = phen_path / CONFIG_FILE 777 778 # load configuration 779 with config_path.open("r") as file: 780 config = yaml.safe_load(file) 781 phenotype = config["phenotype"] 782 783 if len(phenotype["map"]) == 0: 784 raise ValueError(f"No map codes defined in the phenotype configuration") 785 786 if target_code_type is not None and target_code_type not in phenotype["map"]: 787 raise ValueError( 788 f"Target code type {target_code_type} not in phenotype configuration map {phenotype['map']}" 789 ) 790 791 if target_code_type is not None: 792 map_target_code_type(phen_path, phenotype, target_code_type) 793 else: 794 for t in phenotype["map"]: 795 map_target_code_type(phen_path, phenotype, t) 796 797 logger.info(f"Phenotype processed successfully")
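Usage sketch (path and code type illustrative); the target code type must appear in the map list in config.yaml, and passing None processes every listed type:

from acmc import phen

phen.map("./workspace/phen", target_code_type="read2")
phen.map("./workspace/phen", target_code_type=None)  # map all configured code types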
800def map_target_code_type(phen_path: Path, phenotype: dict, target_code_type: str): 801 logger.debug(f"Target coding format: {target_code_type}") 802 concepts_path = phen_path / CONCEPTS_DIR 803 # Create output dataframe 804 out = pd.DataFrame([]) 805 code_errors = [] 806 807 # Process each folder in codes section 808 for concept_set in phenotype["concept_sets"]: 809 logger.debug(f"--- {concept_set['file']} ---") 810 811 # Load code file 812 codes_file_path = Path(concepts_path / concept_set["file"]["path"]) 813 df = read_table_file(codes_file_path) 814 815 # process structural actions 816 df = process_actions(df, concept_set) 817 818 # preprocessing and validate of source concepts 819 logger.debug("Processing and validating source concept codes") 820 df, errors = preprocess_source_concepts( 821 df, 822 concept_set, 823 codes_file_path, 824 ) 825 826 # create df with just the source code columns 827 source_column_names = list(concept_set["file"]["columns"].keys()) 828 source_df = df[source_column_names] 829 830 logger.debug(source_df.columns) 831 logger.debug(source_df.head()) 832 833 logger.debug(f"Length of errors from preprocess_source_concepts {len(errors)}") 834 if len(errors) > 0: 835 code_errors.extend(errors) 836 logger.debug(f" Length of code_errors {len(code_errors)}") 837 838 # Map source concepts codes to target codes 839 # if processing a source coding list with categorical data 840 if ( 841 "actions" in concept_set["file"] 842 and "divide_col" in concept_set["file"]["actions"] 843 and len(df) > 0 844 ): 845 divide_col = concept_set["file"]["actions"]["divide_col"] 846 logger.debug(f"Action: Dividing Table by {divide_col}") 847 logger.debug(f"column into: {df[divide_col].unique()}") 848 df_grp = df.groupby(divide_col) 849 for cat, grp in df_grp: 850 if cat == concept_set["file"]["category"]: 851 grp = grp.drop(columns=[divide_col]) # delete categorical column 852 source_df = grp[source_column_names] 853 trans_out = translate_codes( 854 source_df, 855 target_code_type=target_code_type, 856 concept_name=concept_set["name"], 857 ) 858 out = pd.concat([out, trans_out]) 859 else: 860 source_df = df[source_column_names] 861 trans_out = translate_codes( 862 source_df, 863 target_code_type=target_code_type, 864 concept_name=concept_set["name"], 865 ) 866 out = pd.concat([out, trans_out]) 867 868 if len(code_errors) > 0: 869 logger.error(f"The map processing has {len(code_errors)} errors") 870 error_path = phen_path / MAP_DIR / "errors" 871 error_path.mkdir(parents=True, exist_ok=True) 872 error_filename = f"{target_code_type}-code-errors.csv" 873 write_code_errors(code_errors, error_path / error_filename) 874 875 # Check there is output from processing 876 if len(out.index) == 0: 877 logger.error(f"No output after map processing") 878 raise Exception( 879 f"No output after map processing, check config {str(phen_path.resolve())}" 880 ) 881 882 # final processing 883 out = out.reset_index(drop=True) 884 out = out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 885 out = out.sort_values(by=["CONCEPT_SET", "CONCEPT"]) 886 887 out_count = len(out.index) 888 # added metadata 889 # Loop over each source_concept_type and perform the left join on all columns apart from source code columns 890 result_list = [] 891 source_column_names = list(concept_set["file"]["columns"].keys()) 892 for source_concept_type in source_column_names: 893 # Filter output based on the current source_concept_type 894 out_filtered_df = out[out["SOURCE_CONCEPT_TYPE"] == source_concept_type] 895 filtered_count = 
len(out_filtered_df.index) 896 897 # Remove the source type columns except the current type will leave the metadata and the join 898 remove_types = [ 899 type for type in source_column_names if type != source_concept_type 900 ] 901 metadata_df = df.drop(columns=remove_types) 902 metadata_df = metadata_df.rename( 903 columns={source_concept_type: "SOURCE_CONCEPT"} 904 ) 905 metadata_df_count = len(metadata_df.index) 906 907 # Perform the left join with df2 on SOURCE_CONCEPT to add the metadata 908 result = pd.merge(out_filtered_df, metadata_df, how="left", on="SOURCE_CONCEPT") 909 result_count = len(result.index) 910 911 logger.debug( 912 f"Adding metadata for {source_concept_type}: out_count {out_count}, filtered_count {filtered_count}, metadata_df_count {metadata_df_count}, result_count {result_count}" 913 ) 914 915 # Append the result to the result_list 916 result_list.append(result) 917 918 # Concatenate all the results into a single DataFrame 919 final_out = pd.concat(result_list, ignore_index=True) 920 final_out = final_out.drop_duplicates(subset=["CONCEPT_SET", "CONCEPT"]) 921 logger.debug( 922 f"Check metadata processing counts: before {len(out.index)} : after {len(final_out.index)}" 923 ) 924 925 # Save output to map directory 926 output_filename = target_code_type + ".csv" 927 map_path = phen_path / MAP_DIR / output_filename 928 final_out.to_csv(map_path, index=False) 929 logger.info(f"Saved mapped concepts to {str(map_path.resolve())}") 930 931 # save concept sets as separate files 932 concept_set_path = phen_path / CSV_PATH / target_code_type 933 934 # empty the concept-set directory except for hiddle files, e.g. .git 935 if concept_set_path.exists(): 936 for item in concept_set_path.iterdir(): 937 if not item.name.startswith("."): 938 item.unlink() 939 else: 940 concept_set_path.mkdir(parents=True, exist_ok=True) 941 942 # write each concept as a separate file 943 for name, concept in final_out.groupby("CONCEPT_SET"): 944 concept = concept.sort_values(by="CONCEPT") # sort rows 945 concept = concept.dropna(how="all", axis=1) # remove empty cols 946 concept = concept.reindex( 947 sorted(concept.columns), axis=1 948 ) # sort cols alphabetically 949 filename = f"{name}.csv" 950 concept_path = concept_set_path / filename 951 concept.to_csv(concept_path, index=False) 952 953 write_vocab_version(phen_path) 954 955 logger.info(f"Phenotype processed target code type {target_code_type}")
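After a successful run for a given target code type, output is written under the phenotype directory roughly as follows (target type and concept set name illustrative):

phen/
    map/read2.csv                                    # full mapped output for the target type
    map/errors/read2-code-errors.csv                 # only written if code validation errors occurred
    concept-sets/csv/read2/EXAMPLE_CONCEPT_SET.csv   # one file per concept set
    vocab_version.yaml                               # acmc/TRUD/OMOP versions used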
958def generate_version_tag( 959 repo: git.Repo, increment: str = DEFAULT_VERSION_INC, use_v_prefix: bool = False 960) -> str: 961 # Get all valid semantic version tags 962 versions = [] 963 for tag in repo.tags: 964 tag_name = ( 965 tag.name.lstrip("v") if use_v_prefix else tag.name 966 ) # Remove 'v' if needed 967 if semver.Version.is_valid(tag_name): 968 versions.append(semver.Version.parse(tag_name)) 969 970 # Determine the next version 971 if not versions: 972 new_version = semver.Version(0, 0, 1) 973 else: 974 latest_version = max(versions) 975 if increment == "major": 976 new_version = latest_version.bump_major() 977 elif increment == "minor": 978 new_version = latest_version.bump_minor() 979 else: 980 new_version = latest_version.bump_patch() 981 982 # Create the new tag 983 new_version_str = f"v{new_version}" if use_v_prefix else str(new_version) 984 985 return new_version_str
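A short sketch (repo path illustrative): with no existing semantic version tags the result is 0.0.1, otherwise the latest tag is bumped according to increment:

import git
from acmc.phen import generate_version_tag

repo = git.Repo("./workspace/phen")
next_tag = generate_version_tag(repo, increment="minor")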
988def publish( 989 phen_dir: str, msg: str, remote_url: str, increment: str = DEFAULT_VERSION_INC 990): 991 """Publishes updates to the phenotype by commiting all changes to the repo directory""" 992 993 # Validate config 994 validate(phen_dir) 995 phen_path = Path(phen_dir) 996 997 # load git repo and set the branch 998 repo = git.Repo(phen_path) 999 if DEFAULT_GIT_BRANCH in repo.branches: 1000 main_branch = repo.heads[DEFAULT_GIT_BRANCH] 1001 main_branch.checkout() 1002 else: 1003 raise AttributeError( 1004 f"Phen repo does not contain the default branch {DEFAULT_GIT_BRANCH}" 1005 ) 1006 1007 # check if any changes to publish 1008 if not repo.is_dirty() and not repo.untracked_files: 1009 if remote_url is not None and "origin" not in repo.remotes: 1010 logger.info(f"First publish to remote url {remote_url}") 1011 else: 1012 logger.info("Nothing to publish, no changes to the repo") 1013 return 1014 1015 # get next version 1016 new_version_str = generate_version_tag(repo, increment) 1017 logger.info(f"New version: {new_version_str}") 1018 1019 # Write version in configuration file 1020 config_path = phen_path / CONFIG_FILE 1021 with config_path.open("r") as file: 1022 config = yaml.safe_load(file) 1023 1024 config["phenotype"]["version"] = new_version_str 1025 with open(config_path, "w") as file: 1026 yaml.dump( 1027 config, 1028 file, 1029 Dumper=util.QuotedDumper, 1030 default_flow_style=False, 1031 sort_keys=False, 1032 default_style='"', 1033 ) 1034 1035 # Add and commit changes to repo including version updates 1036 commit_message = f"Committing updates to phenotype {phen_path}" 1037 repo.git.add("--all") 1038 repo.index.commit(commit_message) 1039 1040 # Add tag to the repo 1041 repo.create_tag(new_version_str) 1042 1043 # push to origin if a remote repo 1044 if remote_url is not None and "origin" not in repo.remotes: 1045 git_url = construct_git_url(remote_url) 1046 repo.create_remote("origin", git_url) 1047 1048 try: 1049 if "origin" in repo.remotes: 1050 logger.debug(f"Remote 'origin' is set {repo.remotes.origin.url}") 1051 origin = repo.remotes.origin 1052 logger.info(f"Pushing main branch to remote repo") 1053 repo.git.push("--set-upstream", "origin", "main") 1054 logger.info(f"Pushing version tags to remote git repo") 1055 origin.push(tags=True) 1056 logger.debug("Changes pushed to 'origin'") 1057 else: 1058 logger.debug("Remote 'origin' is not set") 1059 except Exception as e: 1060 tag_ref = repo.tags[new_version_str] 1061 repo.delete_tag(tag_ref) 1062 repo.git.reset("--soft", "HEAD~1") 1063 raise e 1064 1065 logger.info(f"Phenotype published successfully")
Publishes updates to the phenotype by committing all changes to the repo directory
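Usage sketch (path and message illustrative); with remote_url=None the new version is committed and tagged locally only:

from acmc import phen

phen.publish("./workspace/phen", msg="Updated concept sets", remote_url=None, increment="patch")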
1068def export(phen_dir: str, version: str): 1069 """Exports a phen repo at a specific tagged version into a target directory""" 1070 logger.info(f"Exporting phenotype {phen_dir} at version {version}") 1071 1072 # validate configuration 1073 validate(phen_dir) 1074 phen_path = Path(phen_dir) 1075 1076 # load configuration 1077 config_path = phen_path / CONFIG_FILE 1078 with config_path.open("r") as file: 1079 config = yaml.safe_load(file) 1080 1081 map_path = phen_path / MAP_DIR 1082 if not map_path.exists(): 1083 logger.warning(f"Map path does not exist '{map_path}'") 1084 1085 export_path = phen_path / OMOP_PATH 1086 # check export directory exists and if not create it 1087 if not export_path.exists(): 1088 export_path.mkdir(parents=True) 1089 logger.debug(f"OMOP export directory '{export_path}' created.") 1090 1091 # omop export db 1092 export_db_path = omop.export( 1093 map_path, 1094 export_path, 1095 config["phenotype"]["version"], 1096 config["phenotype"]["omop"], 1097 ) 1098 1099 # write to tables 1100 # export as csv 1101 logger.info(f"Phenotype exported successfully")
Exports a phen repo at a specific tagged version into a target directory
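Usage sketch (path and version illustrative):

from acmc import phen

phen.export("./workspace/phen", version="0.0.1")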
1104def copy(phen_dir: str, target_dir: str, version: str): 1105 """Copys a phen repo at a specific tagged version into a target directory""" 1106 1107 # Validate 1108 validate(phen_dir) 1109 phen_path = Path(phen_dir) 1110 1111 # Check target directory exists 1112 target_path = Path(target_dir) 1113 if not target_path.exists(): 1114 raise FileNotFoundError(f"The target directory {target_path} does not exist") 1115 1116 # Set copy directory 1117 copy_path = target_path / version 1118 logger.info(f"Copying repo {phen_path} to {copy_path}") 1119 1120 if ( 1121 copy_path.exists() and copy_path.is_dir() 1122 ): # Check if it exists and is a directory 1123 copy = check_delete_dir( 1124 copy_path, 1125 f"The directory {str(copy_path.resolve())} already exists. Do you want to overwrite? (yes/no): ", 1126 ) 1127 else: 1128 copy = True 1129 1130 if not copy: 1131 logger.info(f"Not copying the version {version}") 1132 return 1133 1134 logger.debug(f"Cloning repo from {phen_path} into {copy_path}...") 1135 repo = git.Repo.clone_from(phen_path, copy_path) 1136 1137 # Check out the latest commit or specified version 1138 if version: 1139 # Checkout a specific version (e.g., branch, tag, or commit hash) 1140 logger.info(f"Checking out version {version}...") 1141 repo.git.checkout(version) 1142 else: 1143 # Checkout the latest commit (HEAD) 1144 logger.info(f"Checking out the latest commit...") 1145 repo.git.checkout("HEAD") 1146 1147 logger.debug(f"Copied {phen_path} {repo.head.commit.hexsha[:7]} in {copy_path}") 1148 1149 logger.info(f"Phenotype copied successfully")
Copies a phen repo at a specific tagged version into a target directory
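Usage sketch (paths and version illustrative); the target directory must already exist, and the copy is created in a subdirectory named after the version:

from acmc import phen

phen.copy("./workspace/phen", target_dir="./releases", version="0.0.1")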
1153def extract_concepts(config_data: dict) -> Tuple[dict, Set[str]]: 1154 """Extracts concepts as {name: file_path} dictionary and a name set.""" 1155 concepts_dict = { 1156 item["name"]: item["file"]["path"] 1157 for item in config_data["phenotype"]["concept_sets"] 1158 } 1159 name_set = set(concepts_dict.keys()) 1160 return concepts_dict, name_set
Extracts concepts as {name: file_path} dictionary and a name set.
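A small worked example (names and paths invented):

from acmc.phen import extract_concepts

config = {"phenotype": {"concept_sets": [{"name": "ASTHMA", "file": {"path": "asthma_codes.csv"}}]}}
concepts, names = extract_concepts(config)
# concepts == {"ASTHMA": "asthma_codes.csv"}, names == {"ASTHMA"}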
1163def extract_clean_deepdiff_keys(diff: dict, key_type: str) -> Set[Any]: 1164 """ 1165 Extracts clean keys from a DeepDiff dictionary. 1166 1167 :param diff: DeepDiff result dictionary 1168 :param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed") 1169 :return: A set of clean key names 1170 """ 1171 return {key.split("root['")[1].split("']")[0] for key in diff.get(key_type, [])}
Extracts clean keys from a DeepDiff dictionary.
:param diff: DeepDiff result dictionary
:param key_type: The type of change to extract (e.g., "dictionary_item_added", "dictionary_item_removed")
:return: A set of clean key names
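For example, DeepDiff reports dictionary keys as strings of the form "root['NAME']"; this helper strips that wrapping (names invented):

from deepdiff import DeepDiff
from acmc.phen import extract_clean_deepdiff_keys

diff = DeepDiff({"ASTHMA": "a.csv"}, {"ASTHMA": "a.csv", "COPD": "c.csv"})
extract_clean_deepdiff_keys(diff, "dictionary_item_added")  # {"COPD"}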
1174def diff_config(old_config: dict, new_config: dict) -> str: 1175 report = f"\n# Changes to phenotype configuration\n" 1176 report += f"This compares changes in the phenotype configuration including added, removed and renamed concept sets and changes to concept set source concept code file paths\n\n" 1177 1178 old_concepts, old_names = extract_concepts(old_config) 1179 new_concepts, new_names = extract_concepts(new_config) 1180 1181 # Check added and removed names 1182 added_names = new_names - old_names # Names that appear in new but not in old 1183 removed_names = old_names - new_names # Names that were in old but not in new 1184 1185 # find file path changes for unchanged names 1186 unchanged_names = old_names & new_names # Names that exist in both 1187 file_diff = DeepDiff( 1188 {name: old_concepts[name] for name in unchanged_names}, 1189 {name: new_concepts[name] for name in unchanged_names}, 1190 ) 1191 1192 # Find renamed concepts (same file, different name) 1193 renamed_concepts = [] 1194 for removed in removed_names: 1195 old_path = old_concepts[removed] 1196 for added in added_names: 1197 new_path = new_concepts[added] 1198 if old_path == new_path: 1199 renamed_concepts.append((removed, added)) 1200 1201 # Remove renamed concepts from added and removed sets 1202 for old_name, new_name in renamed_concepts: 1203 added_names.discard(new_name) 1204 removed_names.discard(old_name) 1205 1206 # generate config report 1207 if added_names: 1208 report += "## Added Concepts\n" 1209 for name in added_names: 1210 report += f"- `{name}` (File: `{new_concepts[name]}`)\n" 1211 report += "\n" 1212 1213 if removed_names: 1214 report += "## Removed Concepts\n" 1215 for name in removed_names: 1216 report += f"- `{name}` (File: `{old_concepts[name]}`)\n" 1217 report += "\n" 1218 1219 if renamed_concepts: 1220 report += "## Renamed Concepts\n" 1221 for old_name, new_name in renamed_concepts: 1222 report += ( 1223 f"- `{old_name}` ➝ `{new_name}` (File: `{old_concepts[old_name]}`)\n" 1224 ) 1225 report += "\n" 1226 1227 if "values_changed" in file_diff: 1228 report += "## Updated File Paths\n" 1229 for name, change in file_diff["values_changed"].items(): 1230 old_file = change["old_value"] 1231 new_file = change["new_value"] 1232 clean_name = name.split("root['")[1].split("']")[0] 1233 report += ( 1234 f"- `{clean_name}` changed file from `{old_file}` ➝ `{new_file}`\n" 1235 ) 1236 report += "\n" 1237 1238 if not ( 1239 added_names 1240 or removed_names 1241 or renamed_concepts 1242 or file_diff.get("values_changed") 1243 ): 1244 report += "No changes in concept sets.\n" 1245 1246 return report
1249def diff_map_files(old_map_path: Path, new_map_path: Path) -> str: 1250 old_output_files = [ 1251 file.name 1252 for file in old_map_path.iterdir() 1253 if file.is_file() and not file.name.startswith(".") 1254 ] 1255 new_output_files = [ 1256 file.name 1257 for file in new_map_path.iterdir() 1258 if file.is_file() and not file.name.startswith(".") 1259 ] 1260 1261 # Convert the lists to sets for easy comparison 1262 old_output_set = set(old_output_files) 1263 new_output_set = set(new_output_files) 1264 1265 # Outputs that are in old_output_set but not in new_output_set (removed files) 1266 removed_outputs = old_output_set - new_output_set 1267 # Outputs that are in new_output_set but not in old_output_set (added files) 1268 added_outputs = new_output_set - old_output_set 1269 # Outputs that are the intersection of old_output_set and new_output_set 1270 common_outputs = old_output_set & new_output_set 1271 1272 report = f"\n# Changes to available translations\n" 1273 report += f"This compares the coding translations files available.\n\n" 1274 report += f"- Removed outputs: {sorted(list(removed_outputs))}\n" 1275 report += f"- Added outputs: {sorted(list(added_outputs))}\n" 1276 report += f"- Common outputs: {sorted(list(common_outputs))}\n\n" 1277 1278 # Step N: Compare common outputs between versions 1279 report += f"# Changes to concepts in translation files\n\n" 1280 report += f"This compares the added and removed concepts in each of the coding translation files. Note that this might be different to the config.yaml if the translations have not been run for the current config.\n\n" 1281 for file in common_outputs: 1282 old_output = old_map_path / file 1283 new_output = new_map_path / file 1284 1285 logger.debug(f"Old ouptput: {str(old_output.resolve())}") 1286 logger.debug(f"New ouptput: {str(new_output.resolve())}") 1287 1288 df1 = pd.read_csv(old_output) 1289 df1 = df1[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1290 df2 = pd.read_csv(new_output) 1291 df2 = df2[["CONCEPT", "CONCEPT_SET"]].groupby("CONCEPT_SET").count() 1292 1293 # Check for added and removed concepts 1294 report += f"- File {file}\n" 1295 sorted_list = sorted(list(set(df1.index) - set(df2.index))) 1296 report += f"- Removed concepts {sorted_list}\n" 1297 sorted_list = sorted(list(set(df2.index) - set(df1.index))) 1298 report += f"- Added concepts {sorted_list}\n" 1299 1300 # Check for changed concepts 1301 diff = df2 - df1 # diff in counts 1302 diff = diff[ 1303 (~(diff["CONCEPT"] == 0.0)) & diff["CONCEPT"].notna() 1304 ] # get non-zero counts 1305 s = "\n" 1306 if len(diff.index) > 0: 1307 for concept, row in diff.iterrows(): 1308 s += "\t - {} {}\n".format(concept, row["CONCEPT"]) 1309 report += f"- Changed concepts {s}\n\n" 1310 else: 1311 report += f"- Changed concepts []\n\n" 1312 1313 return report
1316def diff_phen( 1317 new_phen_path: Path, 1318 new_version: str, 1319 old_phen_path: Path, 1320 old_version: str, 1321 report_path: Path, 1322): 1323 """Compare the differences between two versions of a phenotype""" 1324 1325 # validate phenotypes 1326 logger.debug(f"Validating for diff old path: {str(old_phen_path.resolve())}") 1327 validate(str(old_phen_path.resolve())) 1328 logger.debug(f"Validating for diff new path: {str(new_phen_path.resolve())}") 1329 validate(str(new_phen_path.resolve())) 1330 1331 # get old and new config 1332 old_config_path = old_phen_path / CONFIG_FILE 1333 with old_config_path.open("r") as file: 1334 old_config = yaml.safe_load(file) 1335 new_config_path = new_phen_path / CONFIG_FILE 1336 with new_config_path.open("r") as file: 1337 new_config = yaml.safe_load(file) 1338 1339 # write report heading 1340 report = f"# Phenotype Comparison Report\n" 1341 report += f"## Original phenotype\n" 1342 report += f" - {old_config['phenotype']['omop']['vocabulary_id']}\n" 1343 report += f" - {old_version}\n" 1344 report += f" - {str(old_phen_path.resolve())}\n" 1345 report += f"## Changed phenotype:\n" 1346 report += f" - {new_config['phenotype']['omop']['vocabulary_id']}\n" 1347 report += f" - {new_version}\n" 1348 report += f" - {str(new_phen_path.resolve())}\n" 1349 1350 # Step 1: check differences configuration files 1351 # Convert list of dicts into a dict: {name: file} 1352 report += diff_config(old_config, new_config) 1353 1354 # Step 2: check differences between map files 1355 # List files from output directories 1356 old_map_path = old_phen_path / MAP_DIR 1357 new_map_path = new_phen_path / MAP_DIR 1358 report += diff_map_files(old_map_path, new_map_path) 1359 1360 # initialise report file 1361 logger.debug(f"Writing to report file {str(report_path.resolve())}") 1362 report_file = open(report_path, "w") 1363 report_file.write(report) 1364 report_file.close() 1365 1366 logger.info(f"Phenotypes diff'd successfully")
Compare the differences between two versions of a phenotype
1369def diff(phen_dir: str, version: str, old_phen_dir: str, old_version: str): 1370 # make tmp directory .acmc 1371 timestamp = time.strftime("%Y%m%d_%H%M%S") 1372 temp_dir = Path(f".acmc/diff_{timestamp}") 1373 1374 changed_phen_path = Path(phen_dir) 1375 if not changed_phen_path.exists(): 1376 raise ValueError( 1377 f"Changed phenotype directory does not exist: {str(changed_phen_path.resolve())}" 1378 ) 1379 1380 old_phen_path = Path(old_phen_dir) 1381 if not old_phen_path.exists(): 1382 raise ValueError( 1383 f"Old phenotype directory does not exist: {str(old_phen_path.resolve())}" 1384 ) 1385 1386 try: 1387 # Create the directory 1388 temp_dir.mkdir(parents=True, exist_ok=True) 1389 logger.debug(f"Temporary directory created: {temp_dir}") 1390 1391 # Create temporary directories 1392 changed_path = temp_dir / "changed" 1393 changed_path.mkdir(parents=True, exist_ok=True) 1394 old_path = temp_dir / "old" 1395 old_path.mkdir(parents=True, exist_ok=True) 1396 1397 # checkout changed 1398 if version == "latest": 1399 logger.debug( 1400 f"Copying changed repo from {phen_dir} into {changed_path} at version {version}..." 1401 ) 1402 shutil.copytree(changed_phen_path, changed_path, dirs_exist_ok=True) 1403 else: 1404 logger.debug( 1405 f"Cloning changed repo from {phen_dir} into {changed_path} at version {version}..." 1406 ) 1407 changed_repo = git.Repo.clone_from(changed_phen_path, changed_path) 1408 changed_repo.git.checkout(version) 1409 1410 # checkout old 1411 if old_version == "latest": 1412 logger.debug( 1413 f"Copying old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1414 ) 1415 shutil.copytree(old_phen_path, old_path, dirs_exist_ok=True) 1416 else: 1417 logger.debug( 1418 f"Cloning old repo from {old_phen_dir} into {old_path} at version {old_version}..." 1419 ) 1420 old_repo = git.Repo.clone_from(old_phen_dir, old_path) 1421 old_repo.git.checkout(old_version) 1422 1423 report_filename = f"{version}_{old_version}_diff.md" 1424 report_path = changed_phen_path / report_filename 1425 # diff old with new 1426 diff_phen(changed_path, version, old_path, old_version, report_path) 1427 1428 finally: 1429 # clean up tmp directory 1430 if temp_dir.exists(): 1431 shutil.rmtree(temp_dir) 1432 print(f"Temporary directory removed: {temp_dir}")
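Usage sketch (paths and versions illustrative). Passing "latest" compares the working copy directly; any other value is checked out from that repo's git tags, so the tag must exist. The report is written as <version>_<old_version>_diff.md in the changed phenotype directory:

from acmc import phen

phen.diff("./workspace/phen", version="latest", old_phen_dir="./workspace/phen", old_version="0.0.1")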