feat: safe_cast timestamp for bigquery destination (bizon-data#18)
* fix: doc and test (#7)

* doc: example with kafka debezium (#8)

* doc: example with debezium

* fix: bigquery safe_cast timestamp parsing
aballiet authored Dec 23, 2024
1 parent b138999 commit 4d7dc04
Showing 5 changed files with 134 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -60,7 +60,7 @@ bizon run config.yml

Backend is the interface used by Bizon to store its state. It can be configured in the `backend` section of the configuration file. The following backends are supported:
- `sqlite`: In-memory SQLite database, useful for testing and development.
- `biguquery`: Google BigQuery backend, perfect for light setup & production.
- `bigquery`: Google BigQuery backend, perfect for light setup & production.
- `postgres`: PostgreSQL backend, for production use and frequent cursor updates.

## Queue configuration
@@ -1,6 +1,7 @@
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List, Tuple, Type

import polars as pl
@@ -101,6 +102,16 @@ def append_rows_to_stream(
        response = write_client.append_rows(iter([request]))
        return response.code().name

    def safe_cast_record_values(self, row: dict):
        for col in self.config.record_schema:
            if col.type in ["TIMESTAMP", "DATETIME"]:
                if isinstance(row[col.name], int):
                    if row[col.name] > datetime(9999, 12, 31).timestamp():
                        row[col.name] = datetime.fromtimestamp(row[col.name]/1_000_000).strftime("%Y-%m-%d %H:%M:%S.%f")
                    else:
                        row[col.name] = datetime.fromtimestamp(row[col.name]).strftime("%Y-%m-%d %H:%M:%S.%f")
        return row

    @staticmethod
    def to_protobuf_serialization(TableRowClass: Type[Message], row: dict) -> bytes:
        """Convert a row to a Protobuf serialization."""
@@ -132,7 +143,7 @@ def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -

        if self.config.unnest:
            serialized_rows = [
                self.to_protobuf_serialization(TableRowClass=TableRow, row=row)
                self.to_protobuf_serialization(TableRowClass=TableRow, row=self.safe_cast_record_values(row))
                for row in df_destination_records["source_data"].str.json_decode().to_list()
            ]
        else:
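
For reference, here is a standalone sketch (not part of the commit) of the heuristic the new safe_cast_record_values applies: integer values larger than the biggest representable epoch-seconds timestamp are treated as microseconds and scaled down before formatting. The sample epoch values are hypothetical:

# Standalone sketch of the timestamp cast heuristic; values are illustrative only.
from datetime import datetime

MAX_EPOCH_SECONDS = datetime(9999, 12, 31).timestamp()

def cast_epoch_to_string(value: int) -> str:
    # Mirrors the cast above: beyond the max epoch-seconds value, assume microseconds.
    if value > MAX_EPOCH_SECONDS:
        value = value / 1_000_000
    return datetime.fromtimestamp(value).strftime("%Y-%m-%d %H:%M:%S.%f")

print(cast_epoch_to_string(1_703_318_400))          # interpreted as seconds
print(cast_epoch_to_string(1_703_318_400_000_000))  # interpreted as microseconds, same instant
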
112 changes: 112 additions & 0 deletions bizon/connectors/sources/kafka/config/kafka_debezium.example.yml
@@ -0,0 +1,112 @@
name: Kafka debezium messages to bigquery streaming

source:
  name: kafka
  stream: topic

  sync_mode: full_refresh

  force_ignore_checkpoint: true

  topic: <TOPIC_NAME>

  nb_bytes_schema_id: 8

  batch_size: 1000
  consumer_timeout: 10
  bootstrap_servers: <BOOTSTRAP_SERVERS>
  group_id: <GROUP_ID>

  authentication:
    type: basic

    schema_registry_url: <SCHEMA_REGISTRY_URL>
    schema_registry_username: <SCHEMA_REGISTRY_USERNAME>
    schema_registry_password: <SCHEMA_REGISTRY_PASSWORD>

    params:
      username: <USERNAME>
      password: <PASSWORD>

destination:
  name: bigquery_streaming

  config:
    buffer_size: 50
    bq_max_rows_per_request: 10000
    buffer_flush_timeout: 30

    table_id: <TABLE_ID>
    dataset_id: <DATASET_ID>
    dataset_location: US
    project_id: <PROJECT_ID>

    unnest: true

    time_partitioning:
      # Mandatory if unnested
      field: __event_timestamp

    record_schema:
      - name: account_id
        type: INTEGER
        mode: REQUIRED

      - name: team_id
        type: INTEGER
        mode: REQUIRED

      - name: user_id
        type: INTEGER
        mode: REQUIRED

      - name: __deleted
        type: BOOLEAN
        mode: NULLABLE

      - name: __cluster
        type: STRING
        mode: NULLABLE

      - name: __kafka_partition
        type: INTEGER
        mode: NULLABLE

      - name: __kafka_offset
        type: INTEGER
        mode: NULLABLE

      - name: __event_timestamp
        type: TIMESTAMP
        mode: NULLABLE

transforms:
  - label: debezium
    python: |
      from datetime import datetime
      cluster = data['value']['source']['name'].replace('_', '-')
      partition = data['partition']
      offset = data['offset']
      kafka_timestamp = datetime.utcfromtimestamp(data['value']['source']['ts_ms'] / 1000).strftime('%Y-%m-%d %H:%M:%S.%f')
      deleted = False
      if data['value']['op'] == 'd':
          data = data['value']['before']
          deleted = True
      else:
          data = data['value']['after']
      data['__deleted'] = deleted
      data['__cluster'] = cluster
      data['__kafka_partition'] = partition
      data['__kafka_offset'] = offset
      data['__event_timestamp'] = kafka_timestamp
engine:
  queue:
    type: python_queue
    config:
      max_nb_messages: 1000000
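
A usage sketch (modeled on kafka_pipeline.py further down in this commit) for running the example config once the <...> placeholders are filled in; the import path of RunnerFactory is assumed here, since it is not shown in the diff:

import os

from bizon.engine.runner import RunnerFactory  # import path assumed; not shown in the diff

if __name__ == "__main__":
    runner = RunnerFactory.create_from_yaml(
        filepath=os.path.abspath("bizon/connectors/sources/kafka/config/kafka_debezium.example.yml")
    )
    runner.run()
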
2 changes: 1 addition & 1 deletion bizon/connectors/sources/kafka/tests/kafka_pipeline.py
@@ -4,6 +4,6 @@

if __name__ == "__main__":
    runner = RunnerFactory.create_from_yaml(
        filepath=os.path.abspath("bizon/sources/kafka/config/kafka_teams_users.yml")
        filepath=os.path.abspath("bizon/connectors/sources/kafka/config/kafka.example.yml")
    )
    runner.run()
12 changes: 8 additions & 4 deletions bizon/transform/transform.py
@@ -14,13 +14,16 @@ def __init__(self, transforms: list[TransformModel]):

    def apply_transforms(self, df_source_records: pl.DataFrame) -> pl.DataFrame:
        """Apply transformation on df_source_records"""

        # Process the transformations
        for transform in self.transforms:

            logger.debug(f"Applying transform {transform.label}")

            # Create a function to be executed in the desired context
            def my_transform(data: dict) -> str:
            def my_transform(data: str) -> str:

                data = json.loads(data)

                # Start writing here
                local_vars = {"data": data}
@@ -33,9 +36,10 @@ def my_transform(data: dict) -> str:
                # Stop writing here
                return json.dumps(local_vars["data"])

            transformed_source_records = [
                my_transform(row) for row in df_source_records["data"].str.json_decode().to_list()
            ]
            transformed_source_records = []

            for row in df_source_records["data"].to_list():
                transformed_source_records.append(my_transform(row))

            df_source_records = df_source_records.with_columns(
                pl.Series("data", transformed_source_records, dtype=pl.String).alias("data")
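
To make the new flow concrete, here is a standalone sketch of the mechanism after this change: each record now arrives as a raw JSON string, is decoded inside my_transform, exposed to the user-supplied snippet as `data`, and re-serialized. The exec call and the sample snippet are assumptions for illustration, since the diff elides the execution line:

import json

# Hypothetical transform snippet, in the spirit of the debezium example above.
TRANSFORM_CODE = "data['__deleted'] = data.get('op') == 'd'"

def my_transform(data: str) -> str:
    data = json.loads(data)               # new in this commit: JSON decoding happens here
    local_vars = {"data": data}
    exec(TRANSFORM_CODE, {}, local_vars)  # execution detail assumed; elided in the diff
    return json.dumps(local_vars["data"])

print(my_transform('{"op": "d", "id": 1}'))  # -> {"op": "d", "id": 1, "__deleted": true}
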
