Select Git revision
SubsScreen.react.jsx
-
James D'Alton authoredJames D'Alton authored
trud.py 14.75 KiB
import os
import sys
import requests
import json
import argparse
import shutil
from pathlib import Path
from base import bcolors
import hashlib
import zipfile
import pandas as pd
import simpledbf
# Constants
FQDN = "isd.digital.nhs.uk"
def error_exit(message):
print(message, "error")
sys.exit(1)
def validate_api_key(api_key):
"""Validate that the API key is 40-character hexadecimal."""
if not api_key or len(api_key) != 40 or not all(c in "0123456789abcdef" for c in api_key.lower()):
error_exit("Invalid API key format. Expected a 40-character hexadecimal string.")
def get_releases(item_id, API_KEY, latest=False):
"""Retrieve release information for an item from the TRUD API."""
url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
if latest:
url += "?latest"
response = requests.get(url)
if response.status_code != 200:
error_exit(f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval")
data = response.json()
if data.get("message") != "OK":
error_exit(data.get("message", "Unknown error occurred"))
return data.get("releases", [])
def download_release_file(item_id, release_ordinal, release, file_json_prefix, file_type=None, items_folder="build/maps/downloads"):
"""Download specified file type for a given release of an item."""
file_type = file_type or file_json_prefix
file_url = release.get(f"{file_json_prefix}FileUrl")
file_name = release.get(f"{file_json_prefix}FileName")
file_destination = os.path.join(items_folder, file_name)
if not file_url or not file_name:
error_exit(f"Missing {file_type} file information for release {release_ordinal} of item {item_id}.")
print(f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}")
response = requests.get(file_url, stream=True)
if response.status_code == 200:
with open(file_destination, "wb") as f:
f.write(response.content)
return file_destination
else:
error_exit(f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}")
def validate_download_hash(file_destination:str, item_hash:str):
with open(file_destination, "rb") as f:
hash = hashlib.sha256(f.read()).hexdigest()
print(hash)
if hash.upper() == item_hash.upper():
print(f"Verified hash of {file_destination} {hash}")
else:
error_exit(f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead")
def unzip_download(file_destination:str, items_folder="build/maps/downloads"):
with zipfile.ZipFile(file_destination, 'r') as zip_ref:
zip_ref.extractall(items_folder)
def extract_icd10():
#ICD10_edition5
file_path = Path('build') / 'maps' / 'downloads' / 'ICD10_Edition5_XML_20160401' / 'Content' / 'ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml'
df = pd.read_xml(file_path)
df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
df = df.rename(columns={"CODE":"icd10_code",
"ALT_CODE":"icd10_alt_code",
"DESCRIPTION":"description"
})
df.to_parquet("build/maps/processed/icd10_code.parquet", index=False)
print("Extracted ", "build/maps/processed/icd10_code.parquet")
def extract_opsc4():
file_path = Path('build') / 'maps' / 'downloads' / 'OPCS410 Data files txt' / 'OPCS410 CodesAndTitles Nov 2022 V1.0.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str, header=None)
df = df.rename(columns={0:"opcs4_code", 1:"description"})
df.to_parquet("build/maps/processed/opcs4_code.parquet", index=False)
print("Extracted ", "build/maps/processed/opcs4_code.parquet")
def extract_nhs_data_migrations():
#NHS Data Migrations
#snomed only
file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'sctcremap_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t')
df = df[["SCT_CONCEPTID"]]
df = df.rename(columns={"SCT_CONCEPTID":"snomed_code"})
df = df.drop_duplicates()
df = df.astype(str)
df.to_parquet("build/maps/processed/snomed_code.parquet", index=False)
print("Extracted ", "build/maps/processed/snomed_code.parquet")
#r2 -> r3
file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rctctv3map_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t')
df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
df = df.rename(columns={"V2_CONCEPTID":"read2_code",
"CTV3_CONCEPTID":"read3_code"})
df.to_parquet("build/maps/processed/read2_code_to_read3_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read2_code_to_read3_code.parquet")
#r3->r2
file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3rctmap_uk_20200401000002.txt'
df = pd.read_csv(file_path, sep='\t')
df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
df = df.rename(columns={"CTV3_CONCEPTID":"read3_code",
"V2_CONCEPTID":"read2_code"})
df = df.drop_duplicates()
df = df[~df["read2_code"].str.match("^.*_.*$")] #remove r2 codes with '_'
df.to_parquet("build/maps/processed/read3_code_to_read2_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read3_code_to_read2_code.parquet")
#r2 -> snomed
file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rcsctmap2_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str)
df = df[["ReadCode", "ConceptId"]]
df = df.rename(columns={"ReadCode":"read2_code",
"ConceptId":"snomed_code"})
df.to_parquet("build/maps/processed/read2_code_to_snomed_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read2_code_to_snomed_code.parquet")
#r3->snomed
file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3sctmap2_uk_20200401000001.txt'
df = pd.read_csv(file_path, sep='\t', dtype=str)
df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
df = df.rename(columns={"CTV3_TERMID":"read3_code",
"SCT_CONCEPTID":"snomed_code"})
df["snomed_code"] = df["snomed_code"].astype(str)
df = df[~df["snomed_code"].str.match("^.*_.*$")] #remove snomed codes with '_'
df.to_parquet("build/maps/processed/read3_code_to_snomed_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read3_code_to_snomed_code.parquet")
def extract_nhs_read_browser():
#r2 only
df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/ANCESTOR.DBF').to_dataframe()
df = pd.concat([df['READCODE'], df['DESCENDANT']])
df = pd.DataFrame(df.drop_duplicates())
df = df.rename(columns={0:"read2_code"})
df.to_parquet("build/maps/processed/read2_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read2_code.parquet")
#r2 -> atc
df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/ATC.DBF').to_dataframe()
df = df[["READCODE", "ATC"]]
df = df.rename(columns={"READCODE":"read2_code", "ATC":"atc_code"})
df.to_parquet("build/maps/processed/read2_code_to_atc_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read2_code_to_atc_code.parquet")
#r2 -> icd10
df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/ICD10.DBF').to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE":"read2_code", "TARG_CODE":"icd10_code"})
df = df[~df["icd10_code"].str.match("^.*-.*$")] #remove codes with '-'
df = df[~df["read2_code"].str.match("^.*-.*$")] #remove codes with '-'
df.to_parquet("build/maps/processed/read2_code_to_icd10_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read2_code_to_icd10_code.parquet")
#r2 -> opcs4
df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/OPCS4V3.DBF').to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE":"read2_code", "TARG_CODE":"opcs4_code"})
df = df[~df["opcs4_code"].str.match("^.*-.*$")] #remove codes with '-'
df = df[~df["read2_code"].str.match("^.*-.*$")] #remove codes with '-'
df.to_parquet("build/maps/processed/read2_code_to_opcs4_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read2_code_to_opcs4_code.parquet")
#r3 only
df = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ANCESTOR.DBF').to_dataframe()
df = pd.concat([df['READCODE'], df['DESCENDANT']])
df = pd.DataFrame(df.drop_duplicates())
df = df.rename(columns={0:"read3_code"})
df.to_parquet("build/maps/processed/read3_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read3_code.parquet")
#r3 -> icd10
df = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD10.DBF').to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE":"read3_code", "TARG_CODE":"icd10_code"})
df = df[~df["icd10_code"].str.match("^.*-.*$")] #remove codes with '-'
df = df[~df["read3_code"].str.match("^.*-.*$")] #remove codes with '-'
df.to_parquet("build/maps/processed/read3_code_to_icd10_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read3_code_to_icd10_code.parquet")
#r3 -> icd9
# dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')
#r3 -> opcs4
df = simpledbf.Dbf5('build/maps/downloads/Standard/V3/OPCS4V3.DBF').to_dataframe()
df = df[["READ_CODE", "TARG_CODE"]]
df = df.rename(columns={"READ_CODE":"read3_code", "TARG_CODE":"opcs4_code"})
df = df[~df["opcs4_code"].str.match("^.*-.*$")] #remove codes with '-'
df = df[~df["read3_code"].str.match("^.*-.*$")] #remove codes with '-'
df.to_parquet("build/maps/processed/read3_code_to_opcs4_code.parquet", index=False)
print("Extracted ", "build/maps/processed/read3_code_to_opcs4_code.parquet")
def create_build_directories(build_dir='build'):
"""Create build directories."""
build_path = Path(build_dir)
# Check if build directory exists
create_build_dirs = False
if build_path.exists() and build_path.is_dir():
user_input = input(f"The build directory {build_path} already exists. Do you want to delete and recreate all data? (y/n): ").strip().lower()
if user_input == "y":
# delete all build files
shutil.rmtree(build_path)
create_build_dirs = True
else:
create_build_dirs = True
if create_build_dirs:
# create build directory
build_path.mkdir(parents=True, exist_ok=True)
# create maps directories
maps_path = build_path / 'maps'
maps_path.mkdir(parents=True, exist_ok=True)
maps_download_path = maps_path / 'downloads'
maps_download_path.mkdir(parents=True, exist_ok=True)
maps_processed_path = maps_path / 'processed'
maps_processed_path.mkdir(parents=True,exist_ok=True)
def main():
print("Processing TRUD files")
parser = argparse.ArgumentParser(
description="Download releases of items using the TRUD API.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("--key", type=str, help="TRUD API Key")
# parser.add_argument("item_ids", nargs="+", help="Item IDs to download releases for.")
# parser.add_argument("-l", "--latest", action="store_true", help="Download only the latest release")
# parser.add_argument("-c", "--checksum", action="store_true", help="Also download the checksum file")
# parser.add_argument("-s", "--signature", action="store_true", help="Also download the signature file")
# parser.add_argument("-p", "--public_key", action="store_true", help="Also download the public key file")
args = parser.parse_args()
create_build_directories()
items_latest = True
items_folder = "build/maps/downloads"
items = [
{
"id": 259,
"name": "NHS ICD-10 5th Edition XML data files",
"hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
"extract": extract_icd10,
},
{
"id": 119,
"name": "OPCS-4 data files",
"hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
"extract": extract_opsc4,
},
{
"id": 9,
"name": "NHS Data Migration",
"hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
"extract": extract_nhs_data_migrations,
},
{
"id": 8,
"name": "NHS Read Browser",
"hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
"extract": extract_nhs_read_browser,
},
# TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
]
# Validate and process each item ID
for item in items:
item_id = item["id"]
print(bcolors.HEADER, "---"+item["name"]+"---", bcolors.ENDC)
releases = get_releases(item_id, API_KEY=args.key, latest=items_latest)
if not releases:
error_exit(f"No releases found for item {item_id}.")
# Process each release in reverse order
for release_ordinal, release in enumerate(releases[::-1], 1):
# Download archive file
file_destination = download_release_file(item_id, release_ordinal, release, "archive", items_folder=items_folder)
# Optional files
# if items.checksum:
# download_release_file(item["id"], release_ordinal, release, "checksum")
# if items.signature:
# download_release_file(item["id"], release_ordinal, release, "signature")
# if items.public_key:
# download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")
#Verify Hash if available
if "hash" in item:
validate_download_hash(file_destination, item["hash"])
#Unzip downloaded .zip
unzip_download(file_destination, items_folder=items_folder)
#Extract Tables to parquet
if "extract" in item:
item["extract"]()
print(f"Downloaded {release_ordinal} release(s) for item {item_id}.")
print(f"Successfully completed TRUD processing")
if __name__ == "__main__":
main()