Skip to content
Snippets Groups Projects
Select Git revision
  • 47aeae38fcbb637ad2c536cb59992993da9a5550
  • dev default
  • 61-feature-add-optional-backwards-mapping-for-consistency-with-older-version
  • 61-feature-add-optional-backwards-mapping-for-consistency-with-older-version-2
  • main protected
  • 11-test-fix-tests-to-handle-licensed-data-resources-from-trud-snd-omop
  • general
  • pypi
  • old-main
  • v0.0.3
10 results

trud.py

Blame
  • trud.py 14.75 KiB
    import os
    import sys
    import requests
    import json
    import argparse
    import shutil
    from pathlib import Path
    
    from base import bcolors
    
    import hashlib
    import zipfile
    import pandas as pd
    import simpledbf
    
    # Constants
    FQDN = "isd.digital.nhs.uk"
    
    def error_exit(message):
        print(message, "error")
        sys.exit(1)
    
    def validate_api_key(api_key):
        """Validate that the API key is 40-character hexadecimal."""
        if not api_key or len(api_key) != 40 or not all(c in "0123456789abcdef" for c in api_key.lower()):
            error_exit("Invalid API key format. Expected a 40-character hexadecimal string.")
    
    def get_releases(item_id, API_KEY, latest=False):
        """Retrieve release information for an item from the TRUD API."""
        url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
        if latest:
            url += "?latest"
    
        response = requests.get(url)
        if response.status_code != 200:
            error_exit(f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval")
    
        data = response.json()
        if data.get("message") != "OK":
            error_exit(data.get("message", "Unknown error occurred"))
    
        return data.get("releases", [])
    
    def download_release_file(item_id, release_ordinal, release, file_json_prefix, file_type=None, items_folder="build/maps/downloads"):
        """Download specified file type for a given release of an item."""
        file_type = file_type or file_json_prefix
        file_url = release.get(f"{file_json_prefix}FileUrl")
        file_name = release.get(f"{file_json_prefix}FileName")
        file_destination = os.path.join(items_folder, file_name)
    
        if not file_url or not file_name:
            error_exit(f"Missing {file_type} file information for release {release_ordinal} of item {item_id}.")
    
        print(f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}")
        response = requests.get(file_url, stream=True)
        
        if response.status_code == 200:
            with open(file_destination, "wb") as f:
                f.write(response.content)
            return file_destination
        else:
            error_exit(f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}")
    
    def validate_download_hash(file_destination:str, item_hash:str):
        with open(file_destination, "rb") as f:
            hash = hashlib.sha256(f.read()).hexdigest()
        print(hash)
        if hash.upper() == item_hash.upper():
            print(f"Verified hash of {file_destination} {hash}")
        else:
            error_exit(f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead")
    
    def unzip_download(file_destination:str, items_folder="build/maps/downloads"):
        with zipfile.ZipFile(file_destination, 'r') as zip_ref:
            zip_ref.extractall(items_folder)
    
    def extract_icd10():
        #ICD10_edition5
        file_path = Path('build') / 'maps' / 'downloads' / 'ICD10_Edition5_XML_20160401' / 'Content' / 'ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml'
    
        df = pd.read_xml(file_path)
        df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
        df = df.rename(columns={"CODE":"icd10_code",
                                "ALT_CODE":"icd10_alt_code",
                                "DESCRIPTION":"description"
                            })
        df.to_parquet("build/maps/processed/icd10_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/icd10_code.parquet")
    
    def extract_opsc4():
        file_path = Path('build') / 'maps' / 'downloads' / 'OPCS410 Data files txt' / 'OPCS410 CodesAndTitles Nov 2022 V1.0.txt'
        
        df = pd.read_csv(file_path, sep='\t', dtype=str, header=None)
        df = df.rename(columns={0:"opcs4_code", 1:"description"})
        df.to_parquet("build/maps/processed/opcs4_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/opcs4_code.parquet")
    
    def extract_nhs_data_migrations():
        #NHS Data Migrations
        
        #snomed only
        file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'sctcremap_uk_20200401000001.txt'    
        df = pd.read_csv(file_path, sep='\t')    
        df = df[["SCT_CONCEPTID"]]
        df = df.rename(columns={"SCT_CONCEPTID":"snomed_code"})
        df = df.drop_duplicates()
        df = df.astype(str)
        df.to_parquet("build/maps/processed/snomed_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/snomed_code.parquet")
    
        #r2 -> r3
        file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rctctv3map_uk_20200401000001.txt'
        df = pd.read_csv(file_path, sep='\t')
        df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
        df = df.rename(columns={"V2_CONCEPTID":"read2_code",
                                "CTV3_CONCEPTID":"read3_code"})
        df.to_parquet("build/maps/processed/read2_code_to_read3_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read2_code_to_read3_code.parquet")
    
        #r3->r2
        file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3rctmap_uk_20200401000002.txt'
        df = pd.read_csv(file_path, sep='\t')
        df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
        df = df.rename(columns={"CTV3_CONCEPTID":"read3_code", 
                                "V2_CONCEPTID":"read2_code"})
        df = df.drop_duplicates()
        df = df[~df["read2_code"].str.match("^.*_.*$")] #remove r2 codes with '_'
        df.to_parquet("build/maps/processed/read3_code_to_read2_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read3_code_to_read2_code.parquet")
    
        #r2 -> snomed
        file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'rcsctmap2_uk_20200401000001.txt'
        df = pd.read_csv(file_path, sep='\t', dtype=str)
        df = df[["ReadCode", "ConceptId"]]
        df = df.rename(columns={"ReadCode":"read2_code",
                                "ConceptId":"snomed_code"})
        df.to_parquet("build/maps/processed/read2_code_to_snomed_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read2_code_to_snomed_code.parquet")
    
        #r3->snomed
        file_path = Path('build') / 'maps' / 'downloads' / 'Mapping Tables' / 'Updated' / 'Clinically Assured' / 'ctv3sctmap2_uk_20200401000001.txt'
        df = pd.read_csv(file_path, sep='\t', dtype=str)
        df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
        df = df.rename(columns={"CTV3_TERMID":"read3_code",
                                "SCT_CONCEPTID":"snomed_code"})
        df["snomed_code"] = df["snomed_code"].astype(str)
        df = df[~df["snomed_code"].str.match("^.*_.*$")] #remove snomed codes with '_'
        df.to_parquet("build/maps/processed/read3_code_to_snomed_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read3_code_to_snomed_code.parquet")
    
    def extract_nhs_read_browser():
        #r2 only
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/ANCESTOR.DBF').to_dataframe()
        df = pd.concat([df['READCODE'], df['DESCENDANT']])
        df = pd.DataFrame(df.drop_duplicates())
        df = df.rename(columns={0:"read2_code"})
        df.to_parquet("build/maps/processed/read2_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read2_code.parquet")
    
        #r2 -> atc
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/ATC.DBF').to_dataframe()
        df = df[["READCODE", "ATC"]]
        df = df.rename(columns={"READCODE":"read2_code", "ATC":"atc_code"})
        df.to_parquet("build/maps/processed/read2_code_to_atc_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read2_code_to_atc_code.parquet")
    
        #r2 -> icd10
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/ICD10.DBF').to_dataframe()
        df = df[["READ_CODE", "TARG_CODE"]]
        df = df.rename(columns={"READ_CODE":"read2_code", "TARG_CODE":"icd10_code"})
        df = df[~df["icd10_code"].str.match("^.*-.*$")] #remove codes with '-'
        df = df[~df["read2_code"].str.match("^.*-.*$")] #remove codes with '-'
        df.to_parquet("build/maps/processed/read2_code_to_icd10_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read2_code_to_icd10_code.parquet")
    
        #r2 -> opcs4
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V2/OPCS4V3.DBF').to_dataframe()
        df = df[["READ_CODE", "TARG_CODE"]]
        df = df.rename(columns={"READ_CODE":"read2_code", "TARG_CODE":"opcs4_code"})
        df = df[~df["opcs4_code"].str.match("^.*-.*$")] #remove codes with '-'
        df = df[~df["read2_code"].str.match("^.*-.*$")] #remove codes with '-'
        df.to_parquet("build/maps/processed/read2_code_to_opcs4_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read2_code_to_opcs4_code.parquet")
    
        #r3 only
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ANCESTOR.DBF').to_dataframe()
        df = pd.concat([df['READCODE'], df['DESCENDANT']])
        df = pd.DataFrame(df.drop_duplicates())
        df = df.rename(columns={0:"read3_code"})
        df.to_parquet("build/maps/processed/read3_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read3_code.parquet")
    
        #r3 -> icd10
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD10.DBF').to_dataframe()
        df = df[["READ_CODE", "TARG_CODE"]]
        df = df.rename(columns={"READ_CODE":"read3_code", "TARG_CODE":"icd10_code"})
        df = df[~df["icd10_code"].str.match("^.*-.*$")] #remove codes with '-'
        df = df[~df["read3_code"].str.match("^.*-.*$")] #remove codes with '-'
        df.to_parquet("build/maps/processed/read3_code_to_icd10_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read3_code_to_icd10_code.parquet")
    
        #r3 -> icd9
        # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')
    
        #r3 -> opcs4
        df = simpledbf.Dbf5('build/maps/downloads/Standard/V3/OPCS4V3.DBF').to_dataframe()
        df = df[["READ_CODE", "TARG_CODE"]]
        df = df.rename(columns={"READ_CODE":"read3_code", "TARG_CODE":"opcs4_code"})
        df = df[~df["opcs4_code"].str.match("^.*-.*$")] #remove codes with '-'
        df = df[~df["read3_code"].str.match("^.*-.*$")] #remove codes with '-'
        df.to_parquet("build/maps/processed/read3_code_to_opcs4_code.parquet", index=False)
        print("Extracted ", "build/maps/processed/read3_code_to_opcs4_code.parquet")
    
    def create_build_directories(build_dir='build'):
        """Create build directories.""" 
        build_path = Path(build_dir)
    
        # Check if build directory exists
        create_build_dirs = False   
        if build_path.exists() and build_path.is_dir():
            user_input = input(f"The build directory {build_path} already exists. Do you want to delete and recreate all data? (y/n): ").strip().lower()
            if user_input == "y":
                # delete all build files
                shutil.rmtree(build_path)
                create_build_dirs = True
        else:
            create_build_dirs = True  
    
        if create_build_dirs:
            # create build directory
            build_path.mkdir(parents=True, exist_ok=True)
    
            # create maps directories
            maps_path = build_path / 'maps'
            maps_path.mkdir(parents=True, exist_ok=True)
            maps_download_path = maps_path / 'downloads'
            maps_download_path.mkdir(parents=True, exist_ok=True)            
            maps_processed_path = maps_path / 'processed'
            maps_processed_path.mkdir(parents=True,exist_ok=True)                                 
    
    def main():
        print("Processing TRUD files")
        
        parser = argparse.ArgumentParser(
            description="Download releases of items using the TRUD API.",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter
        )
        parser.add_argument("--key", type=str, help="TRUD API Key")
    #     parser.add_argument("item_ids", nargs="+", help="Item IDs to download releases for.")
    #     parser.add_argument("-l", "--latest", action="store_true", help="Download only the latest release")
    #     parser.add_argument("-c", "--checksum", action="store_true", help="Also download the checksum file")
    #     parser.add_argument("-s", "--signature", action="store_true", help="Also download the signature file")
    #     parser.add_argument("-p", "--public_key", action="store_true", help="Also download the public key file")
        
        args = parser.parse_args()
    
        create_build_directories()
    
        items_latest = True
        items_folder = "build/maps/downloads"
        items = [
            {
                "id": 259,
                "name": "NHS ICD-10 5th Edition XML data files",
                "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
                "extract": extract_icd10,
            },
            {
                "id": 119,
                "name": "OPCS-4 data files",
                "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
                "extract": extract_opsc4,
            },
            {
                "id": 9,
                "name": "NHS Data Migration",
                "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
                "extract": extract_nhs_data_migrations,
            },
            {
                "id": 8,
                "name": "NHS Read Browser",
                "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
                "extract": extract_nhs_read_browser,
            },
            # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
        ]
        
        # Validate and process each item ID
        for item in items:
            item_id = item["id"]
            print(bcolors.HEADER, "---"+item["name"]+"---", bcolors.ENDC)
    
            releases = get_releases(item_id, API_KEY=args.key, latest=items_latest)
            if not releases:
                error_exit(f"No releases found for item {item_id}.")
    
            # Process each release in reverse order
            for release_ordinal, release in enumerate(releases[::-1], 1):
                # Download archive file
                file_destination = download_release_file(item_id, release_ordinal, release, "archive", items_folder=items_folder)
                
                # Optional files
                # if items.checksum:
                #     download_release_file(item["id"], release_ordinal, release, "checksum")
                # if items.signature:
                #     download_release_file(item["id"], release_ordinal, release, "signature")
                # if items.public_key:
                #     download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")
    
                #Verify Hash if available
                if "hash" in item:
                    validate_download_hash(file_destination, item["hash"])
    
                #Unzip downloaded .zip
                unzip_download(file_destination, items_folder=items_folder)
    
                #Extract Tables to parquet
                if "extract" in item:
                    item["extract"]()
                
            print(f"Downloaded {release_ordinal} release(s) for item {item_id}.")
    
        print(f"Successfully completed TRUD processing")
    if __name__ == "__main__":
        main()