Add optional time partitioning to BigQuery table creation and use it for scmd.

* Client.create_table / Client.get_or_create_table accept an optional
  time_partitioning argument. A value that is not a
  google.cloud.bigquery.TimePartitioning raises TypeError instead of being
  silently dropped (which would create an unpartitioned table).
* import_scmd creates the "scmd" table partitioned by day on year_month
  (assumes year_month is a DATE or TIMESTAMP column in SCHEMA -- confirm).
* The CSV load in import_scmd keeps write_disposition="WRITE_APPEND":
  WRITE_TRUNCATE overwrites the destination table's existing data on every
  load, which would discard previously ingested months.

diff --git a/openprescribing/gcutils/bigquery.py b/openprescribing/gcutils/bigquery.py
index 2b9f93ecbb..6326bd8ee4 100644
--- a/openprescribing/gcutils/bigquery.py
+++ b/openprescribing/gcutils/bigquery.py
@@ -128,10 +128,22 @@ def delete_dataset(self):
             self.gcbq_client.delete_table(table_list_item.reference)
         self.gcbq_client.delete_dataset(self.dataset)
 
-    def create_table(self, table_id, schema):
+    def create_table(self, table_id, schema, time_partitioning=None):
+        """Create a table in this dataset, optionally time-partitioned.
+
+        Raises TypeError if time_partitioning is given but is not a
+        google.cloud.bigquery.TimePartitioning instance.
+        """
         table_ref = self.dataset.table(table_id)
         table = gcbq.Table(table_ref, schema=schema)
 
+        if time_partitioning is not None:
+            if not isinstance(time_partitioning, gcbq.TimePartitioning):
+                raise TypeError(
+                    "time_partitioning must be a gcbq.TimePartitioning instance"
+                )
+            table.time_partitioning = time_partitioning
+
         try:
             self.gcbq_client.create_table(table)
         except NotFound as e:
@@ -150,9 +162,14 @@ def get_table(self, table_id):
         table_ref = self.dataset.table(table_id)
         return Table(table_ref, self)
 
-    def get_or_create_table(self, table_id, schema):
+    def get_or_create_table(self, table_id, schema, time_partitioning=None):
+        """Create the table, falling back to the existing one on Conflict.
+
+        time_partitioning is passed to create_table only; an existing table
+        is returned as-is.
+        """
         try:
-            table = self.create_table(table_id, schema)
+            table = self.create_table(table_id, schema, time_partitioning)
         except Conflict:
             table = self.get_table(table_id)
         return table
diff --git a/openprescribing/pipeline/management/commands/import_scmd.py b/openprescribing/pipeline/management/commands/import_scmd.py
index bec9b42118..231dd24d63 100644
--- a/openprescribing/pipeline/management/commands/import_scmd.py
+++ b/openprescribing/pipeline/management/commands/import_scmd.py
@@ -6,6 +6,7 @@
 from django.core.management import BaseCommand
 from gcutils.bigquery import Client, build_schema
+from google.cloud import bigquery as gcbq
 from google.cloud.exceptions import Conflict
 
 SCHEMA = build_schema(
     ("file_type", "STRING"),
@@ -42,7 +43,17 @@ def handle(self, *args, **kwargs):
         # set up the BigQuery client, dataset, and table
         client = Client(dataset_key="scmd")
         self.ensure_dataset_exists(client)
-        table = client.get_or_create_table("scmd", schema=SCHEMA)
+
+        # Partition by day on year_month. This only applies when the table is
+        # created here; an existing "scmd" table is returned unchanged.
+        time_partitioning = gcbq.TimePartitioning(
+            type_=gcbq.TimePartitioningType.DAY,
+            field="year_month",
+        )
+
+        table = client.get_or_create_table(
+            "scmd", schema=SCHEMA, time_partitioning=time_partitioning
+        )
 
         # look for existing months in BigQuery
         sql = "SELECT DISTINCT year_month, file_type FROM {};".format(table.qualified_name)