From 46554afec7fda8ff862a3d8d176232bd11a0562e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mike=20Perell=C3=B3?=
Date: Tue, 19 Jul 2022 14:36:14 +0200
Subject: [PATCH 1/3] Update both init and processhandler files to admit deduplication

---
 target_bigquery/__init__.py       |  8 ++-
 target_bigquery/processhandler.py | 93 +++++++++++++++++++++++++------
 2 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/target_bigquery/__init__.py b/target_bigquery/__init__.py
index 1988bef..540874a 100644
--- a/target_bigquery/__init__.py
+++ b/target_bigquery/__init__.py
@@ -75,6 +75,10 @@ def main():
     validate_records = config.get("validate_records", True)
     add_metadata_columns = config.get("add_metadata_columns", True)
 
+    # deduplication arguments
+    deduplication_property = config.get("deduplication_property", "")
+    deduplication_order = config.get("deduplication_order", "DESC")
+
     # we can pass merge state option via CLI param
     merge_state_messages_cli = flags.merge_state_messages
 
@@ -126,7 +130,9 @@ def main():
             table_suffix=table_suffix,
             add_metadata_columns=add_metadata_columns,
             table_configs=table_configs,
-            max_cache=max_cache
+            max_cache=max_cache,
+            deduplication_property=deduplication_property,
+            deduplication_order=deduplication_order
         )
 
     # write a state file
diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py
index 16f2e96..51e8a52 100644
--- a/target_bigquery/processhandler.py
+++ b/target_bigquery/processhandler.py
@@ -34,6 +34,11 @@ def __init__(self, logger, **kwargs):
         # LoadJobProcessHandler kwargs
         self.truncate = kwargs.get("truncate", False)
         self.incremental = kwargs.get("incremental", False)
+
+        # deduplication kwargs
+        self.deduplication_property = kwargs.get("deduplication_property", False)
+        self.deduplication_order = kwargs.get("deduplication_order", "DESC")
+
         self.add_metadata_columns = kwargs.get("add_metadata_columns", True)
         self.validate_records = kwargs.get("validate_records", True)
         self.table_configs = kwargs.get("table_configs", {}) or {}
@@ -226,6 +231,27 @@ def primary_key_condition(self, stream):
         return " and ".join(keys)
         #TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
         #TODO: test it with dupe ids in the data
+    def primary_keys(self, stream):
+        keys = [f"temp.{k}" for k in self.key_properties[stream]]
+        if len(keys) < 1:
+            raise Exception(f"No primary keys specified from the tap and Incremental option selected")
+        return " , ".join(keys)
+
+    def partition_primary_keys(self, stream):
+        keys = [f"s.{k}" for k in self.key_properties[stream]]
+        if len(keys) < 1:
+            raise Exception(f"No primary partition keys specified from the tap and Incremental option selected")
+        return " , ".join(keys)
+
+    def duplicate_condition(self):
+        keys = "and t." + self.deduplication_property + "=s." + self.deduplication_property
+        return keys
+
+    def first_primary_key(self, stream):
+        keys = self.key_properties[stream][0]
+        if len(keys) < 1:
+            raise Exception(f"No first primary key specified from the tap and Incremental option selected")
+        return "t." + keys
 
     def _do_temp_table_based_load(self, rows):
         assert isinstance(rows, dict)
@@ -269,23 +295,58 @@ def _do_temp_table_based_load(self, rows):
                 self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
                 table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
                 try:
-                    self.client.get_table(table_id)
-                    column_names = [x.name for x in self.bq_schemas[stream]]
-
-                    query ="""MERGE `{table}` t
-                    USING `{temp_table}` s
-                    ON {primary_key_condition}
-                    WHEN MATCHED THEN
-                        UPDATE SET {set_values}
-                    WHEN NOT MATCHED THEN
-                        INSERT ({new_cols}) VALUES ({cols})
-                    """.format(table=table_id,
-                               temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
-                               primary_key_condition=self.primary_key_condition(stream),
-                               set_values=', '.join(f'{c}=s.{c}' for c in column_names),
-                               new_cols=', '.join(column_names),
-                               cols=', '.join(f's.{c}' for c in column_names))
+                    self.client.get_table(table_id)
+                    column_names = [f"`{x.name}`" for x in self.bq_schemas[stream]]
+
+                    if self.deduplication_property:
+
+                        copy_config = CopyJobConfig()
+                        copy_config.write_disposition = WriteDisposition.WRITE_TRUNCATE
+
+                        query_duplicates = """CREATE OR REPLACE TABLE `{temp_table}` AS
+                        with last_versions as (
+                        select {primary_keys} from `{temp_table}` temp
+                        ), dedup as(
+                        select s.*,
+                        ROW_NUMBER() OVER (PARTITION BY {partition_keys} ORDER BY {partition_deduplication_property} {deduplication_order}) AS finish_rank
+                        from `{temp_table}` s
+                        left join last_versions t on {primary_key_condition}
+                        where {first_primary_key} is not null),
+                        final_table as(
+                        SELECT * FROM dedup WHERE finish_rank = 1)
+                        SELECT * EXCEPT (finish_rank) FROM final_table ;
+                        """.format(
+                            temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
+                            primary_keys=str(self.primary_keys(stream)),
+                            partition_keys=str(self.partition_primary_keys(stream)),
+                            temp_deduplication_property=str("temp." + str(self.deduplication_property)),
+                            partition_deduplication_property=str("s." + str(self.deduplication_property)),
+                            deduplication_property=str(self.deduplication_property),
+                            primary_key_condition=self.primary_key_condition(stream),
+                            duplicate_condition=self.duplicate_condition(),
+                            deduplication_order=str(self.deduplication_order),
+                            first_primary_key=self.first_primary_key(stream))
+
+                        job_config = QueryJobConfig()
+                        query_job = self.client.query(query_duplicates, job_config=job_config)
+                        query_job.result()
+
+                        self.logger.info(f"Removed duplicates by attribute: {str(self.deduplication_property)} and {str(self.deduplication_order)} order.")
+
+                    query = """MERGE `{table}` t
+                    USING `{temp_table}` s
+                    ON {primary_key_condition}
+                    WHEN MATCHED THEN
+                        UPDATE SET {set_values}
+                    WHEN NOT MATCHED THEN
+                        INSERT ({new_cols}) VALUES ({cols})
+                    """.format(table=table_id,
+                               temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
+                               primary_key_condition=self.primary_key_condition(stream),
+                               set_values=', '.join(f'{c}=s.{c}' for c in column_names),
+                               new_cols=', '.join(column_names),
+                               cols=', '.join(f's.{c}' for c in column_names))
 
                     job_config = QueryJobConfig()
                     query_job = self.client.query(query, job_config=job_config)
                     query_job.result()

From 2758c33c2fbf8b4b049d27fa8c2749433ca252ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mike=20Perell=C3=B3?=
Date: Tue, 19 Jul 2022 14:43:39 +0200
Subject: [PATCH 2/3] Update README.md

---
 README.md | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 19aae79..3ff2995 100644
--- a/README.md
+++ b/README.md
@@ -153,7 +153,13 @@ sample [target-config.json](/sample_config/target-config-exchange-rates-api.json
   * `truncate`: Deleting all previous rows and uploading the new ones to the table
   * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector (if it finds an old row with same key, updates it. Otherwise it inserts the new row)
-    - WARNING: We do not recommend using `incremental` option (which uses `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend using the `append` replication method, which will preserve historical data.
+- WARNING: We do not recommend using `incremental` option (which uses `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend using the `append` replication method, which will preserve historical data.
+- There is also an optional `deduplication_property` parameter for deduplicating records. Based on this attribute (typically a timestamp), a deduplication query runs on the temporary table before the data is merged into the final table, removing all duplicates.
+  - The sort order applied to this attribute can be chosen with `deduplication_order`:
+    * **DESC**: Default option. Keeps the record with the largest deduplication_property (or an arbitrary record among those sharing the largest value).
+    * **ASC**: Keeps the record with the smallest deduplication_property (or an arbitrary record among those sharing the smallest value).
+
+
 
 Sample **target-config.json** file:
 

From d0f205c1815a44450d9c0a2aab0c74355ee52fd9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mike=20Perell=C3=B3?=
Date: Tue, 19 Jul 2022 15:00:32 +0200
Subject: [PATCH 3/3] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 3ff2995..e3998bd 100644
--- a/README.md
+++ b/README.md
@@ -153,7 +153,7 @@ sample [target-config.json](/sample_config/target-config-exchange-rates-api.json
   * `truncate`: Deleting all previous rows and uploading the new ones to the table
   * `incremental`: **Upserting** new rows into the table, using the **primary key** given by the tap connector (if it finds an old row with same key, updates it. Otherwise it inserts the new row)
-- WARNING: We do not recommend using `incremental` option (which uses `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend using the `append` replication method, which will preserve historical data.
+    - WARNING: We do not recommend using `incremental` option (which uses `MERGE` SQL statement). It might result in loss of production data, because historical records get updated. Instead, we recommend using the `append` replication method, which will preserve historical data.
 - There is also an optional `deduplication_property` parameter for deduplicating records. Based on this attribute (typically a timestamp), a deduplication query runs on the temporary table before the data is merged into the final table, removing all duplicates.
   - The sort order applied to this attribute can be chosen with `deduplication_order`:
     * **DESC**: Default option. Keeps the record with the largest deduplication_property (or an arbitrary record among those sharing the largest value).
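
For reference, below is a minimal standalone sketch of the deduplication query that the new code path in _do_temp_table_based_load builds when deduplication_property is set. The stream name, primary key, deduplication settings, and temp table id are hypothetical placeholders (in the real handler they come from the tap's key_properties and the target configuration), and the helper strings are rebuilt locally here rather than calling the patched methods.

# Hypothetical example values; the handler derives these from the tap schema and config.
key_properties = {"orders": ["id"]}                    # assumed stream -> primary key columns
deduplication_property = "updated_at"                  # assumed config value
deduplication_order = "DESC"                           # default: keep the newest version per key
temp_table = "my-project.my_dataset.orders_temp_1234"  # placeholder temp table id

stream = "orders"
primary_keys = " , ".join(f"temp.{k}" for k in key_properties[stream])
partition_keys = " , ".join(f"s.{k}" for k in key_properties[stream])
primary_key_condition = " and ".join(f"s.{k}=t.{k}" for k in key_properties[stream])
first_primary_key = f"t.{key_properties[stream][0]}"

# Same shape as query_duplicates in the patch: rank rows per primary key by the
# deduplication property and keep only the first-ranked row of each group.
query_duplicates = f"""CREATE OR REPLACE TABLE `{temp_table}` AS
with last_versions as (
    select {primary_keys} from `{temp_table}` temp
), dedup as (
    select s.*,
        ROW_NUMBER() OVER (PARTITION BY {partition_keys} ORDER BY s.{deduplication_property} {deduplication_order}) AS finish_rank
    from `{temp_table}` s
    left join last_versions t on {primary_key_condition}
    where {first_primary_key} is not null
), final_table as (
    SELECT * FROM dedup WHERE finish_rank = 1
)
SELECT * EXCEPT (finish_rank) FROM final_table;
"""

print(query_duplicates)

With these example values the ROW_NUMBER() window keeps, for each id, the row with the greatest updated_at (DESC), which matches the default behaviour described in the README; ASC would keep the smallest instead.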