acmc.trud

import os
import sys
import requests
import argparse
import shutil
import hashlib
import zipfile
import pandas as pd
import simpledbf  # type: ignore
import yaml
from pathlib import Path

# setup logging
from acmc import util, logging_config as lc

logger = lc.setup_logger()

# Constants
FQDN = "isd.digital.nhs.uk"
VOCAB_PATH = Path("./vocab/trud")
VERSION_FILE = "trud_version.yaml"
VERSION_PATH = VOCAB_PATH / VERSION_FILE
DOWNLOADS_PATH = VOCAB_PATH / "downloads"
PROCESSED_PATH = VOCAB_PATH / "processed"


def get_releases(item_id: str, API_KEY: str, latest=False) -> list:
    """Retrieve release information for an item from the TRUD API."""
    url = f"https://{FQDN}/trud/api/v1/keys/{API_KEY}/items/{item_id}/releases"
    if latest:
        url += "?latest"

    response = requests.get(url)
    if response.status_code != 200:
        logger.error(
            f"Failed to fetch releases for item {item_id}. Status code: {response.status_code}, error: {response.json()['message']}. If no releases are found for the API key, ensure you are subscribed to the data release and that the subscription is not pending approval"
        )
        response.raise_for_status()

    data = response.json()
    if data.get("message") != "OK":
        msg = f"Unknown error occurred: {data.get('message')}"
        logger.error(msg)
        raise Exception(msg)

    return data.get("releases", [])


def download_release_file(
    item_id: str,
    release_ordinal: str,
    release: dict,
    file_json_prefix: str,
    file_type=None,
) -> Path:
    """Download specified file type for a given release of an item."""

    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    file_type = file_type or file_json_prefix
    file_url = release.get(f"{file_json_prefix}FileUrl")
    file_name = release.get(f"{file_json_prefix}FileName")

    if not file_url or not file_name:
        raise ValueError(
            f"Missing {file_type} file information for release {release_ordinal} of item {item_id}."
        )

    file_destination = DOWNLOADS_PATH / file_name

    logger.info(
        f"Downloading item {item_id} {file_type} file: {file_name} from {file_url} to {file_destination}"
    )
    response = requests.get(file_url, stream=True)

    if response.status_code == 200:
        with open(file_destination, "wb") as f:
            f.write(response.content)
    else:
        logger.error(
            f"Failed to download {file_type} file for item {item_id}. Status code: {response.status_code}"
        )
        response.raise_for_status()

    return file_destination


def validate_download_hash(file_destination: str, item_hash: str):
    """Validate the SHA-256 hash of a downloaded file against the expected hash."""
    with open(file_destination, "rb") as f:
        file_hash = hashlib.sha256(f.read()).hexdigest()
    logger.debug(file_hash)
    if file_hash.upper() == item_hash.upper():
        logger.debug(f"Verified hash of {file_destination} {file_hash}")
    else:
        msg = f"Could not validate origin of {file_destination}. The SHA-256 hash should be: {item_hash}, but got {file_hash} instead"
        logger.error(msg)
        raise ValueError(msg)


def unzip_download(file_destination: str):
    """Unzip a downloaded archive into the TRUD downloads directory."""

    # check folder is a directory
    if not DOWNLOADS_PATH.is_dir():
        raise NotADirectoryError(
            f"Error: '{DOWNLOADS_PATH}' for TRUD resources is not a directory"
        )

    with zipfile.ZipFile(file_destination, "r") as zip_ref:
        zip_ref.extractall(DOWNLOADS_PATH)


def extract_icd10():
    """Extract ICD-10 codes and descriptions to a parquet file."""
    # ICD10_edition5
    file_path = (
        DOWNLOADS_PATH
        / "ICD10_Edition5_XML_20160401"
        / "Content"
        / "ICD10_Edition5_CodesAndTitlesAndMetadata_GB_20160401.xml"
    )
    df = pd.read_xml(file_path)
    df = df[["CODE", "ALT_CODE", "DESCRIPTION"]]
    df = df.rename(
        columns={"CODE": "icd10", "ALT_CODE": "icd10_alt", "DESCRIPTION": "description"}
    )
    output_path = PROCESSED_PATH / "icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_opsc4():
    """Extract OPCS-4 codes and descriptions to a parquet file."""
    file_path = (
        DOWNLOADS_PATH
        / "OPCS410 Data files txt"
        / "OPCS410 CodesAndTitles Nov 2022 V1.0.txt"
    )

    df = pd.read_csv(file_path, sep="\t", dtype=str, header=None)
    df = df.rename(columns={0: "opcs4", 1: "description"})

    output_path = PROCESSED_PATH / "opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_nhs_data_migrations():
    """Extract NHS Data Migration mapping tables (Read v2, Read v3/CTV3 and SNOMED) to parquet files."""
    # snomed only
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "sctcremap_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["SCT_CONCEPTID"]]
    df = df.rename(columns={"SCT_CONCEPTID": "snomed"})
    df = df.drop_duplicates()
    df = df.astype(str)

    output_path = PROCESSED_PATH / "snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> r3
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rctctv3map_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["V2_CONCEPTID", "CTV3_CONCEPTID"]]
    df = df.rename(columns={"V2_CONCEPTID": "read2", "CTV3_CONCEPTID": "read3"})

    output_path = PROCESSED_PATH / "read2_to_read3.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> r2
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3rctmap_uk_20200401000002.txt"
    )
    df = pd.read_csv(file_path, sep="\t")
    df = df[["CTV3_CONCEPTID", "V2_CONCEPTID"]]
    df = df.rename(columns={"CTV3_CONCEPTID": "read3", "V2_CONCEPTID": "read2"})
    df = df.drop_duplicates()
    df = df[~df["read2"].str.match("^.*_.*$")]  # remove r2 codes with '_'

    output_path = PROCESSED_PATH / "read3_to_read2.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "rcsctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["ReadCode", "ConceptId"]]
    df = df.rename(columns={"ReadCode": "read2", "ConceptId": "snomed"})

    output_path = PROCESSED_PATH / "read2_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> snomed
    file_path = (
        DOWNLOADS_PATH
        / "Mapping Tables"
        / "Updated"
        / "Clinically Assured"
        / "ctv3sctmap2_uk_20200401000001.txt"
    )
    df = pd.read_csv(file_path, sep="\t", dtype=str)
    df = df[["CTV3_TERMID", "SCT_CONCEPTID"]]
    df = df.rename(columns={"CTV3_TERMID": "read3", "SCT_CONCEPTID": "snomed"})
    df["snomed"] = df["snomed"].astype(str)
    df = df[~df["snomed"].str.match("^.*_.*$")]  # remove snomed codes with '_'

    output_path = PROCESSED_PATH / "read3_to_snomed.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def extract_nhs_read_browser():
    """Extract NHS Read Browser tables (Read v2/v3 and mappings to ATC, ICD-10 and OPCS-4) to parquet files."""
    # r2 only
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read2"})
    output_path = PROCESSED_PATH / "read2.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> atc
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ATC.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READCODE", "ATC"]]
    df = df.rename(columns={"READCODE": "read2", "ATC": "atc"})
    output_path = PROCESSED_PATH / "read2_to_atc.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r2 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V2" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read2", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read2"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read2_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 only
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ANCESTOR.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = pd.concat([df["READCODE"], df["DESCENDANT"]])
    df = pd.DataFrame(df.drop_duplicates())
    df = df.rename(columns={0: "read3"})
    output_path = PROCESSED_PATH / "read3.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> icd10
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "ICD10.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "icd10"})
    df = df[~df["icd10"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_icd10.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")

    # r3 -> icd9
    # dbf = simpledbf.Dbf5('build/maps/downloads/Standard/V3/ICD9V3.DBF')

    # r3 -> opcs4
    input_path = DOWNLOADS_PATH / "Standard" / "V3" / "OPCS4V3.DBF"
    df = simpledbf.Dbf5(input_path).to_dataframe()
    df = df[["READ_CODE", "TARG_CODE"]]
    df = df.rename(columns={"READ_CODE": "read3", "TARG_CODE": "opcs4"})
    df = df[~df["opcs4"].str.match("^.*-.*$")]  # remove codes with '-'
    df = df[~df["read3"].str.match("^.*-.*$")]  # remove codes with '-'
    output_path = PROCESSED_PATH / "read3_to_opcs4.parquet"
    df.to_parquet(output_path, index=False)
    logger.info(f"Extracted: {output_path}")


def create_map_directories():
    """Create map directories."""

    # Check if build directory exists
    create_map_dirs = False
    if VOCAB_PATH.exists():
        user_input = (
            input(
                f"The map directory {VOCAB_PATH} already exists. Do you want to download and process TRUD data again? (y/n): "
            )
            .strip()
            .lower()
        )
        if user_input == "y":
            # delete all build files
            shutil.rmtree(VOCAB_PATH)
            create_map_dirs = True
        elif user_input == "n":
            logger.info("Exiting TRUD installation")
            sys.exit(0)
    else:
        create_map_dirs = True

    if create_map_dirs:
        # create maps directories
        VOCAB_PATH.mkdir(parents=True, exist_ok=True)
        DOWNLOADS_PATH.mkdir(parents=True, exist_ok=True)
        PROCESSED_PATH.mkdir(parents=True, exist_ok=True)


def install():
    """Download, verify and extract TRUD release data for all configured items."""
    logger.info("Installing TRUD")

    # get TRUD api key from environment variable
    api_key = os.getenv("ACMC_TRUD_API_KEY")
    if not api_key:
        raise ValueError(
            "TRUD API KEY not found. Set the ACMC_TRUD_API_KEY environment variable."
        )

    create_map_directories()

    items_latest = True
    items = [
        {
            "id": 259,
            "name": "NHS ICD-10 5th Edition XML data files",
            "hash": "A4F7BBA6E86349AADD0F4696C5E91152EB99CC06121427FC359160439B9F883F",
            "extract": extract_icd10,
        },
        {
            "id": 119,
            "name": "OPCS-4 data files",
            "hash": "0615A2BF43FFEF94517F1D1E0C05493B627839F323F22C52CBCD8B40BF767CD3",
            "extract": extract_opsc4,
        },
        {
            "id": 9,
            "name": "NHS Data Migration",
            "hash": "D4317B3ADBA6E1247CF17F0B7CD2B8850FD36C0EA2923BF684EA6159F3A54765",
            "extract": extract_nhs_data_migrations,
        },
        {
            "id": 8,
            "name": "NHS Read Browser",
            "hash": "1FFF2CBF11D0E6D7FC6CC6F13DD52D2F459095C3D83A3F754E6C359F16913C5E",
            "extract": extract_nhs_read_browser,
        },
        # TODO: Download BNF from separate site? https://www.nhsbsa.nhs.uk/sites/default/files/2024-10/BNF%20Snomed%20Mapping%20data%2020241016.zip
    ]

    # remove the extract function from items before saving versions
    data = [{k: v for k, v in d.items() if k != "extract"} for d in items]
    # save TRUD versions to file to maintain a record of what was downloaded
    with open(VERSION_PATH, "w") as file:
        yaml.dump(
            data,
            file,
            Dumper=util.QuotedDumper,
            default_flow_style=False,
            sort_keys=False,
            default_style='"',
        )

    # Validate and process each item ID
    for item in items:
        item_id = item["id"]
        logger.info(f"--- {item['name']} ---")

        releases = get_releases(item_id, API_KEY=api_key, latest=items_latest)
        if not releases:
            raise ValueError(f"No releases found for item {item_id}.")

        # Process each release in reverse order
        for release_ordinal, release in enumerate(releases[::-1], 1):
            # Download archive file
            file_destination = download_release_file(
                item_id, release_ordinal, release, "archive"
            )

            # Optional files
            # if items.checksum:
            #     download_release_file(item["id"], release_ordinal, release, "checksum")
            # if items.signature:
            #     download_release_file(item["id"], release_ordinal, release, "signature")
            # if items.public_key:
            #     download_release_file(item["id"], release_ordinal, release, "publicKey", "public key")

            # Verify Hash if available
            if "hash" in item:
                validate_download_hash(file_destination, item["hash"])

            # Unzip downloaded .zip
            unzip_download(file_destination)

            # Extract Tables to parquet
            if "extract" in item:
                item["extract"]()

        logger.info(f"Downloaded {release_ordinal} release(s) for item {item_id}.")

    logger.info("TRUD installation completed")
logger = <Logger acmc_logger (INFO)>
FQDN = 'isd.digital.nhs.uk'
VOCAB_PATH = PosixPath('vocab/trud')
VERSION_FILE = 'trud_version.yaml'
VERSION_PATH = PosixPath('vocab/trud/trud_version.yaml')
DOWNLOADS_PATH = PosixPath('vocab/trud/downloads')
PROCESSED_PATH = PosixPath('vocab/trud/processed')
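
A minimal usage sketch (not part of the module): it assumes the acmc package is installed, that the ACMC_TRUD_API_KEY environment variable holds a valid TRUD API key, and that the account is subscribed to the items listed in install().

from acmc import trud

# Downloads each item's archive to vocab/trud/downloads, verifies its SHA-256 hash,
# unzips it, and writes the extracted tables to vocab/trud/processed/*.parquet.
trud.install()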
def get_releases(item_id: str, API_KEY: str, latest=False) -> list:

Retrieve release information for an item from the TRUD API.
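As a brief illustration (not from the module itself), get_releases can be called directly; this sketch assumes ACMC_TRUD_API_KEY is set to a valid TRUD API key and uses item 259, the ICD-10 item from install():

import os

from acmc import trud

api_key = os.environ["ACMC_TRUD_API_KEY"]  # assumed to be set
releases = trud.get_releases(259, API_KEY=api_key, latest=True)
for release in releases:
    # the "archive" keys below are the ones download_release_file reads
    print(release.get("archiveFileName"), release.get("archiveFileUrl"))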

def download_release_file(item_id: str, release_ordinal: str, release: dict, file_json_prefix: str, file_type=None) -> pathlib.Path:

Download specified file type for a given release of an item.
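The file_json_prefix argument selects which keys are read from the release dict: with "archive", as install() uses, the archiveFileUrl and archiveFileName entries are used and the file is saved under vocab/trud/downloads. A sketch chaining it with get_releases (assumes a valid ACMC_TRUD_API_KEY, a subscription to item 259, and that the downloads directory already exists, e.g. via create_map_directories()):

import os

from acmc import trud

api_key = os.environ["ACMC_TRUD_API_KEY"]
latest = trud.get_releases(259, API_KEY=api_key, latest=True)[0]
# release_ordinal (here 1) is only used for error reporting
archive_path = trud.download_release_file(259, 1, latest, "archive")
print(archive_path)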

def validate_download_hash(file_destination: str, item_hash: str):
def unzip_download(file_destination: str):
def extract_icd10():
def extract_opsc4():
def extract_nhs_data_migrations():
def extract_nhs_read_browser():
def create_map_directories():

Create map directories.

def install():
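
Once install() has completed, the processed tables can be loaded directly with pandas; a small sketch reading one of the mapping files written to PROCESSED_PATH:

import pandas as pd

# Read v2 -> ICD-10 mapping produced by extract_nhs_read_browser()
df = pd.read_parquet("vocab/trud/processed/read2_to_icd10.parquet")
print(df.head())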