Skip to content

Commit

Permalink
Merge pull request #1 from gorgias/antoineballiet/daae-4566-create-tr…
Browse files Browse the repository at this point in the history
…ansfo-layer-to-implement-gorgias-transform

feat: implement transform module with polars
  • Loading branch information
aballiet authored Dec 16, 2024
2 parents 3ee32af + 26cdbeb commit cdaed9f
Show file tree
Hide file tree
Showing 21 changed files with 357 additions and 190 deletions.
8 changes: 7 additions & 1 deletion bizon/common/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union
from typing import Optional, Union

from pydantic import BaseModel, ConfigDict, Field

Expand All @@ -8,6 +8,7 @@
from bizon.destinations.logger.src.config import LoggerConfig
from bizon.engine.config import EngineConfig
from bizon.source.config import SourceConfig, SourceSyncModes
from bizon.transform.config import TransformModel


class BizonConfig(BaseModel):
Expand All @@ -23,6 +24,11 @@ class BizonConfig(BaseModel):
default=...,
)

transforms: Optional[list[TransformModel]] = Field(
description="List of transformations to apply to the source data",
default=[],
)

destination: Union[
BigQueryConfig,
BigQueryStreamingConfig,
Expand Down
54 changes: 47 additions & 7 deletions bizon/destinations/bigquery/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@ class TimePartitioning(str, Enum):
YEAR = "YEAR"


class BigQueryColumnType(str, Enum):
    """Allowed values for the ``type`` field of a ``BigQueryColumn``.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so a member can be used wherever the raw type name is expected.
    """

    BOOLEAN = "BOOLEAN"
    BYTES = "BYTES"
    DATE = "DATE"
    DATETIME = "DATETIME"
    FLOAT = "FLOAT"
    GEOGRAPHY = "GEOGRAPHY"
    INTEGER = "INTEGER"
    RECORD = "RECORD"
    STRING = "STRING"
    TIME = "TIME"
    TIMESTAMP = "TIMESTAMP"


class BigQueryColumnMode(str, Enum):
    """Allowed values for the ``mode`` field of a ``BigQueryColumn``.

    The ``str`` mixin makes every member compare equal to its plain string
    value.
    """

    NULLABLE = "NULLABLE"
    REQUIRED = "REQUIRED"
    REPEATED = "REPEATED"


class BigQueryColumn(BaseModel):
    """One column of the record schema declared for a BigQuery destination.

    Instances are the items of the ``record_schema`` list on the BigQuery
    destination config.
    """

    name: str = Field(..., description="Name of the column")
    type: BigQueryColumnType = Field(..., description="Type of the column")
    mode: BigQueryColumnMode = Field(..., description="Mode of the column")
    # ``Optional[str]`` only allows ``None`` as a value; with ``Field(...)`` the key
    # itself was still mandatory. Defaulting to ``None`` lets configs omit it.
    description: Optional[str] = Field(default=None, description="Description of the column")


class BigQueryAuthentication(BaseModel):
service_account_key: str = Field(
description="Service Account Key JSON string. If empty it will be infered",
Expand All @@ -30,18 +57,31 @@ class BigQueryAuthentication(BaseModel):


class BigQueryConfigDetails(AbstractDestinationDetailsConfig):
project_id: str
dataset_id: str
dataset_location: Optional[str] = "US"

# Table details
project_id: str = Field(..., description="BigQuery Project ID")
dataset_id: str = Field(..., description="BigQuery Dataset ID")
table_id: Optional[str] = Field(
default=None, description="Table ID, if not provided it will be inferred from source name"
default=None,
description="Table ID, if not provided it will be inferred from source name",
)
gcs_buffer_bucket: str
gcs_buffer_format: Optional[GCSBufferFormat] = GCSBufferFormat.PARQUET

time_partitioning: Optional[TimePartitioning] = Field(
dataset_location: str = Field(default="US", description="BigQuery Dataset location")

# GCS Buffer
gcs_buffer_bucket: str = Field(..., description="GCS Buffer bucket")
gcs_buffer_format: GCSBufferFormat = Field(default=GCSBufferFormat.PARQUET, description="GCS Buffer format")

# Time partitioning
time_partitioning: TimePartitioning = Field(
default=TimePartitioning.DAY, description="BigQuery Time partitioning type"
)

# Schema for unnesting
record_schema: Optional[list[BigQueryColumn]] = Field(
default=None, description="Schema for the records. Required if unnest is set to true."
)

authentication: Optional[BigQueryAuthentication] = None


Expand Down
41 changes: 10 additions & 31 deletions bizon/destinations/bigquery/src/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from loguru import logger

from bizon.common.models import SyncMetadata
from bizon.destinations.config import NormalizationType
from bizon.destinations.destination import AbstractDestination
from bizon.engine.backend.backend import AbstractBackend
from bizon.source.config import SourceSyncModes
Expand Down Expand Up @@ -61,36 +60,16 @@ def temp_table_id(self) -> str:

def get_bigquery_schema(self, df_destination_records: pl.DataFrame) -> List[bigquery.SchemaField]:

# we keep raw data in the column source_data
if self.config.normalization.type == NormalizationType.NONE:
return [
bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField(
"_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
),
bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
]

# If normalization is tabular, we parse key / value pairs to columns
elif self.config.normalization.type == NormalizationType.TABULAR:

# We use the first record to infer the schema of tabular data (key / value pairs)
source_data_keys = list(json.loads(df_destination_records["source_data"][0]).keys())

return [bigquery.SchemaField(key, "STRING", mode="NULLABLE") for key in source_data_keys] + [
bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField(
"_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
),
bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
]

raise NotImplementedError(f"Normalization type {self.config.normalization.type} is not supported")
return [
bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField(
"_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
),
bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
]

def check_connection(self) -> bool:
dataset_ref = DatasetReference(self.project_id, self.dataset_id)
Expand Down
24 changes: 10 additions & 14 deletions bizon/destinations/bigquery_streaming/src/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from google.protobuf.message import Message

from bizon.common.models import SyncMetadata
from bizon.destinations.config import NormalizationType
from bizon.destinations.destination import AbstractDestination
from bizon.engine.backend.backend import AbstractBackend

Expand Down Expand Up @@ -50,19 +49,16 @@ def table_id(self) -> str:
def get_bigquery_schema(self) -> List[bigquery.SchemaField]:

# we keep raw data in the column source_data
if self.config.normalization.type == NormalizationType.NONE:
return [
bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField(
"_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
),
bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
]

raise NotImplementedError(f"Normalization type {self.config.normalization.type} is not supported")
return [
bigquery.SchemaField("_source_record_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("_source_timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("_source_data", "STRING", mode="NULLABLE"),
bigquery.SchemaField("_bizon_extracted_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField(
"_bizon_loaded_at", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP()"
),
bigquery.SchemaField("_bizon_id", "STRING", mode="REQUIRED"),
]

def check_connection(self) -> bool:
dataset_ref = DatasetReference(self.project_id, self.dataset_id)
Expand Down
29 changes: 19 additions & 10 deletions bizon/destinations/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import ABC
from enum import Enum
from typing import Optional

Expand All @@ -11,29 +12,37 @@ class DestinationTypes(str, Enum):
FILE = "file"


class NormalizationType(str, Enum):
TABULAR = "tabular" # Parse key / value pairs to columns
NONE = "none" # No normalization, raw data is stored
DEBEZIUM = "debezium" # Debezium normalization
class DestinationColumn(BaseModel, ABC):
    """Generic description of one destination column.

    Instances are the items of the ``record_schema`` list on
    ``AbstractDestinationDetailsConfig``.
    """

    name: str = Field(..., description="Name of the column")
    type: str = Field(..., description="Type of the column")
    # ``Optional[...]`` only allows ``None`` as a value; with ``Field(...)`` the keys
    # were still mandatory. Defaulting to ``None`` makes them genuinely optional.
    mode: Optional[str] = Field(default=None, description="Mode of the column")
    description: Optional[str] = Field(default=None, description="Description of the column")


class NormalizationConfig(BaseModel):
type: NormalizationType = Field(description="Normalization type")
class AbstractDestinationDetailsConfig(BaseModel):

# Forbid extra keys in the model
model_config = ConfigDict(extra="forbid")

class AbstractDestinationDetailsConfig(BaseModel):
buffer_size: int = Field(
default=50,
description="Buffer size in Mb for the destination. Set to 0 to disable and write directly to the destination.",
)

buffer_flush_timeout: int = Field(
default=600,
description="Maximum time in seconds for buffering after which the records will be written to the destination. Set to 0 to deactivate the timeout buffer check.", # noqa
)
normalization: Optional[NormalizationConfig] = Field(
description="Normalization configuration, by default no normalization is applied",
default=NormalizationConfig(type=NormalizationType.NONE),

unnest: Optional[bool] = Field(
default=False,
description="Unnest the data before writing to the destination. Schema should be provided in the model_config.",
)

record_schema: Optional[list[DestinationColumn]] = Field(
default=None, description="Schema for the records. Required if unnest is set to true."
)

authentication: Optional[BaseModel] = Field(
description="Authentication configuration for the destination, if needed", default=None
)
Expand Down
6 changes: 5 additions & 1 deletion bizon/destinations/logger/src/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
class LoggerDestination(AbstractDestination):

def __init__(self, sync_metadata: SyncMetadata, config: LoggerDestinationConfig, backend: AbstractBackend):
super().__init__(sync_metadata, config, backend)
super().__init__(
sync_metadata=sync_metadata,
config=config,
backend=backend,
)

def check_connection(self) -> bool:
return True
Expand Down
4 changes: 3 additions & 1 deletion bizon/engine/backend/adapters/sqlalchemy/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ def _add_and_commit(self, obj, session: Optional[Session] = None):

def _execute(self, select: Select, session: Optional[Session] = None) -> Result:
session = session or self.session
return session.execute(select)
result = session.execute(select)
session.commit()
return result

#### STREAM JOB ####

Expand Down
5 changes: 3 additions & 2 deletions bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from abc import ABC, abstractmethod
from enum import Enum

from bizon.destinations.destination import AbstractDestination
from bizon.engine.queue.config import AbstractQueueConfig
from bizon.transform.transform import Transform


class AbstractQueueConsumer(ABC):
def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination):
def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination, transform: Transform):
self.config = config
self.destination = destination
self.transform = transform

@abstractmethod
def run(self):
Expand Down
16 changes: 12 additions & 4 deletions bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,31 @@
from bizon.destinations.destination import AbstractDestination
from bizon.engine.pipeline.consumer import AbstractQueueConsumer
from bizon.engine.queue.queue import QUEUE_TERMINATION, AbstractQueue, QueueMessage
from bizon.transform.transform import Transform

from .config import PythonQueueConfig


class PythonQueueConsumer(AbstractQueueConsumer):
def __init__(self, config: PythonQueueConfig, queue: AbstractQueue, destination: AbstractDestination):
super().__init__(config, destination=destination)
def __init__(
self, config: PythonQueueConfig, queue: AbstractQueue, destination: AbstractDestination, transform: Transform
):
super().__init__(config, destination=destination, transform=transform)
self.queue = queue

def run(self) -> None:
while True:

# Retrieve the message from the queue
queue_message: QueueMessage = self.queue.get()

# Apply the transformation
df_source_records = self.transform.apply_transforms(df_source_records=queue_message.df_source_records)

if queue_message.signal == QUEUE_TERMINATION:
logger.info("Received termination signal, waiting for destination to close gracefully ...")
self.destination.write_records_and_update_cursor(
df_source_records=queue_message.df_source_records,
df_source_records=df_source_records,
iteration=queue_message.iteration,
extracted_at=queue_message.extracted_at,
pagination=queue_message.pagination,
Expand All @@ -28,7 +36,7 @@ def run(self) -> None:
break

self.destination.write_records_and_update_cursor(
df_source_records=queue_message.df_source_records,
df_source_records=df_source_records,
iteration=queue_message.iteration,
extracted_at=queue_message.extracted_at,
pagination=queue_message.pagination,
Expand Down
5 changes: 3 additions & 2 deletions bizon/engine/queue/adapters/python_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
QueueMessage,
)
from bizon.source.models import SourceIteration
from bizon.transform.transform import Transform

from .config import PythonQueueConfigDetails
from .consumer import PythonQueueConsumer
Expand All @@ -31,8 +32,8 @@ def connect(self):
# No connection to establish for PythonQueue
pass

def get_consumer(self, destination: AbstractDestination) -> AbstractQueueConsumer:
return PythonQueueConsumer(config=self.config, queue=self.queue, destination=destination)
def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
return PythonQueueConsumer(config=self.config, queue=self.queue, destination=destination, transform=transform)

def put_queue_message(self, queue_message: QueueMessage):
if not self.queue.full():
Expand Down
3 changes: 2 additions & 1 deletion bizon/engine/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bizon.destinations.destination import AbstractDestination
from bizon.engine.pipeline.consumer import AbstractQueueConsumer
from bizon.source.models import SourceIteration, source_record_schema
from bizon.transform.transform import Transform

from .config import AbastractQueueConfigDetails, AbstractQueueConfig, QueueTypes

Expand All @@ -35,7 +36,7 @@ def connect(self):
pass

@abstractmethod
def get_consumer(self, destination: AbstractDestination) -> AbstractQueueConsumer:
def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
pass

@abstractmethod
Expand Down
9 changes: 8 additions & 1 deletion bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from bizon.engine.queue.queue import AbstractQueue, QueueFactory
from bizon.source.discover import get_source_instance_by_source_and_stream
from bizon.source.source import AbstractSource
from bizon.transform.transform import Transform


class AbstractRunner(ABC):
Expand Down Expand Up @@ -89,6 +90,11 @@ def get_queue(bizon_config: BizonConfig, **kwargs) -> AbstractQueue:
**kwargs,
)

@staticmethod
def get_transform(bizon_config: BizonConfig) -> Transform:
    """Return the transform instance to apply to the source records.

    Args:
        bizon_config: Pipeline configuration; its ``transforms`` attribute is
            forwarded unchanged to ``Transform``.

    Returns:
        A ``Transform`` built with ``transforms=bizon_config.transforms``.
    """
    return Transform(transforms=bizon_config.transforms)

@staticmethod
def get_or_create_job(
bizon_config: BizonConfig,
Expand Down Expand Up @@ -192,8 +198,9 @@ def instanciate_and_run_consumer(bizon_config: BizonConfig, job_id: str, **kwarg
queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs)
backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs)
destination = AbstractRunner.get_destination(bizon_config=bizon_config, backend=backend, job_id=job_id)
transform = AbstractRunner.get_transform(bizon_config=bizon_config)

consumer = queue.get_consumer(destination=destination)
consumer = queue.get_consumer(destination=destination, transform=transform)

status = consumer.run()
return status
Expand Down
Loading

0 comments on commit cdaed9f

Please sign in to comment.