acmc.trud
trud.py module
This module provides functionality to manage installation of the NHS TRUD vocabularies.
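A minimal usage sketch, assuming acmc is installed and you hold a valid NHS TRUD API key (the environment variable name and the install() entry point are those defined further down in this module; the key value is a placeholder):

# Sketch: install the TRUD vocabularies from Python.
import os
from acmc import trud

os.environ["ACMC_TRUD_API_KEY"] = "<your-trud-api-key>"  # placeholder, not a real key
trud.install()  # downloads, verifies and processes the TRUD release items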
1""" 2trud.py module 3 4This module provides functionality to manage installation of the NHS TRUD vocabularies. 5 6""" 7 8import os 9import sys 10import requests 11import argparse 12import shutil 13import hashlib 14import zipfile 15import pandas as pd 16import simpledbf # type: ignore 17import yaml 18from pathlib import Path 19from acmc import util, logging_config as lc 20 21# setup logging 22_logger = lc.setup_logger() 23 24FQDN = "isd.digital.nhs.uk" 25"""Fully Qualified Domain Name of NHS digital TRUD service API""" 26 27VOCAB_PATH = Path("./vocab/trud") 28"""Default path to the TRUD vocabulary directory relative to the the acmc execution directory""" 29 30VERSION_FILE = "trud_version.yml" 31"""TRUD version file""" 32 33VERSION_PATH = VOCAB_PATH / VERSION_FILE 34"""Default path to the TRUD version file""" 35 36DOWNLOADS_PATH = VOCAB_PATH / "downloads" 37"""Default path to the TRUD vocabulary downloads directory""" 38 39PROCESSED_PATH = VOCAB_PATH / "processed" 40""" Default path to the processed TRUD mappings directory""" 41 42 43def get_releases(item_id: str, API_KEY: str, latest=False) -> list: 44 """Retrieve release information for an item from the TRUD API.""" 45 url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases" 46 if latest: 47 url += "?latest" 48 49 response = requests.get(url) 50 if response.status_code != 200: 51 _logger.error( 52 f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval" 53 ) 54 response.raise_for_status() 55 56 data = response.json() 57 if data.get("message") != "OK": 58 msg = f"Unknown error occurred {data.get('message')}" 59 _logger.error(msg) 60 raise Exception(msg) 61 62 return data.get("releases", []) 63 64 65def download_release_file( 66 item_id: str, release_ordinal: str, release: dict, file_json_prefix: str 67) -> Path: 68 """Download specified file type for a given release of an item.""" 69 70 # check folder is a directory 71 if not DOWNLOADS_PATH.is_dir(): 72 raise NotADirectoryError( 73 f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory" 74 ) 75 76 file_type = file_json_prefix 77 file_url = release.get(f"{file_json_prefix}FileUrl") 78 if file_url == None: 79 raise ValueError(f"File url not in json data {file_json_prefix}FileUrl") 80 81 file_name = release.get(f"{file_json_prefix}FileName") 82 if file_name == None: 83 raise ValueError(f"File name not in json data {file_json_prefix}FileName") 84 85 file_destination = DOWNLOADS_PATH / file_name 86 87 if not file_url or not file_name: 88 raise ValueError( 89 f"Missing {file_type} file information for release {release_ordinal} of item {item_id}." 90 ) 91 92 _logger.info( 93 f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}" 94 ) 95 response = requests.get(file_url, stream=True) 96 97 if response.status_code == 200: 98 with open(file_destination, "wb") as f: 99 f.write(response.content) 100 else: 101 _logger.error( 102 f"Failed to download {file_type} file for item {item_id}. 
Status code: {response.status_code}" 103 ) 104 response.raise_for_status() 105 106 return file_destination 107 108 109def validate_download_hash(file_destination: str, item_hash: str): 110 with open(file_destination, "rb") as f: 111 hash = hashlib.sha256(f.read()).hexdigest() 112 _logger.debug(hash) 113 if hash.upper() == item_hash.upper(): 114 _logger.debug(f"Verified hash of {file_destination} {hash}") 115 else: 116 msg = f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead" 117 _logger.error(msg) 118 raise ValueError(msg) 119 120 121def unzip_download(file_destination: str): 122 # check folder is a directory 123 if not DOWNLOADS_PATH.is_dir(): 124 raise NotADirectoryError( 125 f"Error: '{DOWNLOADS_PATH}' for TRUD resoruces is not a directory" 126 ) 127 128 with zipfile.ZipFile(file_destination, "r") as zip_ref: 129 zip_ref.extractall(DOWNLOADS_PATH) 130 131 132def extract_icd10(): 133 # ICD10_edition5 134 file_path = ( 135 DOWNLOADS_PATH 136 / "ICD10_Edition5_XML_20160401" 137 / "Content" 138 / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml" 139 ) 140 df = pd.read_xml(file_path) 141 df = df[["CODE", "ALT_CODE", "DESCRIPTION"]] 142 df = df.rename( 143 columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"} 144 ) 145 output_path = PROCESSED_PATH / "icd10.parquet" 146 df.to_parquet(output_path, index=False) 147 _logger.info(f"Extracted: {output_path}") 148 149 150def extract_opsc4(): 151 file_path = ( 152 DOWNLOADS_PATH 153 / "OPCS410 Data files txt" 154 / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt" 155 ) 156 157 df = pd.read_csv(file_path, sep="\t", dtype=str, header=None) 158 df = df.rename(columns={0: "opcs4", 1: "description"}) 159 160 output_path = PROCESSED_PATH / "opcs4.parquet" 161 df.to_parquet(output_path, index=False) 162 _logger.info(f"Extracted: {output_path}") 163 164 165def extract_nhs_data_migrations(): 166 # NHS Data Migrations 167 168 # snomed only 169 file_path = ( 170 DOWNLOADS_PATH 171 / "Mapping Tables" 172 / "Updated" 173 / "Clinically Assured" 174 / "sctcremap_uk_20200401000001.txt" 175 ) 176 df = pd.read_csv(file_path, sep="\t") 177 df = df[["SCT_CONCEPTID"]] 178 df = df.rename(columns={"SCT_CONCEPTID": "snomed"}) 179 df = df.drop_duplicates() 180 df = df.astype(str) 181 182 output_path = PROCESSED_PATH / "snomed.parquet" 183 df.to_parquet(output_path, index=False) 184 _logger.info(f"Extracted: {output_path}") 185 186 # r2 -> r3 187 file_path = ( 188 DOWNLOADS_PATH 189 / "Mapping Tables" 190 / "Updated" 191 / "Clinically Assured" 192 / "rctctv3map_uk_20200401000001.txt" 193 ) 194 df = pd.read_csv(file_path, sep="\t") 195 df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]] 196 df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"}) 197 198 output_path = PROCESSED_PATH / "read2_to_read3.parquet" 199 df.to_parquet(output_path, index=False) 200 _logger.info(f"Extracted: {output_path}") 201 202 # r3->r2 203 file_path = ( 204 DOWNLOADS_PATH 205 / "Mapping Tables" 206 / "Updated" 207 / "Clinically Assured" 208 / "ctv3rctmap_uk_20200401000002.txt" 209 ) 210 df = pd.read_csv(file_path, sep="\t") 211 df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]] 212 df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"}) 213 df = df.drop_duplicates() 214 df = df[~df["read2"].str.match("^.*_.*$")] # remove r2 codes with '_' 215 216 output_path = PROCESSED_PATH / "read3_to_read2.parquet" 217 df.to_parquet(output_path, index=False) 218 _logger.info(f"Extracted: 
{output_path}") 219 220 # r2 -> snomed 221 file_path = ( 222 DOWNLOADS_PATH 223 / "Mapping Tables" 224 / "Updated" 225 / "Clinically Assured" 226 / "rcsctmap2_uk_20200401000001.txt" 227 ) 228 df = pd.read_csv(file_path, sep="\t", dtype=str) 229 df = df[["ReadCode", "ConceptId"]] 230 df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"}) 231 232 output_path = PROCESSED_PATH / "read2_to_snomed.parquet" 233 df.to_parquet(output_path, index=False) 234 _logger.info(f"Extracted: {output_path}") 235 236 # r3->snomed 237 file_path = ( 238 DOWNLOADS_PATH 239 / "Mapping Tables" 240 / "Updated" 241 / "Clinically Assured" 242 / "ctv3sctmap2_uk_20200401000001.txt" 243 ) 244 df = pd.read_csv(file_path, sep="\t", dtype=str) 245 df = df[["CTV3_TERMID", "SCT_CONCEPTID"]] 246 df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"}) 247 df["snomed"] = df["snomed"].astype(str) 248 df = df[~df["snomed"].str.match("^.*_.*$")] # remove snomed codes with '_' 249 250 output_path = PROCESSED_PATH / "read3_to_snomed.parquet" 251 df.to_parquet(output_path, index=False) 252 _logger.info(f"Extracted: {output_path}") 253 254 255def extract_nhs_read_browser(): 256 # r2 only 257 input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF" 258 df = simpledbf.Dbf5(input_path).to_dataframe() 259 df = pd.concat([df["READCODE"], df["DESCENDANT"]]) 260 df = pd.DataFrame(df.drop_duplicates()) 261 df = df.rename(columns={0: "read2"}) 262 output_path = PROCESSED_PATH / "read2.parquet" 263 df.to_parquet(output_path, index=False) 264 _logger.info(f"Extracted: {output_path}") 265 266 # r2 -> atc 267 input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF" 268 df = simpledbf.Dbf5(input_path).to_dataframe() 269 df = df[["READCODE", "ATC"]] 270 df = df.rename(columns={"READCODE": "read2", "ATC": "atc"}) 271 output_path = PROCESSED_PATH / "read2_to_atc.parquet" 272 df.to_parquet(output_path, index=False) 273 _logger.info(f"Extracted: {output_path}") 274 275 # r2 -> icd10 276 input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF" 277 df = simpledbf.Dbf5(input_path).to_dataframe() 278 df = df[["READ_CODE", "TARG_CODE"]] 279 df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"}) 280 df = df[~df["icd10"].str.match("^.*-.*$")] # remove codes with '-' 281 df = df[~df["read2"].str.match("^.*-.*$")] # remove codes with '-' 282 output_path = PROCESSED_PATH / "read2_to_icd10.parquet" 283 df.to_parquet(output_path, index=False) 284 _logger.info(f"Extracted: {output_path}") 285 286 # r2 -> opcs4 287 input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF" 288 df = simpledbf.Dbf5(input_path).to_dataframe() 289 df = df[["READ_CODE", "TARG_CODE"]] 290 df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"}) 291 df = df[~df["opcs4"].str.match("^.*-.*$")] # remove codes with '-' 292 df = df[~df["read2"].str.match("^.*-.*$")] # remove codes with '-' 293 output_path = PROCESSED_PATH / "read2_to_opcs4.parquet" 294 df.to_parquet(output_path, index=False) 295 _logger.info(f"Extracted: {output_path}") 296 297 # r3 only 298 input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF" 299 df = simpledbf.Dbf5(input_path).to_dataframe() 300 df = pd.concat([df["READCODE"], df["DESCENDANT"]]) 301 df = pd.DataFrame(df.drop_duplicates()) 302 df = df.rename(columns={0: "read3"}) 303 output_path = PROCESSED_PATH / "read3.parquet" 304 df.to_parquet(output_path, index=False) 305 _logger.info(f"Extracted: {output_path}") 306 307 # r3 -> icd10 308 input_path = DOWNLOADS_PATH / "Standard" / 
"V3" / "ICD10.DBF" 309 df = simpledbf.Dbf5(input_path).to_dataframe() 310 df = df[["READ_CODE", "TARG_CODE"]] 311 df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"}) 312 df = df[~df["icd10"].str.match("^.*-.*$")] # remove codes with '-' 313 df = df[~df["read3"].str.match("^.*-.*$")] # remove codes with '-' 314 output_path = PROCESSED_PATH / "read3_to_icd10.parquet" 315 df.to_parquet(output_path, index=False) 316 _logger.info(f"Extracted: {output_path}") 317 318 # r3 -> icd9 319 # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF') 320 321 # r3 -> opcs4 322 input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF" 323 df = simpledbf.Dbf5(input_path).to_dataframe() 324 df = df[["READ_CODE", "TARG_CODE"]] 325 df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"}) 326 df = df[~df["opcs4"].str.match("^.*-.*$")] # remove codes with '-' 327 df = df[~df["read3"].str.match("^.*-.*$")] # remove codes with '-' 328 output_path = PROCESSED_PATH / "read3_to_opcs4.parquet" 329 df.to_parquet(output_path, index=False) 330 _logger.info(f"Extracted: {output_path}") 331 332 333def create_map_directories(): 334 """Create map directories.""" 335 336 # Check if build directory exists 337 create_map_dirs = False 338 if VOCAB_PATH.exists(): 339 user_input = ( 340 input( 341 f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? (y/n): " 342 ) 343 .strip() 344 .lower() 345 ) 346 if user_input == "y": 347 # delete all build files 348 shutil.rmtree(VOCAB_PATH) 349 create_map_dirs = True 350 elif user_input == "n": 351 _logger.info("Exiting TRUD installation") 352 sys.exit(0) 353 else: 354 create_map_dirs = True 355 356 if create_map_dirs: 357 # create maps directories 358 VOCAB_PATH.mkdir(parents=True, exist_ok=True) 359 DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True) 360 PROCESSED_PATH.mkdir(parents=True, exist_ok=True) 361 362 363def install(): 364 _logger.info(f"Installing TRUD") 365 366 # get TRUD api key from environment variable 367 api_key = os.getenv("ACMC_TRUD_API_KEY") 368 if not api_key: 369 raise ValueError( 370 "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable." 371 ) 372 373 create_map_directories() 374 375 items_latest = True 376 items = [ 377 { 378 "id": 259, 379 "name": "NHS ICD-10 5th Edition XML data files", 380 "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F", 381 "extract": extract_icd10, 382 }, 383 { 384 "id": 119, 385 "name": "OPCS-4 data files", 386 "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3", 387 "extract": extract_opsc4, 388 }, 389 { 390 "id": 9, 391 "name": "NHS Data Migration", 392 "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765", 393 "extract": extract_nhs_data_migrations, 394 }, 395 { 396 "id": 8, 397 "name": "NHS Read Browser", 398 "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E", 399 "extract": extract_nhs_read_browser, 400 }, 401 # TODO: Download BNF from separate site? 
https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip 402 ] 403 404 # remove function from items to save versions 405 data = [{k: v for k, v in d.items() if k != "extract"} for d in items] 406 # save TRUD versions to file to main record of what was downloaded 407 with open(VERSION_PATH, "w") as file: 408 yaml.dump( 409 data, 410 file, 411 Dumper=util.QuotedDumper, 412 default_flow_style=False, 413 sort_keys=False, 414 default_style='"', 415 ) 416 417 # Validate and process each item ID 418 for item in items: 419 item_id = item["id"] 420 _logger.info(f"--- {item['name']} ---") 421 422 releases = get_releases(item_id, API_KEY=api_key, latest=items_latest) 423 if not releases: 424 raise ValueError(f"No releases found for item {item_id}.") 425 426 # Process each release in reverse order 427 for release_ordinal, release in enumerate(releases[::-1], 1): 428 # Download archive file 429 file_destination = download_release_file( 430 item_id, release_ordinal, release, "archive" 431 ) 432 433 # Optional files 434 # if items.checksum: 435 # download_release_file(item["id"], release_ordinal, release, "checksum") 436 # if items.signature: 437 # download_release_file(item["id"], release_ordinal, release, "signature") 438 # if items.public_key: 439 # download_release_file(item["id"], release_ordinal, release, "publicKey", "public key") 440 441 # Verify Hash if available 442 if "hash" in item: 443 validate_download_hash(file_destination, item["hash"]) 444 445 # Unzip downloaded .zip 446 unzip_download(file_destination) 447 448 # Extract Tables to parquet 449 if "extract" in item: 450 item["extract"]() 451 452 _logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.") 453 454 _logger.info(f"TRUD installation completed")
FQDN = 'isd.digital.nhs.uk'
Fully Qualified Domain Name of the NHS Digital TRUD service API
VOCAB_PATH = PosixPath('vocab/trud')
Default path to the TRUD vocabulary directory, relative to the acmc execution directory
VERSION_FILE = 'trud_version.yml'
TRUD version file
VERSION_PATH = PosixPath('vocab/trud/trud_version.yml')
Default path to the TRUD version file
DOWNLOADS_PATH = PosixPath('vocab/trud/downloads')
Default path to the TRUD vocabulary downloads directory
PROCESSED_PATH = PosixPath('vocab/trud/processed')
Default path to the processed TRUD mappings directory
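The extract functions below write their outputs as parquet files under PROCESSED_PATH. A short sketch of checking what has been processed so far (file names are those produced by this module):

# Sketch: list the processed TRUD mapping files.
from acmc import trud

for parquet_file in sorted(trud.PROCESSED_PATH.glob("*.parquet")):
    print(parquet_file.name)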
def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
    """Retrieve release information for an item from the TRUD API."""
    url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
    if latest:
        url += "?latest"

    response = requests.get(url)
    if response.status_code != 200:
        _logger.error(
            f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases are found for the API key, please ensure you are subscribed to the data release and that it is not pending approval"
        )
        response.raise_for_status()

    data = response.json()
    if data.get("message") != "OK":
        msg = f"Unknown error occurred {data.get('message')}"
        _logger.error(msg)
        raise Exception(msg)

    return data.get("releases", [])
Retrieve release information for an item from the TRUD API.
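A hedged usage sketch; the item id 259 (NHS ICD-10 5th Edition) and the ACMC_TRUD_API_KEY variable are taken from install() below, and the archiveFileName field is the one download_release_file() reads for the "archive" prefix:

# Sketch: fetch the latest release metadata for the ICD-10 item (id 259).
import os
from acmc import trud

api_key = os.environ["ACMC_TRUD_API_KEY"]
releases = trud.get_releases("259", API_KEY=api_key, latest=True)
for release in releases:
    print(release.get("archiveFileName"))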
def download_release_file(item_id: str, release_ordinal: str, release: dict, file_json_prefix: str) -> Path:
def download_release_file(
    item_id: str, release_ordinal: str, release: dict, file_json_prefix: str
) -> Path:
    """Download specified file type for a given release of an item."""

    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    file_type = file_json_prefix
    file_url = release.get(f"{file_json_prefix}FileUrl")
    if file_url is None:
        raise ValueError(f"File url not in json data {file_json_prefix}FileUrl")

    file_name = release.get(f"{file_json_prefix}FileName")
    if file_name is None:
        raise ValueError(f"File name not in json data {file_json_prefix}FileName")

    file_destination = DOWNLOADS_PATH / file_name

    if not file_url or not file_name:
        raise ValueError(
            f"Missing {file_type} file information for release {release_ordinal} of item {item_id}."
        )

    _logger.info(
        f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}"
    )
    response = requests.get(file_url, stream=True)

    if response.status_code == 200:
        with open(file_destination, "wb") as f:
            f.write(response.content)
    else:
        _logger.error(
            f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}"
        )
        response.raise_for_status()

    return file_destination
Download specified file type for a given release of an item.
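A sketch of downloading the archive file for the latest release of an item, reusing get_releases() above; it assumes the download directories already exist (see create_map_directories()) and uses item id 259 from install():

# Sketch: download the archive for the latest release of item 259 (ICD-10).
import os
from acmc import trud

api_key = os.environ["ACMC_TRUD_API_KEY"]
releases = trud.get_releases("259", API_KEY=api_key, latest=True)
archive_path = trud.download_release_file("259", 1, releases[0], "archive")
print(f"Downloaded to {archive_path}")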
def validate_download_hash(file_destination: str, item_hash: str):
def validate_download_hash(file_destination: str, item_hash: str):
    """Validate that a downloaded file matches the expected SHA-256 hash."""
    with open(file_destination, "rb") as f:
        hash = hashlib.sha256(f.read()).hexdigest()
    _logger.debug(hash)
    if hash.upper() == item_hash.upper():
        _logger.debug(f"Verified hash of {file_destination} {hash}")
    else:
        msg = f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead"
        _logger.error(msg)
        raise ValueError(msg)

Validate that a downloaded file matches the expected SHA-256 hash.
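The check is a straight SHA-256 comparison against the hash published for the item; a minimal illustration of what the function does (the hash value is the item 259 hash from install(); the archive path is a hypothetical file name):

# Sketch: the equivalent of validate_download_hash(), spelled out.
import hashlib

expected = "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F"  # item 259 hash from install()
with open("vocab/trud/downloads/archive.zip", "rb") as f:  # hypothetical file name
    actual = hashlib.sha256(f.read()).hexdigest()
print(actual.upper() == expected.upper())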
def unzip_download(file_destination: str):
def unzip_download(file_destination: str):
    """Extract a downloaded zip archive into the TRUD downloads directory."""
    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    with zipfile.ZipFile(file_destination, "r") as zip_ref:
        zip_ref.extractall(DOWNLOADS_PATH)

Extract a downloaded zip archive into the TRUD downloads directory.
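Archives are unpacked in place under DOWNLOADS_PATH; a short sketch (the archive file name is a hypothetical placeholder):

# Sketch: unpack a downloaded TRUD archive and list what was extracted.
from acmc import trud

archive_path = trud.DOWNLOADS_PATH / "archive.zip"  # hypothetical file name
trud.unzip_download(archive_path)
for extracted in sorted(trud.DOWNLOADS_PATH.iterdir()):
    print(extracted.name)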
def extract_icd10():
def extract_icd10():
    """Extract ICD-10 codes and descriptions from the downloaded XML release into a parquet file."""
    # ICD10_edition5
    file_path = (
        DOWNLOADS_PATH
        / "ICD10_Edition5_XML_20160401"
        / "Content"
        / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml"
    )
    df = pd.read_xml(file_path)
    df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
    df = df.rename(
        columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"}
    )
    output_path = PROCESSED_PATH / "icd10.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

Extract ICD-10 codes and descriptions from the downloaded XML release into a parquet file.
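The resulting icd10.parquet holds the three renamed columns; a quick sketch of reading it back:

# Sketch: inspect the processed ICD-10 table produced by extract_icd10().
import pandas as pd
from acmc import trud

df = pd.read_parquet(trud.PROCESSED_PATH / "icd10.parquet")
print(df[["icd10", "icd10_alt", "description"]].head())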
def extract_opsc4():
def extract_opsc4():
    """Extract OPCS-4 codes and descriptions from the downloaded release into a parquet file."""
    file_path = (
        DOWNLOADS_PATH
        / "OPCS410 Data files txt"
        / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt"
    )

    df = pd.read_csv(file_path, sep="\t", dtype=str, header=None)
    df = df.rename(columns={0: "opcs4", 1: "description"})

    output_path = PROCESSED_PATH / "opcs4.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

Extract OPCS-4 codes and descriptions from the downloaded release into a parquet file.
def extract_nhs_data_migrations():
def extract_nhs_data_migrations():
    """Extract NHS Data Migration mapping tables (SNOMED, Read v2 and CTV3/Read v3) into parquet files."""
    # NHS Data Migrations

    # snomed only
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "sctcremap_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["SCT_CONCEPTID"]]
    df = df.rename(columns={"SCT_CONCEPTID": "snomed"})
    df = df.drop_duplicates()
    df = df.astype(str)

    output_path = PROCESSED_PATH / "snomed.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> r3
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rctctv3map_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
    df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"})

    output_path = PROCESSED_PATH / "read2_to_read3.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> r2
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3rctmap_uk_20200401000002.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
    df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"})
    df = df.drop_duplicates()
    df = df[~df["read2"].str.match("^.*_.*$")]  # remove r2 codes with '_'

    output_path = PROCESSED_PATH / "read3_to_read2.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rcsctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["ReadCode", "ConceptId"]]
    df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"})

    output_path = PROCESSED_PATH / "read2_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3sctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
    df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"})
    df["snomed"] = df["snomed"].astype(str)
    df = df[~df["snomed"].str.match("^.*_.*$")]  # remove snomed codes with '_'

    output_path = PROCESSED_PATH / "read3_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

Extract NHS Data Migration mapping tables (SNOMED, Read v2 and CTV3/Read v3) into parquet files.
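One way the mapping tables produced here might be used, e.g. translating Read v2 codes to SNOMED concept ids; the column names are those written by this function and the input codes are placeholders:

# Sketch: map a small list of Read v2 codes to SNOMED using the processed table.
import pandas as pd
from acmc import trud

mapping = pd.read_parquet(trud.PROCESSED_PATH / "read2_to_snomed.parquet")
my_codes = pd.DataFrame({"read2": ["C10..", "H33.."]})  # placeholder Read v2 codes
print(my_codes.merge(mapping, on="read2", how="left"))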
def extract_nhs_read_browser():
def extract_nhs_read_browser():
    """Extract NHS Read Browser DBF tables into parquet code lists and mapping files."""
    # r2 only
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read2"})
    output_path = PROCESSED_PATH / "read2.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> atc
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READCODE", "ATC"]]
    df = df.rename(columns={"READCODE": "read2", "ATC": "atc"})
    output_path = PROCESSED_PATH / "read2_to_atc.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 only
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read3"})
    output_path = PROCESSED_PATH / "read3.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> icd9
    # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')

    # r3 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

Extract NHS Read Browser DBF tables into parquet code lists and mapping files.
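A sketch of checking membership against the read2.parquet code list written above (the codes are placeholders):

# Sketch: check whether some Read v2 codes appear in the processed code list.
import pandas as pd
from acmc import trud

read2 = pd.read_parquet(trud.PROCESSED_PATH / "read2.parquet")
known = set(read2["read2"])
for code in ["C10..", "H33.."]:  # placeholder Read v2 codes
    print(code, code in known)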
def create_map_directories():
def create_map_directories():
    """Create map directories."""

    # Check if build directory exists
    create_map_dirs = False
    if VOCAB_PATH.exists():
        user_input = (
            input(
                f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? (y/n): "
            )
            .strip()
            .lower()
        )
        if user_input == "y":
            # delete all build files
            shutil.rmtree(VOCAB_PATH)
            create_map_dirs = True
        elif user_input == "n":
            _logger.info("Exiting TRUD installation")
            sys.exit(0)
    else:
        create_map_dirs = True

    if create_map_dirs:
        # create maps directories
        VOCAB_PATH.mkdir(parents=True, exist_ok=True)
        DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)
        PROCESSED_PATH.mkdir(parents=True, exist_ok=True)
Create map directories.
def install():
def install():
    """Download, verify and process the TRUD release items required by acmc."""
    _logger.info("Installing TRUD")

    # get TRUD api key from environment variable
    api_key = os.getenv("ACMC_TRUD_API_KEY")
    if not api_key:
        raise ValueError(
            "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable."
        )

    create_map_directories()

    items_latest = True
    items = [
        {
            "id": 259,
            "name": "NHS ICD-10 5th Edition XML data files",
            "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
            "extract": extract_icd10,
        },
        {
            "id": 119,
            "name": "OPCS-4 data files",
            "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
            "extract": extract_opsc4,
        },
        {
            "id": 9,
            "name": "NHS Data Migration",
            "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
            "extract": extract_nhs_data_migrations,
        },
        {
            "id": 8,
            "name": "NHS Read Browser",
            "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
            "extract": extract_nhs_read_browser,
        },
        # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
    ]

    # remove the extract function from each item before saving versions
    data = [{k: v for k, v in d.items() if k != "extract"} for d in items]
    # save TRUD versions to file to maintain a record of what was downloaded
    with open(VERSION_PATH, "w") as file:
        yaml.dump(
            data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Validate and process each item ID
    for item in items:
        item_id = item["id"]
        _logger.info(f"--- {item['name']} ---")

        releases = get_releases(item_id, API_KEY=api_key, latest=items_latest)
        if not releases:
            raise ValueError(f"No releases found for item {item_id}.")

        # Process each release in reverse order
        for release_ordinal, release in enumerate(releases[::-1], 1):
            # Download archive file
            file_destination = download_release_file(
                item_id, release_ordinal, release, "archive"
            )

            # Optional files
            # if items.checksum:
            #     download_release_file(item["id"], release_ordinal, release, "checksum")
            # if items.signature:
            #     download_release_file(item["id"], release_ordinal, release, "signature")
            # if items.public_key:
            #     download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")

            # Verify hash if available
            if "hash" in item:
                validate_download_hash(file_destination, item["hash"])

            # Unzip downloaded .zip
            unzip_download(file_destination)

            # Extract tables to parquet
            if "extract" in item:
                item["extract"]()

        _logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.")

    _logger.info("TRUD installation completed")

Download, verify and process the TRUD release items required by acmc.
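install() records what was downloaded in VERSION_PATH (trud_version.yml); a sketch of reading that record back after installation, assuming the quoted YAML written above parses into a list of item dictionaries:

# Sketch: read back the TRUD version record written by install().
import yaml
from acmc import trud

with open(trud.VERSION_PATH) as f:
    versions = yaml.safe_load(f)
for item in versions:
    print(item["id"], item["name"], item["hash"])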