acmc.trud

import os
import sys
import requests
import argparse
import shutil
import hashlib
import zipfile
import pandas as pd
import simpledbf  # type: ignore
import yaml
from pathlib import Path

# setup logging
from acmc import util, logging_config as lc

logger = lc.setup_logger()

# Constants
FQDN = "isd.digital.nhs.uk"
VOCAB_PATH = Path("./vocab/trud")
VERSION_FILE = "trud_version.yaml"
VERSION_PATH = VOCAB_PATH / VERSION_FILE
DOWNLOADS_PATH = VOCAB_PATH / "downloads"
PROCESSED_PATH = VOCAB_PATH / "processed"


def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
    """Retrieve release information for an item from the TRUD API."""
    url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
    if latest:
        url += "?latest"

    response = requests.get(url)
    if response.status_code != 200:
        logger.error(
            f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error: {response.json()['message']}. If no releases were found for the API key, ensure you are subscribed to the data release and that the subscription is not pending approval."
        )
        response.raise_for_status()

    data = response.json()
    if data.get("message") != "OK":
        msg = f"Unknown error occurred: {data.get('message')}"
        logger.error(msg)
        raise Exception(msg)

    return data.get("releases", [])


def download_release_file(
    item_id: str, release_ordinal: str, release: dict, file_json_prefix: str
) -> Path:
    """Download specified file type for a given release of an item."""

    # check the downloads folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    file_type = file_json_prefix
    file_url = release.get(f"{file_json_prefix}FileUrl")
    if file_url is None:
        raise ValueError(f"File url not in json data {file_json_prefix}FileUrl")

    file_name = release.get(f"{file_json_prefix}FileName")
    if file_name is None:
        raise ValueError(f"File name not in json data {file_json_prefix}FileName")

    file_destination = DOWNLOADS_PATH / file_name

    if not file_url or not file_name:
        raise ValueError(
            f"Missing {file_type} file information for release {release_ordinal} of item {item_id}."
        )

    logger.info(
        f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}"
    )
    response = requests.get(file_url, stream=True)

    if response.status_code == 200:
        # stream the download to disk in chunks rather than holding it all in memory
        with open(file_destination, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    else:
        logger.error(
            f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}"
        )
        response.raise_for_status()

    return file_destination


def validate_download_hash(file_destination: str, item_hash: str):
    """Validate the SHA-256 hash of a downloaded file against the expected hash."""
    with open(file_destination, "rb") as f:
        sha256_hash = hashlib.sha256(f.read()).hexdigest()
    logger.debug(sha256_hash)
    if sha256_hash.upper() == item_hash.upper():
        logger.debug(f"Verified hash of {file_destination} {sha256_hash}")
    else:
        msg = f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {sha256_hash} instead"
        logger.error(msg)
        raise ValueError(msg)


def unzip_download(file_destination: str):
    """Unzip a downloaded archive into the TRUD downloads directory."""
    # check the downloads folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    with zipfile.ZipFile(file_destination, "r") as zip_ref:
        zip_ref.extractall(DOWNLOADS_PATH)


def extract_icd10():
    """Extract ICD-10 Edition 5 codes and descriptions to icd10.parquet."""
    file_path = (
        DOWNLOADS_PATH
        / "ICD10_Edition5_XML_20160401"
        / "Content"
        / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml"
    )
    df = pd.read_xml(file_path)
    df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
    df = df.rename(
        columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"}
    )
    output_path = PROCESSED_PATH / "icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_opsc4():
    """Extract OPCS-4 codes and titles to opcs4.parquet."""
    file_path = (
        DOWNLOADS_PATH
        / "OPCS410 Data files txt"
        / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt"
    )

    df = pd.read_csv(file_path, sep="\t", dtype=str, header=None)
    df = df.rename(columns={0: "opcs4", 1: "description"})

    output_path = PROCESSED_PATH / "opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_nhs_data_migrations():
    """Extract NHS Data Migration mapping tables (Read v2, CTV3, SNOMED) to parquet."""

    # snomed only
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "sctcremap_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["SCT_CONCEPTID"]]
    df = df.rename(columns={"SCT_CONCEPTID": "snomed"})
    df = df.drop_duplicates()
    df = df.astype(str)

    output_path = PROCESSED_PATH / "snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> r3
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rctctv3map_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
    df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"})

    output_path = PROCESSED_PATH / "read2_to_read3.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> r2
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3rctmap_uk_20200401000002.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
    df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"})
    df = df.drop_duplicates()
    df = df[~df["read2"].str.match("^.*_.*$")]  # remove r2 codes with '_'

    output_path = PROCESSED_PATH / "read3_to_read2.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rcsctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["ReadCode", "ConceptId"]]
    df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"})

    output_path = PROCESSED_PATH / "read2_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3sctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
    df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"})
    df["snomed"] = df["snomed"].astype(str)
    df = df[~df["snomed"].str.match("^.*_.*$")]  # remove snomed codes with '_'

    output_path = PROCESSED_PATH / "read3_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_nhs_read_browser():
    """Extract NHS Read Browser hierarchy and cross-map tables to parquet."""

    # r2 only
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read2"})
    output_path = PROCESSED_PATH / "read2.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> atc
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READCODE", "ATC"]]
    df = df.rename(columns={"READCODE": "read2", "ATC": "atc"})
    output_path = PROCESSED_PATH / "read2_to_atc.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 only
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read3"})
    output_path = PROCESSED_PATH / "read3.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> icd9
    # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')

    # r3 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def create_map_directories():
    """Create map directories."""

    # Check if the vocab directory already exists
    create_map_dirs = False
    if VOCAB_PATH.exists():
        user_input = (
            input(
                f"The map directory {VOCAB_PATH} already exists. Do you want to download and process TRUD data again? (y/n): "
            )
            .strip()
            .lower()
        )
        if user_input == "y":
            # delete all previously built files
            shutil.rmtree(VOCAB_PATH)
            create_map_dirs = True
        elif user_input == "n":
            logger.info("Exiting TRUD installation")
            sys.exit(0)
    else:
        create_map_dirs = True

    if create_map_dirs:
        # create map directories
        VOCAB_PATH.mkdir(parents=True, exist_ok=True)
        DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)
        PROCESSED_PATH.mkdir(parents=True, exist_ok=True)


def install():
    """Install the TRUD data releases: download, verify, unzip and extract to parquet."""
    logger.info("Installing TRUD")

    # get TRUD api key from environment variable
    api_key = os.getenv("ACMC_TRUD_API_KEY")
    if not api_key:
        raise ValueError(
            "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable."
        )

    create_map_directories()

    items_latest = True
    items = [
        {
            "id": 259,
            "name": "NHS ICD-10 5th Edition XML data files",
            "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
            "extract": extract_icd10,
        },
        {
            "id": 119,
            "name": "OPCS-4 data files",
            "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
            "extract": extract_opsc4,
        },
        {
            "id": 9,
            "name": "NHS Data Migration",
            "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
            "extract": extract_nhs_data_migrations,
        },
        {
            "id": 8,
            "name": "NHS Read Browser",
            "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
            "extract": extract_nhs_read_browser,
        },
        # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
    ]

    # remove the extract function from each item before saving the version record
    data = [{k: v for k, v in d.items() if k != "extract"} for d in items]
    # save TRUD versions to file to maintain a record of what was downloaded
    with open(VERSION_PATH, "w") as file:
        yaml.dump(
            data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Validate and process each item ID
    for item in items:
        item_id = item["id"]
        logger.info(f"--- {item['name']} ---")

        releases = get_releases(item_id, API_KEY=api_key, latest=items_latest)
        if not releases:
            raise ValueError(f"No releases found for item {item_id}.")

        # Process each release in reverse order
        for release_ordinal, release in enumerate(releases[::-1], 1):
            # Download archive file
            file_destination = download_release_file(
                item_id, release_ordinal, release, "archive"
            )

            # Optional files
            # if items.checksum:
            #     download_release_file(item["id"], release_ordinal, release, "checksum")
            # if items.signature:
            #     download_release_file(item["id"], release_ordinal, release, "signature")
            # if items.public_key:
            #     download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")

            # Verify hash if available
            if "hash" in item:
                validate_download_hash(file_destination, item["hash"])

            # Unzip downloaded .zip
            unzip_download(file_destination)

            # Extract tables to parquet
            if "extract" in item:
                item["extract"]()

        logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.")

    logger.info("TRUD installation completed")

logger = <Logger acmc_logger (INFO)>
FQDN = 'isd.digital.nhs.uk'
VOCAB_PATH = PosixPath('vocab/trud')
VERSION_FILE = 'trud_version.yaml'
VERSION_PATH = PosixPath('vocab/trud/trud_version.yaml')
DOWNLOADS_PATH = PosixPath('vocab/trud/downloads')
PROCESSED_PATH = PosixPath('vocab/trud/processed')
def get_releases(item_id: str, API_KEY: str, latest=False) -> list:

Retrieve release information for an item from the TRUD API.
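
A minimal usage sketch (not part of the module): it assumes a valid TRUD API key in the ACMC_TRUD_API_KEY environment variable and uses item id 259, the "NHS ICD-10 5th Edition XML data files" item from install() below.

    import os
    from acmc import trud

    api_key = os.environ["ACMC_TRUD_API_KEY"]
    releases = trud.get_releases("259", API_KEY=api_key, latest=True)
    for release in releases:
        print(release.get("archiveFileName"), release.get("archiveFileUrl"))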

def download_release_file( item_id: str, release_ordinal: str, release: dict, file_json_prefix: str) -> pathlib.Path:

Download specified file type for a given release of an item.
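
A sketch of downloading the latest archive for an item (assumes DOWNLOADS_PATH already exists, e.g. after create_map_directories(), and a valid API key; item id 259 is used for illustration):

    import os
    from acmc import trud

    api_key = os.environ["ACMC_TRUD_API_KEY"]
    releases = trud.get_releases("259", API_KEY=api_key, latest=True)
    zip_path = trud.download_release_file("259", 1, releases[0], "archive")
    print(zip_path)  # vocab/trud/downloads/<archive file name>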

def validate_download_hash(file_destination: str, item_hash: str):
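
Validate the SHA-256 hash of a downloaded file against the expected hash; raises ValueError on a mismatch.

Continuing the download sketch above, with the expected hash taken from the ICD-10 entry in install() (illustrative only):

    expected = "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F"
    trud.validate_download_hash(zip_path, expected)
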
def unzip_download(file_destination: str):
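
Unzip a downloaded archive into the TRUD downloads directory; raises NotADirectoryError if DOWNLOADS_PATH does not exist.

Continuing the sketch above:

    trud.unzip_download(zip_path)  # extracts into vocab/trud/downloads
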
def extract_icd10():
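
Extract ICD-10 Edition 5 codes from the unzipped XML release and write them to PROCESSED_PATH / "icd10.parquet" with columns icd10, icd10_alt and description.

A sketch of running the extractor and reading its output (assumes the ICD-10 release has already been downloaded and unzipped, e.g. by install()):

    import pandas as pd
    from acmc import trud

    trud.extract_icd10()
    df = pd.read_parquet(trud.PROCESSED_PATH / "icd10.parquet")
    print(df.columns.tolist())  # ['icd10', 'icd10_alt', 'description']
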
def extract_opsc4():
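
Extract OPCS-4 codes and titles from the unzipped OPCS-4 release and write them to PROCESSED_PATH / "opcs4.parquet" with columns opcs4 and description. Reading the output follows the same pattern as the icd10.parquet sketch above.
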
def extract_nhs_data_migrations():
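
Extract the NHS Data Migration mapping tables and write them to PROCESSED_PATH as snomed.parquet, read2_to_read3.parquet, read3_to_read2.parquet, read2_to_snomed.parquet and read3_to_snomed.parquet.

A sketch of using one of the resulting tables to map Read v2 codes to SNOMED (the input codes are hypothetical, for illustration only):

    import pandas as pd
    from acmc import trud

    mapping = pd.read_parquet(trud.PROCESSED_PATH / "read2_to_snomed.parquet")
    codes = pd.DataFrame({"read2": ["C10..", "H33.."]})  # hypothetical codes
    translated = codes.merge(mapping, how="left", on="read2")
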
def extract_nhs_read_browser():
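
Extract the NHS Read Browser tables and write them to PROCESSED_PATH as read2.parquet, read2_to_atc.parquet, read2_to_icd10.parquet, read2_to_opcs4.parquet, read3.parquet, read3_to_icd10.parquet and read3_to_opcs4.parquet.

A sketch of reading one of the resulting tables (assumes the Read Browser release has been downloaded and unzipped, e.g. by install()):

    import pandas as pd
    from acmc import trud

    read2_to_icd10 = pd.read_parquet(trud.PROCESSED_PATH / "read2_to_icd10.parquet")
    print(read2_to_icd10.columns.tolist())  # ['read2', 'icd10']
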
def create_map_directories():

Create map directories.

def install():
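
Download, verify, unzip and extract all TRUD releases required by acmc: ICD-10, OPCS-4, NHS Data Migration and NHS Read Browser. Requires the ACMC_TRUD_API_KEY environment variable and a TRUD account subscribed to each item.

A minimal sketch of a full installation (the key value is a placeholder):

    import os
    from acmc import trud

    os.environ["ACMC_TRUD_API_KEY"] = "<your TRUD API key>"
    trud.install()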