From 9420e460161da13d7e3cfd801495c6b009d02a8b Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 24 May 2024 15:04:59 +0100 Subject: [PATCH 1/7] get dataset urls from the bsa open data API instead of scraping them --- .../management/commands/import_scmd.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index 180c753280..383da750b4 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -1,8 +1,8 @@ import csv import tempfile +import re import requests -from bs4 import BeautifulSoup from django.core.management import BaseCommand from gcutils.bigquery import Client, build_schema from google.cloud.exceptions import Conflict @@ -98,24 +98,27 @@ def handle(self, *args, **kwargs): print("{} | Ingested into BigQuery".format(month)) def iter_dataset_urls(self, session): - """Extract CSV file URLs from the dataset page.""" - datasets_url = "https://opendata.nhsbsa.net/dataset/secondary-care-medicines-data-indicative-price" + """Extract CSV file URLs via the API""" + dataset_name = "secondary-care-medicines-data-indicative-price" + dataset_url = f"https://opendata.nhsbsa.net/api/3/action/package_show?id={dataset_name}" - # scrape available datasets - r = session.get(datasets_url) + r = session.get(dataset_url) r.raise_for_status() - doc = BeautifulSoup(r.text, "html.parser") + data = r.json() + resources = data['result']['resources'] - for a in doc.find_all("a", href=True): - if a["href"].endswith(".csv"): - yield a["href"] + pattern = r"scmd_(final|provisional|wip)_[0-9]{6}\.csv" + + for resource in resources: + if resource['format'].upper() == 'CSV' and re.search(pattern, resource['url'].split('/')[-1]): + yield resource['url'] def iter_months(self, urls): """ Extract a "month" from each URL given. - URLs are expected to end in the format `/SCMD__.csv`, from + URLs are expected to end in the format `/scmd__.csv`, from that we get the year and month, converting them to the format -. """ From 2dde82cef56b5325b2427958f36ece4350cf1084 Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 24 May 2024 15:51:01 +0100 Subject: [PATCH 2/7] run black --- .../pipeline/management/commands/import_scmd.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index 383da750b4..bc734add68 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -1,6 +1,6 @@ import csv -import tempfile import re +import tempfile import requests from django.core.management import BaseCommand @@ -100,19 +100,23 @@ def handle(self, *args, **kwargs): def iter_dataset_urls(self, session): """Extract CSV file URLs via the API""" dataset_name = "secondary-care-medicines-data-indicative-price" - dataset_url = f"https://opendata.nhsbsa.net/api/3/action/package_show?id={dataset_name}" + dataset_url = ( + f"https://opendata.nhsbsa.net/api/3/action/package_show?id={dataset_name}" + ) r = session.get(dataset_url) r.raise_for_status() data = r.json() - resources = data['result']['resources'] + resources = data["result"]["resources"] pattern = r"scmd_(final|provisional|wip)_[0-9]{6}\.csv" for resource in resources: - if resource['format'].upper() == 'CSV' and re.search(pattern, resource['url'].split('/')[-1]): - yield resource['url'] + if resource["format"].upper() == "CSV" and re.search( + pattern, resource["url"].split("/")[-1] + ): + yield resource["url"] def iter_months(self, urls): """ From 151828884f1bbeaafdf0f2ea6d0b3386535af289 Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 31 May 2024 12:33:59 +0100 Subject: [PATCH 3/7] add file type to schema --- openprescribing/pipeline/management/commands/import_scmd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index bc734add68..1d595f48fa 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -8,6 +8,7 @@ from google.cloud.exceptions import Conflict SCHEMA = build_schema( + ("file_type", "STRING"), ("year_month", "DATE"), ("ods_code", "STRING"), ("vmp_snomed_code", "STRING"), From d4788c6a15fef7acf271e1ab8ba04204cdd2f815 Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 31 May 2024 12:34:38 +0100 Subject: [PATCH 4/7] get the file type in iter_months --- .../management/commands/import_scmd.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index 1d595f48fa..716897fbea 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -121,16 +121,19 @@ def iter_dataset_urls(self, session): def iter_months(self, urls): """ - Extract a "month" from each URL given. + Extract a "month" and file type from each URL given. - URLs are expected to end in the format `/scmd__.csv`, from + URLs are expected to end in the format `/scmd__.csv`, from that we get the year and month, converting them to the format - -. + - and the file type. """ + pattern = r"scmd_(final|provisional|wip)_([0-9]{4})([0-9]{2})\.csv" for url in urls: - year_and_month = url.split("_")[-1].split(".")[0] - - # Split up dates with hyphens and add a day to match what we put - # into BigQuery. - date = "{}-{}".format(year_and_month[:4], year_and_month[4:]) - yield date, url + match = re.search(pattern, url.split("/")[-1]) + if match: + file_type, year, month = match.groups() + # Split up dates with hyphens and add a day to match what we put + # into BigQuery. + yield f"{year}-{month}", file_type, url + else: + raise ValueError(f"Unexpected URL format: {url}") \ No newline at end of file From d5eb6fe62fa574a9d5f17c70f4252416ac1f8ed7 Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 31 May 2024 12:39:38 +0100 Subject: [PATCH 5/7] include out of date months in pending urls Out of date months are months for which the existing file type attached to a month in the scmd table does not match that taken from the url. File type changes only go in one direction so we don't have to worry about what the file type changes to. --- .../management/commands/import_scmd.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index 716897fbea..aaee4e81d3 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -34,7 +34,10 @@ def handle(self, *args, **kwargs): # Get URLs keyed by the date (year-month) they're for urls = set(self.iter_dataset_urls(s)) - urls_by_month = dict(self.iter_months(urls)) + urls_by_month_and_file_type = { + month: {"url": url, "file_type": file_type} + for month, url, file_type in self.iter_months(urls) + } # set up the BigQuery client, dataset, and table client = Client(dataset_key="scmd") @@ -42,17 +45,34 @@ def handle(self, *args, **kwargs): table = client.get_or_create_table("scmd", schema=SCHEMA) # look for existing months in BigQuery - sql = "SELECT DISTINCT year_month FROM {};".format(table.qualified_name) - known_dates = [r[0] for r in client.query(sql).rows] + sql = "SELECT DISTINCT year_month, file_type FROM {};".format( + table.qualified_name + ) + known_dates_and_types = { + r["year_month"]: r["file_type"] for r in client.query(sql).rows + } # convert the datetime.dates the query gives us to strings since # that's what we're dealing with elsewhere. - known_months = {d.strftime("%Y-%m") for d in known_dates} + known_dates_and_types = { + k.strftime("%Y-%m"): v for k, v in known_dates_and_types.items() + } + known_months = set(known_dates_and_types.keys()) print(known_months) - missing_months = set(urls_by_month.keys()) - known_months + missing_months = set(urls_by_month_and_file_type.keys()) - known_months print("Missing months: {}".format(", ".join(sorted(missing_months)))) - pending_urls = {m: urls_by_month[m] for m in missing_months} + + out_of_date_months = { + m: urls_by_month_and_file_type[m] + for m in urls_by_month_and_file_type.keys() + if urls_by_month_and_file_type[m]["file_type"] != known_dates_and_types[m] + } + + pending_urls = { + m: urls_by_month_and_file_type[m] + for m in missing_months.union(out_of_date_months) + } # abort if there's nothing to get if not pending_urls: From 8a97d5f5e995b84228326063e87ddd13d141e4ff Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 31 May 2024 12:40:55 +0100 Subject: [PATCH 6/7] append the file type to the downloaded data --- .../pipeline/management/commands/import_scmd.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index aaee4e81d3..f64491cace 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -99,13 +99,16 @@ def handle(self, *args, **kwargs): with tempfile.NamedTemporaryFile(mode="w+") as f: writer = csv.writer(f, delimiter=",") for line in reader: + # add the file type + line.insert(0, url["file_type"]) + # Convert year-month dates to year-month-day - if len(line[0]) == 7: - line[0] = line[0] + "-01" - elif len(line[0]) == 6: - line[0] = line[0][:4] + "-" + line[0][4:6] + "-01" + if len(line[1]) == 7: + line[1] = line[1] + "-01" + elif len(line[1]) == 6: + line[1] = line[1][:4] + "-" + line[1][4:6] + "-01" else: - assert False, line[0] + assert False, line[1] writer.writerow(line) print("{} | Wrote: {}".format(month, f.name)) From 99b0a0cdad93ae1949c8befb4fd81efe0d9bec04 Mon Sep 17 00:00:00 2001 From: Louis Fisher Date: Fri, 31 May 2024 12:41:52 +0100 Subject: [PATCH 7/7] partition table on year_month This way we can easily overwrite data for a given month when the file type has changed --- openprescribing/gcutils/bigquery.py | 10 +++++++--- .../pipeline/management/commands/import_scmd.py | 15 ++++++++++++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/openprescribing/gcutils/bigquery.py b/openprescribing/gcutils/bigquery.py index 2b9f93ecbb..6326bd8ee4 100644 --- a/openprescribing/gcutils/bigquery.py +++ b/openprescribing/gcutils/bigquery.py @@ -128,10 +128,14 @@ def delete_dataset(self): self.gcbq_client.delete_table(table_list_item.reference) self.gcbq_client.delete_dataset(self.dataset) - def create_table(self, table_id, schema): + def create_table(self, table_id, schema, time_partitioning=None): table_ref = self.dataset.table(table_id) table = gcbq.Table(table_ref, schema=schema) + if time_partitioning: + if isinstance(time_partitioning, gcbq.TimePartitioning): + table.time_partitioning = time_partitioning + try: self.gcbq_client.create_table(table) except NotFound as e: @@ -150,9 +154,9 @@ def get_table(self, table_id): table_ref = self.dataset.table(table_id) return Table(table_ref, self) - def get_or_create_table(self, table_id, schema): + def get_or_create_table(self, table_id, schema, time_partitioning=None): try: - table = self.create_table(table_id, schema) + table = self.create_table(table_id, schema, time_partitioning) except Conflict: table = self.get_table(table_id) return table diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py index f64491cace..87a8c6ced2 100644 --- a/openprescribing/pipeline/management/commands/import_scmd.py +++ b/openprescribing/pipeline/management/commands/import_scmd.py @@ -5,6 +5,7 @@ import requests from django.core.management import BaseCommand from gcutils.bigquery import Client, build_schema +from google.cloud import bigquery as gcbq from google.cloud.exceptions import Conflict SCHEMA = build_schema( @@ -42,7 +43,15 @@ def handle(self, *args, **kwargs): # set up the BigQuery client, dataset, and table client = Client(dataset_key="scmd") self.ensure_dataset_exists(client) - table = client.get_or_create_table("scmd", schema=SCHEMA) + + time_partitioning = gcbq.TimePartitioning( + type_=gcbq.TimePartitioningType.DAY, + field="year_month", + ) + + table = client.get_or_create_table( + "scmd", schema=SCHEMA, time_partitioning=time_partitioning + ) # look for existing months in BigQuery sql = "SELECT DISTINCT year_month, file_type FROM {};".format( @@ -117,7 +126,7 @@ def handle(self, *args, **kwargs): # insert into BigQuery table.insert_rows_from_csv( - f.name, SCHEMA, write_disposition="WRITE_APPEND" + f.name, SCHEMA, write_disposition="WRITE_TRUNCATE" ) print("{} | Ingested into BigQuery".format(month)) @@ -159,4 +168,4 @@ def iter_months(self, urls): # into BigQuery. yield f"{year}-{month}", file_type, url else: - raise ValueError(f"Unexpected URL format: {url}") \ No newline at end of file + raise ValueError(f"Unexpected URL format: {url}")