Skip to content

Commit

Permalink
partition table on year_month
Browse files Browse the repository at this point in the history
This way we can easily overwrite data for a given month when the file type has changed
  • Loading branch information
LFISHER7 committed May 31, 2024
1 parent 5acc29e commit 67b1db6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
10 changes: 7 additions & 3 deletions openprescribing/gcutils/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,14 @@ def delete_dataset(self):
self.gcbq_client.delete_table(table_list_item.reference)
self.gcbq_client.delete_dataset(self.dataset)

def create_table(self, table_id, schema):
def create_table(self, table_id, schema, time_partitioning=None):
table_ref = self.dataset.table(table_id)
table = gcbq.Table(table_ref, schema=schema)

if time_partitioning:
if isinstance(time_partitioning, gcbq.TimePartitioning):
table.time_partitioning = time_partitioning

try:
self.gcbq_client.create_table(table)
except NotFound as e:
Expand All @@ -150,9 +154,9 @@ def get_table(self, table_id):
table_ref = self.dataset.table(table_id)
return Table(table_ref, self)

def get_or_create_table(self, table_id, schema):
def get_or_create_table(self, table_id, schema, time_partitioning=None):
try:
table = self.create_table(table_id, schema)
table = self.create_table(table_id, schema, time_partitioning)
except Conflict:
table = self.get_table(table_id)
return table
Expand Down
13 changes: 10 additions & 3 deletions openprescribing/pipeline/management/commands/import_scmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.core.management import BaseCommand
from gcutils.bigquery import Client, build_schema
from google.cloud.exceptions import Conflict
from google.cloud import bigquery as gcbq

SCHEMA = build_schema(
("file_type", "STRING"),
Expand Down Expand Up @@ -42,7 +43,13 @@ def handle(self, *args, **kwargs):
# set up the BigQuery client, dataset, and table
client = Client(dataset_key="scmd")
self.ensure_dataset_exists(client)
table = client.get_or_create_table("scmd", schema=SCHEMA)

time_partitioning = gcbq.TimePartitioning(
type_=gcbq.TimePartitioningType.DAY,
field="year_month",
)

table = client.get_or_create_table("scmd", schema=SCHEMA, time_partitioning=time_partitioning)

# look for existing months in BigQuery
sql = "SELECT DISTINCT year_month, file_type FROM {};".format(table.qualified_name)
Expand Down Expand Up @@ -108,7 +115,7 @@ def handle(self, *args, **kwargs):

# insert into BigQuery
table.insert_rows_from_csv(
f.name, SCHEMA, write_disposition="WRITE_APPEND"
f.name, SCHEMA, write_disposition="WRITE_TRUNCATE"
)
print("{} | Ingested into BigQuery".format(month))

Expand Down Expand Up @@ -150,4 +157,4 @@ def iter_months(self, urls):
# into BigQuery.
yield f"{year}-{month}", file_type, url
else:
raise ValueError(f"Unexpected URL format: {url}")
raise ValueError(f"Unexpected URL format: {url}")

0 comments on commit 67b1db6

Please sign in to comment.