doc: example with debezium
aballiet committed Dec 23, 2024
1 parent aa38887 commit 6e89b36
Showing 2 changed files with 120 additions and 4 deletions.
112 changes: 112 additions & 0 deletions bizon/connectors/sources/kafka/config/kafka_debezium.example.yml
@@ -0,0 +1,112 @@
name: Kafka debezium messages to bigquery streaming

source:
  name: kafka
  stream: topic

  sync_mode: full_refresh

  force_ignore_checkpoint: true

  topic: <TOPIC_NAME>

  nb_bytes_schema_id: 8

  batch_size: 1000
  consumer_timeout: 10
  bootstrap_servers: <BOOTSTRAP_SERVERS>
  group_id: <GROUP_ID>

  authentication:
    type: basic

    schema_registry_url: <SCHEMA_REGISTRY_URL>
    schema_registry_username: <SCHEMA_REGISTRY_USERNAME>
    schema_registry_password: <SCHEMA_REGISTRY_PASSWORD>

    params:
      username: <USERNAME>
      password: <PASSWORD>

destination:
  name: bigquery_streaming

  config:
    buffer_size: 50
    bq_max_rows_per_request: 10000
    buffer_flush_timeout: 30

    table_id: <TABLE_ID>
    dataset_id: <DATASET_ID>
    dataset_location: US
    project_id: <PROJECT_ID>

    unnest: true

    time_partitioning:
      # Mandatory if unnested
      field: __event_timestamp

    record_schema:
      - name: account_id
        type: INTEGER
        mode: REQUIRED

      - name: team_id
        type: INTEGER
        mode: REQUIRED

      - name: user_id
        type: INTEGER
        mode: REQUIRED

      - name: __deleted
        type: BOOLEAN
        mode: NULLABLE

      - name: __cluster
        type: STRING
        mode: NULLABLE

      - name: __kafka_partition
        type: INTEGER
        mode: NULLABLE

      - name: __kafka_offset
        type: INTEGER
        mode: NULLABLE

      - name: __event_timestamp
        type: TIMESTAMP
        mode: NULLABLE

transforms:
  - label: debezium
    python: |
      from datetime import datetime
      cluster = data['value']['source']['name'].replace('_', '-')
      partition = data['partition']
      offset = data['offset']
      kafka_timestamp = datetime.utcfromtimestamp(data['value']['source']['ts_ms'] / 1000).strftime('%Y-%m-%d %H:%M:%S.%f')
      deleted = False
      if data['value']['op'] == 'd':
          data = data['value']['before']
          deleted = True
      else:
          data = data['value']['after']
      data['__deleted'] = deleted
      data['__cluster'] = cluster
      data['__kafka_partition'] = partition
      data['__kafka_offset'] = offset
      data['__event_timestamp'] = kafka_timestamp

engine:
  queue:
    type: python_queue
    config:
      max_nb_messages: 1000000
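
For reference, the "debezium" transform above receives each Kafka record as a Python dict named data and flattens the Debezium envelope (op, source, before, after) into the columns declared in record_schema. Below is a rough, self-contained sketch of that flattening applied to a made-up record; the record shape follows what the transform indexes, and all field values are invented for illustration.

from datetime import datetime

# Made-up Debezium-style record, shaped the way the transform expects it.
data = {
    "partition": 3,
    "offset": 42,
    "value": {
        "op": "u",  # 'c' = create, 'u' = update, 'd' = delete
        "source": {"name": "prod_cluster_1", "ts_ms": 1700000000000},
        "before": {"account_id": 1, "team_id": 7, "user_id": 99},
        "after": {"account_id": 1, "team_id": 8, "user_id": 99},
    },
}

# Same logic as the transform in the config above.
cluster = data['value']['source']['name'].replace('_', '-')
partition = data['partition']
offset = data['offset']
kafka_timestamp = datetime.utcfromtimestamp(data['value']['source']['ts_ms'] / 1000).strftime('%Y-%m-%d %H:%M:%S.%f')
deleted = False
if data['value']['op'] == 'd':
    data = data['value']['before']
    deleted = True
else:
    data = data['value']['after']
data['__deleted'] = deleted
data['__cluster'] = cluster
data['__kafka_partition'] = partition
data['__kafka_offset'] = offset
data['__event_timestamp'] = kafka_timestamp

print(data)
# {'account_id': 1, 'team_id': 8, 'user_id': 99, '__deleted': False, '__cluster': 'prod-cluster-1',
#  '__kafka_partition': 3, '__kafka_offset': 42, '__event_timestamp': '2023-11-14 22:13:20.000000'}

The flattened dict is what gets serialized back to JSON and streamed to BigQuery, with __event_timestamp doubling as the time-partitioning field declared in the destination config.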
12 changes: 8 additions & 4 deletions bizon/transform/transform.py
@@ -14,13 +14,16 @@ def __init__(self, transforms: list[TransformModel]):

    def apply_transforms(self, df_source_records: pl.DataFrame) -> pl.DataFrame:
        """Apply transformation on df_source_records"""

        # Process the transformations
        for transform in self.transforms:

            logger.debug(f"Applying transform {transform.label}")

            # Create a function to be executed in the desired context
-           def my_transform(data: dict) -> str:
+           def my_transform(data: str) -> str:
+
+               data = json.loads(data)

                # Start writing here
                local_vars = {"data": data}
@@ -33,9 +36,10 @@ def my_transform(data: dict) -> str:
                # Stop writing here
                return json.dumps(local_vars["data"])

-           transformed_source_records = [
-               my_transform(row) for row in df_source_records["data"].str.json_decode().to_list()
-           ]
+           transformed_source_records = []
+
+           for row in df_source_records["data"].to_list():
+               transformed_source_records.append(my_transform(row))

            df_source_records = df_source_records.with_columns(
                pl.Series("data", transformed_source_records, dtype=pl.String).alias("data")
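
As context for this diff: the per-transform callable previously received rows already decoded by Polars (via str.json_decode()), so my_transform took a dict; after this commit it receives the raw JSON string from the "data" column and decodes it itself with json.loads. A minimal standalone sketch of the new flow, with a trivial placeholder in place of the exec-based user transform (the __row_length field is invented purely for illustration):

import json

import polars as pl


def my_transform(data: str) -> str:
    # Rows now arrive as raw JSON strings and are decoded inside the callable.
    record = json.loads(data)
    record["__row_length"] = len(record)  # placeholder for a user-defined transform
    return json.dumps(record)


df_source_records = pl.DataFrame(
    {"data": ['{"account_id": 1}', '{"account_id": 2, "team_id": 7}']}
)

transformed_source_records = []
for row in df_source_records["data"].to_list():
    transformed_source_records.append(my_transform(row))

df_source_records = df_source_records.with_columns(
    pl.Series("data", transformed_source_records, dtype=pl.String).alias("data")
)
print(df_source_records["data"].to_list())
# ['{"account_id": 1, "__row_length": 1}', '{"account_id": 2, "team_id": 7, "__row_length": 2}']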
