acmc.trud
Downloads NHS TRUD reference data releases, verifies and unzips the archives, and converts their contents into parquet vocabulary tables under vocab/trud/processed.

Module source:

import os
import sys
import requests
import argparse
import shutil
import hashlib
import zipfile
import pandas as pd
import simpledbf  # type: ignore
import yaml
from pathlib import Path

# setup logging
from acmc import util, logging_config as lc

logger = lc.setup_logger()

# Constants
FQDN = "isd.digital.nhs.uk"
VOCAB_PATH = Path("./vocab/trud")
VERSION_FILE = "trud_version.yaml"
VERSION_PATH = VOCAB_PATH / VERSION_FILE
DOWNLOADS_PATH = VOCAB_PATH / "downloads"
PROCESSED_PATH = VOCAB_PATH / "processed"


def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
    """Retrieve release information for an item from the TRUD API."""
    url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
    if latest:
        url += "?latest"

    response = requests.get(url)
    if response.status_code != 200:
        logger.error(
            f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval"
        )
        response.raise_for_status()

    data = response.json()
    if data.get("message") != "OK":
        msg = f"Unknown error occurred {data.get('message')}"
        logger.error(msg)
        raise Exception(msg)

    return data.get("releases", [])


def download_release_file(
    item_id: str,
    release_ordinal: str,
    release: dict,
    file_json_prefix: str,
    file_type=None,
) -> Path:
    """Download specified file type for a given release of an item."""

    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    file_type = file_type or file_json_prefix
    file_url = release.get(f"{file_json_prefix}FileUrl")
    file_name = release.get(f"{file_json_prefix}FileName")
    file_destination = DOWNLOADS_PATH / file_name

    if not file_url or not file_name:
        raise ValueError(
            f"Missing {file_type} file information for release {release_ordinal} of item {item_id}."
        )

    logger.info(
        f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}"
    )
    response = requests.get(file_url, stream=True)

    if response.status_code == 200:
        with open(file_destination, "wb") as f:
            f.write(response.content)
    else:
        logger.error(
            f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}"
        )
        response.raise_for_status()

    return file_destination


def validate_download_hash(file_destination: str, item_hash: str):
    with open(file_destination, "rb") as f:
        hash = hashlib.sha256(f.read()).hexdigest()
        logger.debug(hash)
        if hash.upper() == item_hash.upper():
            logger.debug(f"Verified hash of {file_destination} {hash}")
        else:
            msg = f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead"
            logger.error(msg)
            raise ValueError(msg)


def unzip_download(file_destination: str):

    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    with zipfile.ZipFile(file_destination, "r") as zip_ref:
        zip_ref.extractall(DOWNLOADS_PATH)


def extract_icd10():
    # ICD10_edition5
    file_path = (
        DOWNLOADS_PATH
        / "ICD10_Edition5_XML_20160401"
        / "Content"
        / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml"
    )
    df = pd.read_xml(file_path)
    df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
    df = df.rename(
        columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"}
    )
    output_path = PROCESSED_PATH / "icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_opsc4():
    file_path = (
        DOWNLOADS_PATH
        / "OPCS410 Data files txt"
        / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt"
    )

    df = pd.read_csv(file_path, sep="\t", dtype=str, header=None)
    df = df.rename(columns={0: "opcs4", 1: "description"})

    output_path = PROCESSED_PATH / "opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_nhs_data_migrations():
    # NHS Data Migrations

    # snomed only
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "sctcremap_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["SCT_CONCEPTID"]]
    df = df.rename(columns={"SCT_CONCEPTID": "snomed"})
    df = df.drop_duplicates()
    df = df.astype(str)

    output_path = PROCESSED_PATH / "snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> r3
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rctctv3map_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
    df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"})

    output_path = PROCESSED_PATH / "read2_to_read3.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> r2
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3rctmap_uk_20200401000002.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
    df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"})
    df = df.drop_duplicates()
    df = df[~df["read2"].str.match("^.*_.*$")]  # remove r2 codes with '_'

    output_path = PROCESSED_PATH / "read3_to_read2.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rcsctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["ReadCode", "ConceptId"]]
    df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"})

    output_path = PROCESSED_PATH / "read2_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3sctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
    df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"})
    df["snomed"] = df["snomed"].astype(str)
    df = df[~df["snomed"].str.match("^.*_.*$")]  # remove snomed codes with '_'

    output_path = PROCESSED_PATH / "read3_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_nhs_read_browser():
    # r2 only
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read2"})
    output_path = PROCESSED_PATH / "read2.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> atc
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READCODE", "ATC"]]
    df = df.rename(columns={"READCODE": "read2", "ATC": "atc"})
    output_path = PROCESSED_PATH / "read2_to_atc.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 only
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read3"})
    output_path = PROCESSED_PATH / "read3.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> icd9
    # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')

    # r3 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def create_map_directories():
    """Create map directories."""

    # Check if build directory exists
    create_map_dirs = False
    if VOCAB_PATH.exists():
        user_input = (
            input(
                f"The map directory {VOCAB_PATH} already exists. Do you want to download and process TRUD data again? (y/n): "
            )
            .strip()
            .lower()
        )
        if user_input == "y":
            # delete all build files
            shutil.rmtree(VOCAB_PATH)
            create_map_dirs = True
        elif user_input == "n":
            logger.info("Exiting TRUD installation")
            sys.exit(0)
    else:
        create_map_dirs = True

    if create_map_dirs:
        # create maps directories
        VOCAB_PATH.mkdir(parents=True, exist_ok=True)
        DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)
        PROCESSED_PATH.mkdir(parents=True, exist_ok=True)


def install():
    logger.info("Installing TRUD")

    # get TRUD api key from environment variable
    api_key = os.getenv("ACMC_TRUD_API_KEY")
    if not api_key:
        raise ValueError(
            "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable."
        )

    create_map_directories()

    items_latest = True
    items = [
        {
            "id": 259,
            "name": "NHS ICD-10 5th Edition XML data files",
            "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
            "extract": extract_icd10,
        },
        {
            "id": 119,
            "name": "OPCS-4 data files",
            "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
            "extract": extract_opsc4,
        },
        {
            "id": 9,
            "name": "NHS Data Migration",
            "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
            "extract": extract_nhs_data_migrations,
        },
        {
            "id": 8,
            "name": "NHS Read Browser",
            "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
            "extract": extract_nhs_read_browser,
        },
        # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
    ]

    # remove the extract function from items before saving versions
    data = [{k: v for k, v in d.items() if k != "extract"} for d in items]
    # save TRUD versions to file to maintain a record of what was downloaded
    with open(VERSION_PATH, "w") as file:
        yaml.dump(
            data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Validate and process each item ID
    for item in items:
        item_id = item["id"]
        logger.info(f"--- {item['name']} ---")

        releases = get_releases(item_id, API_KEY=api_key, latest=items_latest)
        if not releases:
            raise ValueError(f"No releases found for item {item_id}.")

        # Process each release in reverse order
        for release_ordinal, release in enumerate(releases[::-1], 1):
            # Download archive file
            file_destination = download_release_file(
                item_id, release_ordinal, release, "archive"
            )

            # Optional files
            # if items.checksum:
            #     download_release_file(item["id"], release_ordinal, release, "checksum")
            # if items.signature:
            #     download_release_file(item["id"], release_ordinal, release, "signature")
            # if items.public_key:
            #     download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")

            # Verify hash if available
            if "hash" in item:
                validate_download_hash(file_destination, item["hash"])

            # Unzip downloaded .zip
            unzip_download(file_destination)

            # Extract tables to parquet
            if "extract" in item:
                item["extract"]()

        logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.")

    logger.info("TRUD installation completed")
Module attributes:

logger = <Logger acmc_logger (INFO)>
FQDN = 'isd.digital.nhs.uk'
VOCAB_PATH = PosixPath('vocab/trud')
VERSION_FILE = 'trud_version.yaml'
VERSION_PATH = PosixPath('vocab/trud/trud_version.yaml')
DOWNLOADS_PATH = PosixPath('vocab/trud/downloads')
PROCESSED_PATH = PosixPath('vocab/trud/processed')
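These paths are relative, so they resolve against the current working directory. A quick, optional check (not part of the module) of where downloads and processed tables will be written:

from acmc import trud

print(trud.DOWNLOADS_PATH.resolve())   # .../vocab/trud/downloads
print(trud.PROCESSED_PATH.resolve())   # .../vocab/trud/processed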

def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
Retrieve release information for an item from the TRUD API.
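A minimal usage sketch (not part of the module); it assumes a valid TRUD API key in the ACMC_TRUD_API_KEY environment variable and uses item 259, the ICD-10 item that install() downloads:

import os
from acmc import trud

api_key = os.environ["ACMC_TRUD_API_KEY"]
releases = trud.get_releases(item_id="259", API_KEY=api_key, latest=True)
for release in releases:
    # install() reads the archiveFileName/archiveFileUrl fields of each release
    print(release.get("archiveFileName"), release.get("archiveFileUrl"))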

def download_release_file(item_id: str, release_ordinal: str, release: dict, file_json_prefix: str, file_type=None) -> pathlib.Path:
Download specified file type for a given release of an item.
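For illustration only, a sketch of how install() calls this for the archive file of each release; it assumes create_map_directories() has already created the downloads folder and that api_key is set as in the previous example:

import os
from acmc import trud

api_key = os.environ["ACMC_TRUD_API_KEY"]
releases = trud.get_releases(item_id="259", API_KEY=api_key, latest=True)
for ordinal, release in enumerate(releases[::-1], 1):
    # "archive" selects the archiveFileUrl/archiveFileName fields of the release
    archive_path = trud.download_release_file("259", ordinal, release, "archive")
    print(f"Downloaded to {archive_path}")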

def validate_download_hash(file_destination: str, item_hash: str):
Check a downloaded file against its expected SHA-256 hash; raise ValueError on a mismatch.
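A short sketch of the check; the expected hash below is the one install() records for TRUD item 259, while the archive file name is hypothetical:

from acmc import trud

expected = "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F"
archive = trud.DOWNLOADS_PATH / "icd10_release.zip"  # hypothetical downloaded file
trud.validate_download_hash(str(archive), expected)  # raises ValueError on mismatch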

def unzip_download(file_destination: str):
Extract a downloaded zip archive into DOWNLOADS_PATH.

def extract_icd10():
Convert the ICD-10 Edition 5 XML release into a parquet table of codes, alternative codes and descriptions (vocab/trud/processed/icd10.parquet).
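Once install() has run, the resulting table can be loaded with pandas; a minimal sketch:

import pandas as pd
from acmc import trud

icd10 = pd.read_parquet(trud.PROCESSED_PATH / "icd10.parquet")
print(icd10.columns.tolist())  # ['icd10', 'icd10_alt', 'description']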

def extract_opsc4():
Convert the OPCS-4 codes and titles text file into vocab/trud/processed/opcs4.parquet.

def extract_nhs_data_migrations():
Build SNOMED CT, Read v2 and CTV3 (Read v3) code and mapping tables from the NHS Data Migration release and write them to vocab/trud/processed/.
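As an illustration of how the produced mapping tables might be used (the Read v2 codes below are placeholders, not taken from the module):

import pandas as pd
from acmc import trud

read2_to_snomed = pd.read_parquet(trud.PROCESSED_PATH / "read2_to_snomed.parquet")
codes = pd.DataFrame({"read2": ["XXXXX", "YYYYY"]})  # placeholder Read v2 codes
mapped = codes.merge(read2_to_snomed, on="read2", how="left")
print(mapped)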

def extract_nhs_read_browser():
Build Read v2/v3 code lists and their ATC, ICD-10 and OPCS-4 mappings from the NHS Read Browser DBF files and write them to vocab/trud/processed/.
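A quick way to list every table written by the extract functions (a sketch, assuming install() has completed):

from acmc import trud

for path in sorted(trud.PROCESSED_PATH.glob("*.parquet")):
    print(path.name)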

def create_map_directories():
Create map directories.

def install():
Download, verify, unzip and process every TRUD release required by acmc, recording the downloaded versions in vocab/trud/trud_version.yaml.
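End-to-end, the module is driven by this one entry point; a minimal sketch of running it (requires a TRUD account subscribed to items 259, 119, 9 and 8):

import os
from acmc import trud

os.environ["ACMC_TRUD_API_KEY"] = "<your-trud-api-key>"  # placeholder
trud.install()
# downloads land in vocab/trud/downloads, processed tables in vocab/trud/processed,
# and the downloaded versions are recorded in vocab/trud/trud_version.yaml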