diff --git a/bizon/destinations/bigquery/src/config.py b/bizon/destinations/bigquery/src/config.py
index 2db0faa..d03c3ff 100644
--- a/bizon/destinations/bigquery/src/config.py
+++ b/bizon/destinations/bigquery/src/config.py
@@ -1,11 +1,13 @@
 from enum import Enum
 from typing import Literal, Optional
 
+import polars as pl
 from pydantic import BaseModel, Field
 
 from bizon.destinations.config import (
     AbstractDestinationConfig,
     AbstractDestinationDetailsConfig,
+    DestinationColumn,
     DestinationTypes,
 )
 
@@ -42,11 +44,37 @@ class BigQueryColumnMode(str, Enum):
     REPEATED = "REPEATED"
 
 
-class BigQueryColumn(BaseModel):
+BIGQUERY_TO_POLARS_TYPE_MAPPING = {
+    "STRING": pl.String,
+    "BYTES": pl.Binary,
+    "INTEGER": pl.Int64,
+    "INT64": pl.Int64,
+    "FLOAT": pl.Float64,
+    "FLOAT64": pl.Float64,
+    "NUMERIC": pl.Float64,  # Can be refined for precision with Decimal128 if needed
+    "BIGNUMERIC": pl.Float64,  # Similar to NUMERIC
+    "BOOLEAN": pl.Boolean,
+    "BOOL": pl.Boolean,
+    "TIMESTAMP": pl.String,  # We use BigQuery internal parsing to convert to datetime
+    "DATE": pl.String,  # We use BigQuery internal parsing to convert to datetime
+    "DATETIME": pl.String,  # We use BigQuery internal parsing to convert to datetime
+    "TIME": pl.Time,
+    "GEOGRAPHY": pl.Object,  # Polars doesn't natively support geography types
+    "ARRAY": pl.List,  # Requires additional handling for element types
+    "STRUCT": pl.Struct,  # TODO
+    "JSON": pl.Object,  # TODO
+}
+
+
+class BigQueryColumn(DestinationColumn):
     name: str = Field(..., description="Name of the column")
     type: BigQueryColumnType = Field(..., description="Type of the column")
     mode: BigQueryColumnMode = Field(..., description="Mode of the column")
-    description: Optional[str] = Field(..., description="Description of the column")
+    description: Optional[str] = Field(None, description="Description of the column")
+
+    @property
+    def polars_type(self):
+        return BIGQUERY_TO_POLARS_TYPE_MAPPING.get(self.type.upper())
 
 
 class BigQueryAuthentication(BaseModel):
@@ -87,5 +115,5 @@ class BigQueryConfigDetails(AbstractDestinationDetailsConfig):
 
 class BigQueryConfig(AbstractDestinationConfig):
     name: Literal[DestinationTypes.BIGQUERY]
-    buffer_size: Optional[int] = 2000
+    buffer_size: Optional[int] = 400
     config: BigQueryConfigDetails
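For context on how the new mapping is consumed downstream: the polars_type property resolves a declared column type to the Polars dtype used by the cast in unnest_data. A minimal sketch, assuming the import path from this patch and the enum values exercised by the tests below:

```python
import polars as pl

from bizon.destinations.bigquery.src.config import BigQueryColumn

# DATETIME deliberately resolves to pl.String: the value is kept as text and
# parsed by BigQuery itself at load time.
created_at = BigQueryColumn(name="created_at", type="DATETIME", mode="REQUIRED")
assert created_at.polars_type is pl.String

record_id = BigQueryColumn(name="id", type="INTEGER", mode="REQUIRED")
assert record_id.polars_type is pl.Int64
```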
diff --git a/bizon/destinations/bigquery/src/destination.py b/bizon/destinations/bigquery/src/destination.py
index d5ab493..d3f692d 100644
--- a/bizon/destinations/bigquery/src/destination.py
+++ b/bizon/destinations/bigquery/src/destination.py
@@ -1,5 +1,4 @@
 import io
-import json
 import os
 import tempfile
 import traceback
@@ -17,7 +16,7 @@
 from bizon.engine.backend.backend import AbstractBackend
 from bizon.source.config import SourceSyncModes
 
-from .config import BigQueryConfigDetails
+from .config import BigQueryColumn, BigQueryConfigDetails
 
 
 class BigQueryDestination(AbstractDestination):
@@ -60,16 +59,30 @@ def temp_table_id(self) -> str:
 
     def get_bigquery_schema(self, df_destination_records: pl.DataFrame) -> List[bigquery.SchemaField]:
 
-        return [
-            bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
-            bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
-            bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
-            bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
-            bigquery.SchemaField(
-                "_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
-            ),
-            bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
-        ]
+        # Case where we unnest the data
+        if self.config.unnest:
+            return [
+                bigquery.SchemaField(
+                    col.name,
+                    col.type,
+                    mode=col.mode,
+                    description=col.description,
+                )
+                for col in self.config.record_schema
+            ]
+
+        # Case where we don't unnest the data
+        else:
+            return [
+                bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
+                bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
+                bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
+                bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
+                bigquery.SchemaField(
+                    "_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
+                ),
+                bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
+            ]
 
     def check_connection(self) -> bool:
         dataset_ref = DatasetReference(self.project_id, self.dataset_id)
@@ -108,6 +121,24 @@ def convert_and_upload_to_buffer(self, df_destination_records: pl.DataFrame) ->
 
         raise NotImplementedError(f"Buffer format {self.buffer_format} is not supported")
 
+    @staticmethod
+    def unnest_data(df_destination_records: pl.DataFrame, record_schema: list[BigQueryColumn]) -> pl.DataFrame:
+        """Unnest the source_data field into separate columns"""
+
+        # Check if the schema matches the expected schema
+        source_data_fields = pl.DataFrame(df_destination_records['source_data'].str.json_decode()).schema["source_data"].fields
+
+        record_schema_fields = [col.name for col in record_schema]
+
+        for field in source_data_fields:
+            assert field.name in record_schema_fields, f"Column {field.name} not found in BigQuery schema"
+
+        # Parse the JSON and unnest each field to the matching Polars type
+        return df_destination_records.select(
+            pl.col("source_data").str.json_path_match(f"$.{col.name}").cast(col.polars_type).alias(col.name)
+            for col in record_schema
+        )
+
     def load_to_bigquery(self, gcs_file: str, df_destination_records: pl.DataFrame):
 
         # We always partition by the loaded_at field
diff --git a/bizon/destinations/bigquery_streaming/src/config.py b/bizon/destinations/bigquery_streaming/src/config.py
index 87cb6fa..c8d9a7b 100644
--- a/bizon/destinations/bigquery_streaming/src/config.py
+++ b/bizon/destinations/bigquery_streaming/src/config.py
@@ -1,8 +1,9 @@
 from enum import Enum
 from typing import Literal, Optional
 
-from pydantic import BaseModel, Field, field_validator
+from pydantic import BaseModel, Field
 
+from bizon.destinations.bigquery.src.config import BigQueryColumn
 from bizon.destinations.config import (
     AbstractDestinationConfig,
     AbstractDestinationDetailsConfig,
@@ -34,8 +35,14 @@ class BigQueryStreamingConfigDetails(AbstractDestinationDetailsConfig):
     time_partitioning: Optional[TimePartitioning] = Field(
         default=TimePartitioning.DAY, description="BigQuery Time partitioning type"
     )
+    time_partitioning_field: Optional[str] = Field(
+        "_bizon_loaded_at", description="Field to partition by. You can use a transformation to create this field."
+    )
     authentication: Optional[BigQueryAuthentication] = None
     bq_max_rows_per_request: Optional[int] = Field(30000, description="Max rows per buffer streaming request.")
+    record_schema: Optional[list[BigQueryColumn]] = Field(
+        default=None, description="Schema for the records. Required if unnest is set to true."
+    )
 
 
 class BigQueryStreamingConfig(AbstractDestinationConfig):
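unnest_data extracts each declared column straight from the JSON string with json_path_match and casts it to the mapped dtype. A standalone sketch of that same pattern on a toy frame (illustrative column names, not part of the patch):

```python
import polars as pl

df = pl.DataFrame({"source_data": ['{"id": 1, "name": "Alice"}', '{"id": 2, "name": "Bob"}']})

# Pull each field out of the JSON string and cast it, mirroring what
# unnest_data does with col.polars_type for every column in record_schema.
unnested = df.select(
    pl.col("source_data").str.json_path_match("$.id").cast(pl.Int64).alias("id"),
    pl.col("source_data").str.json_path_match("$.name").cast(pl.String).alias("name"),
)
print(unnested)  # shape: (2, 2), columns: id (i64), name (str)
```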
diff --git a/bizon/destinations/bigquery_streaming/src/destination.py b/bizon/destinations/bigquery_streaming/src/destination.py
index 8fe02d9..e40f538 100644
--- a/bizon/destinations/bigquery_streaming/src/destination.py
+++ b/bizon/destinations/bigquery_streaming/src/destination.py
@@ -12,6 +12,7 @@
     ProtoRows,
     ProtoSchema,
 )
+from google.protobuf.json_format import ParseDict
 from google.protobuf.message import Message
 
 from bizon.common.models import SyncMetadata
@@ -48,17 +49,29 @@ def table_id(self) -> str:
 
     def get_bigquery_schema(self) -> List[bigquery.SchemaField]:
 
-        # we keep raw data in the column source_data
-        return [
-            bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
-            bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
-            bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
-            bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
-            bigquery.SchemaField(
-                "_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
-            ),
-            bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
-        ]
+        if self.config.unnest:
+            return [
+                bigquery.SchemaField(
+                    col.name,
+                    col.type,
+                    mode=col.mode,
+                    description=col.description,
+                )
+                for col in self.config.record_schema
+            ]
+
+        # Case where we don't unnest the data
+        else:
+            return [
+                bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
+                bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
+                bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
+                bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
+                bigquery.SchemaField(
+                    "_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
+                ),
+                bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
+            ]
 
     def check_connection(self) -> bool:
         dataset_ref = DatasetReference(self.project_id, self.dataset_id)
@@ -90,14 +103,8 @@ def append_rows_to_stream(
 
     @staticmethod
     def to_protobuf_serialization(TableRowClass: Type[Message], row: dict) -> bytes:
-        """Convert a row to a protobuf serialization"""
-        record = TableRowClass()
-        record._bizon_id = row["bizon_id"]
-        record._bizon_extracted_at = row["bizon_extracted_at"].strftime("%Y-%m-%d %H:%M:%S.%f")
-        record._bizon_loaded_at = row["bizon_loaded_at"].strftime("%Y-%m-%d %H:%M:%S.%f")
-        record._source_record_id = row["source_record_id"]
-        record._source_timestamp = row["source_timestamp"].strftime("%Y-%m-%d %H:%M:%S.%f")
-        record._source_data = row["source_data"]
+        """Convert a row to a Protobuf serialization."""
+        record = ParseDict(row, TableRowClass())
         return record.SerializeToString()
 
     def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -> str:
@@ -107,7 +114,9 @@ def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -
         # Create table if it doesnt exist
         schema = self.get_bigquery_schema()
         table = bigquery.Table(self.table_id, schema=schema)
-        time_partitioning = TimePartitioning(field="_bizon_loaded_at", type_=self.config.time_partitioning)
+        time_partitioning = TimePartitioning(
+            field=self.config.time_partitioning_field, type_=self.config.time_partitioning
+        )
         table.time_partitioning = time_partitioning
         table = self.bq_client.create_table(table, exists_ok=True)
 
@@ -119,12 +128,29 @@ def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -
         stream_name = f"{parent}/_default"
 
         # Generating the protocol buffer representation of the message descriptor.
-        proto_schema, TableRow = get_proto_schema_and_class(clustering_keys)
+        proto_schema, TableRow = get_proto_schema_and_class(schema, clustering_keys)
 
-        serialized_rows = [
-            self.to_protobuf_serialization(TableRowClass=TableRow, row=row)
-            for row in df_destination_records.iter_rows(named=True)
-        ]
+        if self.config.unnest:
+            serialized_rows = [
+                self.to_protobuf_serialization(TableRowClass=TableRow, row=row)
+                for row in df_destination_records["source_data"].str.json_decode().to_list()
+            ]
+        else:
+            df_destination_records = df_destination_records.rename(
+                {
+                    "bizon_id": "_bizon_id",
+                    "bizon_extracted_at": "_bizon_extracted_at",
+                    "bizon_loaded_at": "_bizon_loaded_at",
+                    "source_record_id": "_source_record_id",
+                    "source_timestamp": "_source_timestamp",
+                    "source_data": "_source_data",
+                }
+            )
+
+            serialized_rows = [
+                self.to_protobuf_serialization(TableRowClass=TableRow, row=row)
+                for row in df_destination_records.iter_rows(named=True)
+            ]
 
         results = []
         with ThreadPoolExecutor() as executor:
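to_protobuf_serialization now relies on ParseDict, which maps dict keys onto protobuf fields by name instead of assigning each attribute by hand. A self-contained illustration using FieldDescriptorProto as a stand-in for the dynamically generated TableRow class:

```python
from google.protobuf.descriptor_pb2 import FieldDescriptorProto
from google.protobuf.json_format import ParseDict, ParseError

# Keys are matched against the message's field names.
msg = ParseDict({"name": "id", "number": 1, "type": "TYPE_INT64"}, FieldDescriptorProto())
payload = msg.SerializeToString()  # same two steps as to_protobuf_serialization
assert msg.name == "id" and payload

# A key with no matching field raises ParseError, which is what the
# added-column test further below relies on.
try:
    ParseDict({"name": "id", "unknown_field": 1}, FieldDescriptorProto())
except ParseError as exc:
    print(exc)
```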
diff --git a/bizon/destinations/bigquery_streaming/src/proto_utils.py b/bizon/destinations/bigquery_streaming/src/proto_utils.py
index 9ba9616..27419e8 100644
--- a/bizon/destinations/bigquery_streaming/src/proto_utils.py
+++ b/bizon/destinations/bigquery_streaming/src/proto_utils.py
@@ -1,5 +1,6 @@
 from typing import List, Tuple, Type
 
+from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery_storage_v1.types import ProtoSchema
 from google.protobuf.descriptor_pb2 import (
     DescriptorProto,
@@ -11,7 +12,30 @@
 from google.protobuf.message_factory import GetMessageClassesForFiles
 
 
-def get_proto_schema_and_class(clustering_keys: List[str] = None) -> Tuple[ProtoSchema, Type[Message]]:
+def map_bq_type_to_field_descriptor(bq_type: str) -> int:
+    """Map BigQuery type to Protobuf FieldDescriptorProto type."""
+    type_map = {
+        "STRING": FieldDescriptorProto.TYPE_STRING,  # STRING -> TYPE_STRING
+        "BYTES": FieldDescriptorProto.TYPE_BYTES,  # BYTES -> TYPE_BYTES
+        "INTEGER": FieldDescriptorProto.TYPE_INT64,  # INTEGER -> TYPE_INT64
+        "FLOAT": FieldDescriptorProto.TYPE_DOUBLE,  # FLOAT -> TYPE_DOUBLE
+        "NUMERIC": FieldDescriptorProto.TYPE_STRING,  # NUMERIC -> TYPE_STRING (use string to handle precision)
+        "BIGNUMERIC": FieldDescriptorProto.TYPE_STRING,  # BIGNUMERIC -> TYPE_STRING
+        "BOOLEAN": FieldDescriptorProto.TYPE_BOOL,  # BOOLEAN -> TYPE_BOOL
+        "DATE": FieldDescriptorProto.TYPE_STRING,  # DATE -> TYPE_STRING
+        "DATETIME": FieldDescriptorProto.TYPE_STRING,  # DATETIME -> TYPE_STRING
+        "TIME": FieldDescriptorProto.TYPE_STRING,  # TIME -> TYPE_STRING
+        "TIMESTAMP": FieldDescriptorProto.TYPE_INT64,  # TIMESTAMP -> TYPE_INT64 (Unix epoch time)
+        "RECORD": FieldDescriptorProto.TYPE_MESSAGE,  # RECORD -> TYPE_MESSAGE (nested message)
+    }
+
+    return type_map.get(bq_type, FieldDescriptorProto.TYPE_STRING)  # Default to TYPE_STRING
+
+
+def get_proto_schema_and_class(
+    bq_schema: List[SchemaField], clustering_keys: List[str] = None
+) -> Tuple[ProtoSchema, Type[Message]]:
+    """Generate a ProtoSchema and a TableRow class for unnested BigQuery schema."""
     # Define the FileDescriptorProto
     file_descriptor_proto = FileDescriptorProto()
     file_descriptor_proto.name = "dynamic.proto"
@@ -26,32 +50,14 @@ def get_proto_schema_and_class(clustering_keys: List[str] = None) -> Tuple[Proto
 
     # https://stackoverflow.com/questions/70489919/protobuf-type-for-bigquery-timestamp-field
     fields = [
-        {"name": "_bizon_id", "type": FieldDescriptorProto.TYPE_STRING, "label": FieldDescriptorProto.LABEL_REQUIRED},
-        {
-            "name": "_bizon_extracted_at",
-            "type": FieldDescriptorProto.TYPE_STRING,
-            "label": FieldDescriptorProto.LABEL_REQUIRED,
-        },
-        {
-            "name": "_bizon_loaded_at",
-            "type": FieldDescriptorProto.TYPE_STRING,
-            "label": FieldDescriptorProto.LABEL_REQUIRED,
-        },
-        {
-            "name": "_source_record_id",
-            "type": FieldDescriptorProto.TYPE_STRING,
-            "label": FieldDescriptorProto.LABEL_REQUIRED,
-        },
-        {
-            "name": "_source_timestamp",
-            "type": FieldDescriptorProto.TYPE_STRING,
-            "label": FieldDescriptorProto.LABEL_REQUIRED,
-        },
         {
-            "name": "_source_data",
-            "type": FieldDescriptorProto.TYPE_STRING,
-            "label": FieldDescriptorProto.LABEL_OPTIONAL,
-        },
+            "name": col.name,
+            "type": map_bq_type_to_field_descriptor(col.field_type),
+            "label": (
+                FieldDescriptorProto.LABEL_REQUIRED if col.mode == "REQUIRED" else FieldDescriptorProto.LABEL_OPTIONAL
+            ),
+        }
+        for col in bq_schema
     ]
 
     if clustering_keys:
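A quick sketch of how the new signature is fed: the BigQuery SchemaField list built by get_bigquery_schema is translated field by field through map_bq_type_to_field_descriptor, with types outside the map falling back to TYPE_STRING:

```python
from google.cloud.bigquery import SchemaField
from google.protobuf.descriptor_pb2 import FieldDescriptorProto

from bizon.destinations.bigquery_streaming.src.proto_utils import (
    map_bq_type_to_field_descriptor,
)

schema = [
    SchemaField("id", "INTEGER", mode="REQUIRED"),
    SchemaField("name", "STRING", mode="REQUIRED"),
    SchemaField("created_at", "DATETIME", mode="REQUIRED"),
]

assert map_bq_type_to_field_descriptor(schema[0].field_type) == FieldDescriptorProto.TYPE_INT64
# DATETIME has no native protobuf counterpart, so it travels as a string.
assert map_bq_type_to_field_descriptor(schema[2].field_type) == FieldDescriptorProto.TYPE_STRING
```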
diff --git a/bizon/destinations/config.py b/bizon/destinations/config.py
index f6bff4e..653d1d0 100644
--- a/bizon/destinations/config.py
+++ b/bizon/destinations/config.py
@@ -15,8 +15,7 @@ class DestinationTypes(str, Enum):
 class DestinationColumn(BaseModel, ABC):
     name: str = Field(..., description="Name of the column")
     type: str = Field(..., description="Type of the column")
-    mode: Optional[str] = Field(..., description="Mode of the column")
-    description: Optional[str] = Field(..., description="Description of the column")
+    description: Optional[str] = Field(None, description="Description of the column")
 
 
 class AbstractDestinationDetailsConfig(BaseModel):
diff --git a/tests/destinations/bigquery/test_unnest.py b/tests/destinations/bigquery/test_unnest.py
new file mode 100644
index 0000000..59ca896
--- /dev/null
+++ b/tests/destinations/bigquery/test_unnest.py
@@ -0,0 +1,61 @@
+from datetime import datetime
+
+import polars as pl
+import pytest
+
+from bizon.destinations.bigquery.src.config import BigQueryColumn
+from bizon.destinations.bigquery.src.destination import BigQueryDestination
+from bizon.destinations.models import destination_record_schema
+
+
+def test_unnest_records_to_bigquery():
+    df_destination_records = pl.DataFrame(
+        data={
+            "source_record_id": ["1"],
+            "source_timestamp": [datetime.now()],
+            "source_data": ['{"id": 1, "name": "Alice", "created_at": "2021-01-01 00:00:00"}'],
+            "bizon_extracted_at": [datetime.now()],
+            "bizon_loaded_at": [datetime.now()],
+            "bizon_id": ["1"],
+        },
+        schema=destination_record_schema,
+    )
+
+    assert df_destination_records.height > 0
+
+    res = BigQueryDestination.unnest_data(
+        df_destination_records=df_destination_records,
+        record_schema=[
+            BigQueryColumn(name="id", type="INTEGER", mode="REQUIRED"),
+            BigQueryColumn(name="name", type="STRING", mode="REQUIRED"),
+            BigQueryColumn(name="created_at", type="DATETIME", mode="REQUIRED"),
+        ],
+    )
+
+    assert res.height == 1
+
+
+def test_unnest_records_to_bigquery_with_invalid_data():
+    df_destination_records = pl.DataFrame(
+        data={
+            "source_record_id": ["1"],
+            "source_timestamp": [datetime.now()],
+            "source_data": ['{"id": 1, "name": "Alice", "created_at": "2021-01-01 00:00:00", "cookies": "chocolate"}'],
+            "bizon_extracted_at": [datetime.now()],
+            "bizon_loaded_at": [datetime.now()],
+            "bizon_id": ["1"],
+        },
+        schema=destination_record_schema,
+    )
+
+    # An exception is raised because the data has an extra column compared to the BigQuery schema
+
+    with pytest.raises(AssertionError) as e:
+        res = BigQueryDestination.unnest_data(
+            df_destination_records=df_destination_records,
+            record_schema=[
+                BigQueryColumn(name="id", type="INTEGER", mode="REQUIRED"),
+                BigQueryColumn(name="name", type="STRING", mode="REQUIRED"),
+                BigQueryColumn(name="created_at", type="DATETIME", mode="REQUIRED"),
+            ],
+        )
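The same call the first test makes can be run standalone to inspect the resulting dtypes; per BIGQUERY_TO_POLARS_TYPE_MAPPING, id comes back as Int64 while name and created_at stay as String. A sketch reusing the inputs from the test above (only the source_data column is needed, since unnest_data does not touch the other record fields):

```python
import polars as pl

from bizon.destinations.bigquery.src.config import BigQueryColumn
from bizon.destinations.bigquery.src.destination import BigQueryDestination

df = pl.DataFrame({"source_data": ['{"id": 1, "name": "Alice", "created_at": "2021-01-01 00:00:00"}']})

res = BigQueryDestination.unnest_data(
    df_destination_records=df,
    record_schema=[
        BigQueryColumn(name="id", type="INTEGER", mode="REQUIRED"),
        BigQueryColumn(name="name", type="STRING", mode="REQUIRED"),
        BigQueryColumn(name="created_at", type="DATETIME", mode="REQUIRED"),
    ],
)

print(res.schema)  # expected: id -> Int64, name -> String, created_at -> String
```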
diff --git a/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py b/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py
index a60357e..1308f73 100644
--- a/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py
+++ b/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py
@@ -1,9 +1,13 @@
+import json
 import os
 from datetime import datetime
 
 import polars as pl
 import pytest
+from dotenv import load_dotenv
 from faker import Faker
+from google.protobuf.json_format import ParseError
+from google.protobuf.message import EncodeError
 
 from bizon.common.models import SyncMetadata
 from bizon.destinations.bigquery_streaming.src.config import (
@@ -14,11 +18,14 @@
 from bizon.destinations.destination import DestinationFactory
 from bizon.destinations.models import destination_record_schema
 
+load_dotenv()
+
+
 fake = Faker("en_US")
 
 
-TEST_PROJECT_ID = "my_project"
-TEST_TABLE_ID = "test_fake_records"
+TEST_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "test_project")
+TEST_TABLE_ID = "test_fake_records_streaming"
 TEST_DATASET_ID = "bizon_test"
 TEST_BUFFER_BUCKET = "bizon-buffer"
 
@@ -77,3 +84,223 @@ def test_streaming_records_to_bigquery(my_backend_config, sync_metadata_stream):
 
     assert success is True
     assert error_msg == ""
+
+
+@pytest.mark.skipif(
+    os.getenv("POETRY_ENV_TEST") == "CI",
+    reason="Skipping tests that require a BigQuery database",
+)
+def test_streaming_unnested_records(my_backend_config, sync_metadata_stream):
+    bigquery_config = BigQueryStreamingConfig(
+        name=DestinationTypes.BIGQUERY_STREAMING,
+        config=BigQueryStreamingConfigDetails(
+            project_id=TEST_PROJECT_ID,
+            dataset_id=TEST_DATASET_ID,
+            table_id=TEST_TABLE_ID,
+            unnest=True,
+            time_partitioning_field="created_at",
+            record_schema=[
+                {
+                    "name": "id",
+                    "type": "INTEGER",
+                    "mode": "REQUIRED",
+                },
+                {
+                    "name": "name",
+                    "type": "STRING",
+                    "mode": "REQUIRED",
+                },
+                {
+                    "name": "created_at",
+                    "type": "DATETIME",
+                    "mode": "REQUIRED",
+                },
+            ],
+        ),
+    )
+
+    bq_destination = DestinationFactory().get_destination(
+        sync_metadata=sync_metadata_stream, config=bigquery_config, backend=my_backend_config
+    )
+
+    # Import here to not throw auth errors when running tests
+    from bizon.destinations.bigquery_streaming.src.destination import (
+        BigQueryStreamingDestination,
+    )
+
+    assert isinstance(bq_destination, BigQueryStreamingDestination)
+
+    records = [
+        {"id": 1, "name": "Alice", "created_at": "2021-01-01 00:00:00"},
+        {"id": 2, "name": "Bob", "created_at": "2021-01-01 00:00:00"},
+    ]
+
+    df_unnested_records = pl.DataFrame(
+        {
+            "bizon_id": ["id_1", "id_2"],
+            "bizon_extracted_at": [datetime(2024, 12, 5, 12, 0), datetime(2024, 12, 5, 13, 0)],
+            "bizon_loaded_at": [datetime(2024, 12, 5, 12, 30), datetime(2024, 12, 5, 13, 30)],
+            "source_record_id": ["record_1", "record_2"],
+            "source_timestamp": [datetime(2024, 12, 5, 11, 30), datetime(2024, 12, 5, 12, 30)],
+            "source_data": [json.dumps(record) for record in records],
+        },
+        schema=destination_record_schema,
+    )
+
+    success, error_msg = bq_destination.write_records(df_destination_records=df_unnested_records)
+
+    assert success is True
+    assert error_msg == ""
+
+
tests that require a BigQuery database", +) +def test_error_on_added_column(my_backend_config, sync_metadata_stream): + bigquery_config = BigQueryStreamingConfig( + name=DestinationTypes.BIGQUERY_STREAMING, + config=BigQueryStreamingConfigDetails( + project_id=TEST_PROJECT_ID, + dataset_id=TEST_DATASET_ID, + table_id=TEST_TABLE_ID, + unnest=True, + time_partitioning_field="created_at", + record_schema=[ + { + "name": "id", + "type": "INTEGER", + "mode": "REQUIRED", + }, + { + "name": "name", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "name": "created_at", + "type": "DATETIME", + "mode": "REQUIRED", + }, + ], + ), + ) + + bq_destination = DestinationFactory().get_destination( + sync_metadata=sync_metadata_stream, config=bigquery_config, backend=my_backend_config + ) + + # Insert proper records + records = [ + {"id": 1, "name": "Alice", "created_at": "2021-01-01 00:00:00"}, + {"id": 2, "name": "Bob", "created_at": "2021-01-01 00:00:00"}, + ] + df_unnested_records = pl.DataFrame( + { + "bizon_id": ["id_1", "id_2"], + "bizon_extracted_at": [datetime(2024, 12, 5, 12, 0), datetime(2024, 12, 5, 13, 0)], + "bizon_loaded_at": [datetime(2024, 12, 5, 12, 30), datetime(2024, 12, 5, 13, 30)], + "source_record_id": ["record_1", "record_2"], + "source_timestamp": [datetime(2024, 12, 5, 11, 30), datetime(2024, 12, 5, 12, 30)], + "source_data": [json.dumps(record) for record in records], + }, + schema=destination_record_schema, + ) + + bq_destination.write_records(df_destination_records=df_unnested_records) + + # Try to insert a new record with an added column + + new_column_in_record = {"id": 3, "name": "Charlie", "last_name": "Chaplin", "created_at": "2021-01-01 00:00:00"} + + df_new = pl.DataFrame( + { + "bizon_id": ["id_3"], + "bizon_extracted_at": [datetime(2024, 12, 5, 12, 0)], + "bizon_loaded_at": [datetime(2024, 12, 5, 12, 30)], + "source_record_id": ["record_3"], + "source_timestamp": [datetime(2024, 12, 5, 11, 30)], + "source_data": [json.dumps(new_column_in_record)], + }, + schema=destination_record_schema, + ) + + # The call should raise an error because the new record has an extra column + with pytest.raises(ParseError): + bq_destination.write_records(df_destination_records=df_new) + + +@pytest.mark.skipif( + os.getenv("POETRY_ENV_TEST") == "CI", + reason="Skipping tests that require a BigQuery database", +) +def test_error_on_deleted_column(my_backend_config, sync_metadata_stream): + bigquery_config = BigQueryStreamingConfig( + name=DestinationTypes.BIGQUERY_STREAMING, + config=BigQueryStreamingConfigDetails( + project_id=TEST_PROJECT_ID, + dataset_id=TEST_DATASET_ID, + table_id=TEST_TABLE_ID, + unnest=True, + time_partitioning_field="created_at", + record_schema=[ + { + "name": "id", + "type": "INTEGER", + "mode": "REQUIRED", + }, + { + "name": "name", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "name": "created_at", + "type": "DATETIME", + "mode": "REQUIRED", + }, + ], + ), + ) + + bq_destination = DestinationFactory().get_destination( + sync_metadata=sync_metadata_stream, config=bigquery_config, backend=my_backend_config + ) + + # Insert proper records + records = [ + {"id": 1, "name": "Alice", "created_at": "2021-01-01 00:00:00"}, + {"id": 2, "name": "Bob", "created_at": "2021-01-01 00:00:00"}, + ] + df_unnested_records = pl.DataFrame( + { + "bizon_id": ["id_1", "id_2"], + "bizon_extracted_at": [datetime(2024, 12, 5, 12, 0), datetime(2024, 12, 5, 13, 0)], + "bizon_loaded_at": [datetime(2024, 12, 5, 12, 30), datetime(2024, 12, 5, 13, 30)], + "source_record_id": 
["record_1", "record_2"], + "source_timestamp": [datetime(2024, 12, 5, 11, 30), datetime(2024, 12, 5, 12, 30)], + "source_data": [json.dumps(record) for record in records], + }, + schema=destination_record_schema, + ) + + bq_destination.write_records(df_destination_records=df_unnested_records) + + # Try to insert a new record with a deleted column + new_column_in_record = {"id": 3, "created_at": "2021-01-01 00:00:00"} + + df_new = pl.DataFrame( + { + "bizon_id": ["id_3"], + "bizon_extracted_at": [datetime(2024, 12, 5, 12, 0)], + "bizon_loaded_at": [datetime(2024, 12, 5, 12, 30)], + "source_record_id": ["record_3"], + "source_timestamp": [datetime(2024, 12, 5, 11, 30)], + "source_data": [json.dumps(new_column_in_record)], + }, + schema=destination_record_schema, + ) + + # The call should raise an error because the new record has a missing column + with pytest.raises(EncodeError): + bq_destination.write_records(df_destination_records=df_new)