Support changing file types in scmd import #4805

Open · wants to merge 7 commits into base: main
10 changes: 7 additions & 3 deletions openprescribing/gcutils/bigquery.py
@@ -128,10 +128,14 @@ def delete_dataset(self):
             self.gcbq_client.delete_table(table_list_item.reference)
         self.gcbq_client.delete_dataset(self.dataset)
 
-    def create_table(self, table_id, schema):
+    def create_table(self, table_id, schema, time_partitioning=None):
         table_ref = self.dataset.table(table_id)
         table = gcbq.Table(table_ref, schema=schema)
 
+        if time_partitioning:
+            if isinstance(time_partitioning, gcbq.TimePartitioning):
+                table.time_partitioning = time_partitioning
+
         try:
             self.gcbq_client.create_table(table)
         except NotFound as e:
@@ -150,9 +154,9 @@ def get_table(self, table_id):
         table_ref = self.dataset.table(table_id)
         return Table(table_ref, self)
 
-    def get_or_create_table(self, table_id, schema):
+    def get_or_create_table(self, table_id, schema, time_partitioning=None):
         try:
-            table = self.create_table(table_id, schema)
+            table = self.create_table(table_id, schema, time_partitioning)
         except Conflict:
             table = self.get_table(table_id)
         return table
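
Note (not part of the diff): a minimal sketch of how the new time_partitioning parameter is intended to be called, using the standard google.cloud.bigquery TimePartitioning object; the dataset_key and schema here just mirror the import command below.

from gcutils.bigquery import Client, build_schema
from google.cloud import bigquery as gcbq

client = Client(dataset_key="scmd")
schema = build_schema(("year_month", "DATE"), ("ods_code", "STRING"))

# Partition the table by day on the year_month column; every row for a
# given month carries the date <year>-<month>-01, so each month lands in
# its own partition.
partitioning = gcbq.TimePartitioning(
    type_=gcbq.TimePartitioningType.DAY,
    field="year_month",
)
table = client.get_or_create_table(
    "scmd", schema=schema, time_partitioning=partitioning
)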
105 changes: 74 additions & 31 deletions openprescribing/pipeline/management/commands/import_scmd.py
@@ -1,13 +1,15 @@
 import csv
+import re
 import tempfile
 
 import requests
-from bs4 import BeautifulSoup
 from django.core.management import BaseCommand
 from gcutils.bigquery import Client, build_schema
+from google.cloud import bigquery as gcbq
 from google.cloud.exceptions import Conflict
 
 SCHEMA = build_schema(
+    ("file_type", "STRING"),
     ("year_month", "DATE"),
     ("ods_code", "STRING"),
     ("vmp_snomed_code", "STRING"),
@@ -33,25 +35,53 @@ def handle(self, *args, **kwargs):

         # Get URLs keyed by the date (year-month) they're for
         urls = set(self.iter_dataset_urls(s))
-        urls_by_month = dict(self.iter_months(urls))
+        urls_by_month_and_file_type = {
+            month: {"url": url, "file_type": file_type}
+            for month, url, file_type in self.iter_months(urls)
+        }
 
         # set up the BigQuery client, dataset, and table
         client = Client(dataset_key="scmd")
         self.ensure_dataset_exists(client)
-        table = client.get_or_create_table("scmd", schema=SCHEMA)
+
+        time_partitioning = gcbq.TimePartitioning(
+            type_=gcbq.TimePartitioningType.DAY,
+            field="year_month",
+        )
+
+        table = client.get_or_create_table(
+            "scmd", schema=SCHEMA, time_partitioning=time_partitioning
+        )
 
         # look for existing months in BigQuery
-        sql = "SELECT DISTINCT year_month FROM {};".format(table.qualified_name)
-        known_dates = [r[0] for r in client.query(sql).rows]
+        sql = "SELECT DISTINCT year_month, file_type FROM {};".format(
+            table.qualified_name
+        )
+        known_dates_and_types = {
+            r["year_month"]: r["file_type"] for r in client.query(sql).rows
+        }
 
         # convert the datetime.dates the query gives us to strings since
         # that's what we're dealing with elsewhere.
-        known_months = {d.strftime("%Y-%m") for d in known_dates}
+        known_dates_and_types = {
+            k.strftime("%Y-%m"): v for k, v in known_dates_and_types.items()
+        }
+        known_months = set(known_dates_and_types.keys())
+        print(known_months)
 
-        missing_months = set(urls_by_month.keys()) - known_months
+        missing_months = set(urls_by_month_and_file_type.keys()) - known_months
         print("Missing months: {}".format(", ".join(sorted(missing_months))))
-        pending_urls = {m: urls_by_month[m] for m in missing_months}
+
+        out_of_date_months = {
+            m: urls_by_month_and_file_type[m]
+            for m in urls_by_month_and_file_type.keys()
+            # Use .get() so months not yet in BigQuery at all don't raise a
+            # KeyError here; those are already covered by missing_months.
+            if urls_by_month_and_file_type[m]["file_type"] != known_dates_and_types.get(m)
+        }
+
+        pending_urls = {
+            m: urls_by_month_and_file_type[m]
+            for m in missing_months.union(out_of_date_months)
+        }
 
         # abort if there's nothing to get
         if not pending_urls:
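
Note (not part of the diff): a worked example of the reconciliation logic above, with made-up months and URLs.

urls_by_month_and_file_type = {
    "2024-01": {"url": "https://example.com/scmd_final_202401.csv", "file_type": "final"},
    "2024-02": {"url": "https://example.com/scmd_provisional_202402.csv", "file_type": "provisional"},
    "2024-03": {"url": "https://example.com/scmd_wip_202403.csv", "file_type": "wip"},
}
# What BigQuery already holds:
known_dates_and_types = {"2024-01": "final", "2024-02": "wip"}
known_months = set(known_dates_and_types.keys())

missing_months = set(urls_by_month_and_file_type.keys()) - known_months
# -> {"2024-03"}: never imported

out_of_date_months = {
    m: urls_by_month_and_file_type[m]
    for m in urls_by_month_and_file_type.keys()
    if urls_by_month_and_file_type[m]["file_type"] != known_dates_and_types.get(m)
}
# -> 2024-02 (its wip file has been superseded by provisional) and 2024-03

pending_urls = {
    m: urls_by_month_and_file_type[m]
    for m in missing_months.union(out_of_date_months)
}
# -> 2024-02 is re-fetched and re-ingested, 2024-03 is fetched for the first
#    time, and the already up-to-date 2024-01 is left alone.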
@@ -78,13 +108,16 @@ def handle(self, *args, **kwargs):
             with tempfile.NamedTemporaryFile(mode="w+") as f:
                 writer = csv.writer(f, delimiter=",")
                 for line in reader:
+                    # add the file type
+                    line.insert(0, url["file_type"])
+
                     # Convert year-month dates to year-month-day
-                    if len(line[0]) == 7:
-                        line[0] = line[0] + "-01"
-                    elif len(line[0]) == 6:
-                        line[0] = line[0][:4] + "-" + line[0][4:6] + "-01"
+                    if len(line[1]) == 7:
+                        line[1] = line[1] + "-01"
+                    elif len(line[1]) == 6:
+                        line[1] = line[1][:4] + "-" + line[1][4:6] + "-01"
                     else:
-                        assert False, line[0]
+                        assert False, line[1]
                     writer.writerow(line)
                 print("{} | Wrote: {}".format(month, f.name))

@@ -93,36 +126,46 @@ def handle(self, *args, **kwargs):

             # insert into BigQuery
             table.insert_rows_from_csv(
-                f.name, SCHEMA, write_disposition="WRITE_APPEND"
+                f.name, SCHEMA, write_disposition="WRITE_TRUNCATE"
             )
             print("{} | Ingested into BigQuery".format(month))

     def iter_dataset_urls(self, session):
-        """Extract CSV file URLs from the dataset page."""
-        datasets_url = "https://opendata.nhsbsa.net/dataset/secondary-care-medicines-data-indicative-price"
+        """Extract CSV file URLs via the API."""
+        dataset_name = "secondary-care-medicines-data-indicative-price"
+        dataset_url = (
+            f"https://opendata.nhsbsa.net/api/3/action/package_show?id={dataset_name}"
+        )
 
-        # scrape available datasets
-        r = session.get(datasets_url)
+        # fetch the dataset's metadata, including its list of resources
+        r = session.get(dataset_url)
         r.raise_for_status()
 
-        doc = BeautifulSoup(r.text, "html.parser")
+        data = r.json()
+        resources = data["result"]["resources"]
+
+        pattern = r"scmd_(final|provisional|wip)_[0-9]{6}\.csv"
 
-        for a in doc.find_all("a", href=True):
-            if a["href"].endswith(".csv"):
-                yield a["href"]
+        for resource in resources:
+            if resource["format"].upper() == "CSV" and re.search(
+                pattern, resource["url"].split("/")[-1]
+            ):
+                yield resource["url"]
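
Note (not part of the diff): package_show is CKAN's standard dataset-metadata action, which the NHSBSA open data portal exposes; the code above relies on a response of roughly this shape (values made up).

data = {
    "success": True,
    "result": {
        "resources": [
            {
                "format": "CSV",
                "url": "https://opendata.nhsbsa.net/some/path/scmd_final_202401.csv",
            },
            # ... one entry per published file, CSV or otherwise
        ],
    },
}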

     def iter_months(self, urls):
         """
-        Extract a "month" from each URL given.
+        Extract a "month" and file type from each URL given.
 
-        URLs are expected to end in the format `/SCMD_<something>_<year><month>.csv`, from
+        URLs are expected to end in the format `/scmd_<file_type>_<year><month>.csv`, from
         that we get the year and month, converting them to the format
-        <year>-<month>.
+        <year>-<month>, plus the file type.
         """
+        pattern = r"scmd_(final|provisional|wip)_([0-9]{4})([0-9]{2})\.csv"
         for url in urls:
-            year_and_month = url.split("_")[-1].split(".")[0]
-
-            # Split up dates with hyphens and add a day to match what we put
-            # into BigQuery.
-            date = "{}-{}".format(year_and_month[:4], year_and_month[4:])
-            yield date, url
+            match = re.search(pattern, url.split("/")[-1])
+            if match:
+                file_type, year, month = match.groups()
+                # Hyphenate to <year>-<month> to match the month keys used
+                # elsewhere in this command.
+                yield f"{year}-{month}", file_type, url
+            else:
+                raise ValueError(f"Unexpected URL format: {url}")
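
Note (not part of the diff): a quick check of the URL parsing above against a made-up resource URL.

import re

pattern = r"scmd_(final|provisional|wip)_([0-9]{4})([0-9]{2})\.csv"
url = "https://opendata.nhsbsa.net/some/path/scmd_provisional_202403.csv"

match = re.search(pattern, url.split("/")[-1])
assert match is not None
file_type, year, month = match.groups()
assert (f"{year}-{month}", file_type) == ("2024-03", "provisional")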