acmc.trud

trud.py module

This module provides functionality to manage installation of the NHS TRUD vocabularies.
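
A minimal usage sketch (the API key below is a placeholder; the key is read from the ACMC_TRUD_API_KEY environment variable):

import os
from acmc import trud

os.environ["ACMC_TRUD_API_KEY"] = "your-trud-api-key"  # placeholder key
trud.install()  # downloads, verifies, unzips and extracts all configured TRUD releases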

"""
trud.py module

This module provides functionality to manage installation of the NHS TRUD vocabularies.

"""

import os
import sys
import requests
import argparse
import shutil
import hashlib
import zipfile
import pandas as pd
import simpledbf  # type: ignore
import yaml
from pathlib import Path
from acmc import util, logging_config as lc

# setup logging
_logger = lc.setup_logger()

FQDN = "isd.digital.nhs.uk"
"""Fully Qualified Domain Name of NHS digital TRUD service API"""

VOCAB_PATH = Path("./vocab/trud")
"""Default path to the TRUD vocabulary directory relative to the acmc execution directory"""

VERSION_FILE = "trud_version.yml"
"""TRUD version file"""

VERSION_PATH = VOCAB_PATH / VERSION_FILE
"""Default path to the TRUD version file"""

DOWNLOADS_PATH = VOCAB_PATH / "downloads"
"""Default path to the TRUD vocabulary downloads directory"""

PROCESSED_PATH = VOCAB_PATH / "processed"
"""Default path to the processed TRUD mappings directory"""


def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
    """Retrieve release information for an item from the TRUD API."""
    url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
    if latest:
        url += "?latest"

    response = requests.get(url)
    if response.status_code != 200:
        _logger.error(
            f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error {response.json()['message']}. If no releases found for API key, please ensure you are subscribed to the data release and that it is not pending approval"
        )
        response.raise_for_status()

    data = response.json()
    if data.get("message") != "OK":
        msg = f"Unknown error occurred {data.get('message')}"
        _logger.error(msg)
        raise Exception(msg)

    return data.get("releases", [])


def download_release_file(
    item_id: str, release_ordinal: str, release: dict, file_json_prefix: str
) -> Path:
    """Download specified file type for a given release of an item."""

    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    file_type = file_json_prefix
    file_url = release.get(f"{file_json_prefix}FileUrl")
    if file_url is None:
        raise ValueError(f"File url not in json data {file_json_prefix}FileUrl")

    file_name = release.get(f"{file_json_prefix}FileName")
    if file_name is None:
        raise ValueError(f"File name not in json data {file_json_prefix}FileName")

    file_destination = DOWNLOADS_PATH / file_name

    if not file_url or not file_name:
        raise ValueError(
            f"Missing {file_type} file information for release {release_ordinal} of item {item_id}."
        )

    _logger.info(
        f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}"
    )
    response = requests.get(file_url, stream=True)

    if response.status_code == 200:
        with open(file_destination, "wb") as f:
            f.write(response.content)
    else:
        _logger.error(
            f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}"
        )
        response.raise_for_status()

    return file_destination


def validate_download_hash(file_destination: str, item_hash: str):
    with open(file_destination, "rb") as f:
        hash = hashlib.sha256(f.read()).hexdigest()
    _logger.debug(hash)
    if hash.upper() == item_hash.upper():
        _logger.debug(f"Verified hash of {file_destination} {hash}")
    else:
        msg = f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {hash} instead"
        _logger.error(msg)
        raise ValueError(msg)


def unzip_download(file_destination: str):
    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    with zipfile.ZipFile(file_destination, "r") as zip_ref:
        zip_ref.extractall(DOWNLOADS_PATH)


def extract_icd10():
    # ICD10_edition5
    file_path = (
        DOWNLOADS_PATH
        / "ICD10_Edition5_XML_20160401"
        / "Content"
        / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml"
    )
    df = pd.read_xml(file_path)
    df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
    df = df.rename(
        columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"}
    )
    output_path = PROCESSED_PATH / "icd10.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")


def extract_opsc4():
    file_path = (
        DOWNLOADS_PATH
        / "OPCS410 Data files txt"
        / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt"
    )

    df = pd.read_csv(file_path, sep="\t", dtype=str, header=None)
    df = df.rename(columns={0: "opcs4", 1: "description"})

    output_path = PROCESSED_PATH / "opcs4.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")


def extract_nhs_data_migrations():
    # NHS Data Migrations

    # snomed only
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "sctcremap_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["SCT_CONCEPTID"]]
    df = df.rename(columns={"SCT_CONCEPTID": "snomed"})
    df = df.drop_duplicates()
    df = df.astype(str)

    output_path = PROCESSED_PATH / "snomed.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> r3
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rctctv3map_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
    df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"})

    output_path = PROCESSED_PATH / "read2_to_read3.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> r2
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3rctmap_uk_20200401000002.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
    df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"})
    df = df.drop_duplicates()
    df = df[~df["read2"].str.match("^.*_.*$")]  # remove r2 codes with '_'

    output_path = PROCESSED_PATH / "read3_to_read2.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rcsctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["ReadCode", "ConceptId"]]
    df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"})

    output_path = PROCESSED_PATH / "read2_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3sctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
    df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"})
    df["snomed"] = df["snomed"].astype(str)
    df = df[~df["snomed"].str.match("^.*_.*$")]  # remove snomed codes with '_'

    output_path = PROCESSED_PATH / "read3_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")


def extract_nhs_read_browser():
    # r2 only
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read2"})
    output_path = PROCESSED_PATH / "read2.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> atc
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READCODE", "ATC"]]
    df = df.rename(columns={"READCODE": "read2", "ATC": "atc"})
    output_path = PROCESSED_PATH / "read2_to_atc.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r2 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 only
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read3"})
    output_path = PROCESSED_PATH / "read3.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")

    # r3 -> icd9
    # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')

    # r3 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    _logger.info(f"Extracted: {output_path}")


def create_map_directories():
    """Create map directories."""

    # Check if build directory exists
    create_map_dirs = False
    if VOCAB_PATH.exists():
        user_input = (
            input(
                f"The map directory {VOCAB_PATH} already exists. Do you want to download and process trud data again? (y/n): "
            )
            .strip()
            .lower()
        )
        if user_input == "y":
            # delete all build files
            shutil.rmtree(VOCAB_PATH)
            create_map_dirs = True
        elif user_input == "n":
            _logger.info("Exiting TRUD installation")
            sys.exit(0)
    else:
        create_map_dirs = True

    if create_map_dirs:
        # create maps directories
        VOCAB_PATH.mkdir(parents=True, exist_ok=True)
        DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)
        PROCESSED_PATH.mkdir(parents=True, exist_ok=True)


def install():
    _logger.info("Installing TRUD")

    # get TRUD api key from environment variable
    api_key = os.getenv("ACMC_TRUD_API_KEY")
    if not api_key:
        raise ValueError(
            "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable."
        )

    create_map_directories()

    items_latest = True
    items = [
        {
            "id": 259,
            "name": "NHS ICD-10 5th Edition XML data files",
            "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
            "extract": extract_icd10,
        },
        {
            "id": 119,
            "name": "OPCS-4 data files",
            "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
            "extract": extract_opsc4,
        },
        {
            "id": 9,
            "name": "NHS Data Migration",
            "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
            "extract": extract_nhs_data_migrations,
        },
        {
            "id": 8,
            "name": "NHS Read Browser",
            "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
            "extract": extract_nhs_read_browser,
        },
        # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
    ]

    # remove function from items to save versions
    data = [{k: v for k, v in d.items() if k != "extract"} for d in items]
    # save TRUD versions to file to maintain a record of what was downloaded
    with open(VERSION_PATH, "w") as file:
        yaml.dump(
            data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Validate and process each item ID
    for item in items:
        item_id = item["id"]
        _logger.info(f"--- {item['name']} ---")

        releases = get_releases(item_id, API_KEY=api_key, latest=items_latest)
        if not releases:
            raise ValueError(f"No releases found for item {item_id}.")

        # Process each release in reverse order
        for release_ordinal, release in enumerate(releases[::-1], 1):
            # Download archive file
            file_destination = download_release_file(
                item_id, release_ordinal, release, "archive"
            )

            # Optional files
            # if items.checksum:
            #     download_release_file(item["id"], release_ordinal, release, "checksum")
            # if items.signature:
            #     download_release_file(item["id"], release_ordinal, release, "signature")
            # if items.public_key:
            #     download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")

            # Verify Hash if available
            if "hash" in item:
                validate_download_hash(file_destination, item["hash"])

            # Unzip downloaded .zip
            unzip_download(file_destination)

            # Extract Tables to parquet
            if "extract" in item:
                item["extract"]()

        _logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.")

    _logger.info("TRUD installation completed")

FQDN = 'isd.digital.nhs.uk'

Fully Qualified Domain Name of NHS digital TRUD service API

VOCAB_PATH = PosixPath('vocab/trud')

Default path to the TRUD vocabulary directory relative to the acmc execution directory

VERSION_FILE = 'trud_version.yml'

TRUD version file

VERSION_PATH = PosixPath('vocab/trud/trud_version.yml')

Default path to the TRUD version file

DOWNLOADS_PATH = PosixPath('vocab/trud/downloads')

Default path to the TRUD vocabulary downloads directory

PROCESSED_PATH = PosixPath('vocab/trud/processed')

Default path to the processed TRUD mappings directory

def get_releases(item_id: str, API_KEY: str, latest=False) -> list:

Retrieve release information for an item from the TRUD API.
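
A usage sketch (item 259 is the ICD-10 item from install(); the API key is a placeholder, and the archive* fields shown are the ones this module itself reads):

releases = get_releases("259", API_KEY="your-trud-api-key", latest=True)
for release in releases:
    print(release.get("archiveFileName"), release.get("archiveFileUrl"))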

def download_release_file(item_id: str, release_ordinal: str, release: dict, file_json_prefix: str) -> pathlib.Path:

Download specified file type for a given release of an item.
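
A usage sketch, assuming DOWNLOADS_PATH already exists and a release dict returned by get_releases (placeholder key):

releases = get_releases("259", API_KEY="your-trud-api-key", latest=True)
zip_path = download_release_file("259", 1, releases[0], "archive")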

def validate_download_hash(file_destination: str, item_hash: str):
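
Computes the SHA-256 digest of the downloaded file and compares it (case-insensitively) with the expected hash, raising ValueError on a mismatch. For example, using the hash recorded for item 259 in install():

validate_download_hash(zip_path, "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F")  # zip_path as returned by download_release_file
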
def unzip_download(file_destination: str):
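
Extracts a downloaded .zip archive into DOWNLOADS_PATH; raises NotADirectoryError if the downloads directory does not exist.
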
def extract_icd10():
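
Parses the ICD-10 Edition 5 XML release and writes the code, alternative code and description columns to PROCESSED_PATH / "icd10.parquet".
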
def extract_opsc4():
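
Reads the OPCS-4.10 codes-and-titles text file and writes it to PROCESSED_PATH / "opcs4.parquet".
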
def extract_nhs_data_migrations():
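
Processes the NHS Data Migration mapping tables into parquet files: snomed.parquet, read2_to_read3.parquet, read3_to_read2.parquet, read2_to_snomed.parquet and read3_to_snomed.parquet.
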
def extract_nhs_read_browser():
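
Processes the NHS Read Browser DBF tables into parquet files: read2.parquet, read2_to_atc.parquet, read2_to_icd10.parquet, read2_to_opcs4.parquet, read3.parquet, read3_to_icd10.parquet and read3_to_opcs4.parquet.
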
def create_map_directories():

Create map directories.
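
If VOCAB_PATH already exists, the user is prompted before the existing vocabulary directory is deleted and re-created; answering 'n' exits the installation.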

def install():
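
Downloads, verifies, unzips and extracts all configured TRUD releases. Requires the ACMC_TRUD_API_KEY environment variable to be set and records the versions of the downloaded items in VERSION_PATH (see the usage sketch at the top of this page).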