From 19127b5406e64372b12954b13728791fe13cc85f Mon Sep 17 00:00:00 2001 From: Antoine Balliet Date: Thu, 19 Dec 2024 17:34:56 +0100 Subject: [PATCH] feat: implement failure handling for pipeline (#1) (#4) --- .../destinations/bigquery/src/destination.py | 4 +- .../bigquery_streaming/src/config.py | 15 ++++-- .../bigquery_streaming/src/destination.py | 7 ++- .../bigquery_streaming/src/proto_utils.py | 2 +- bizon/destinations/config.py | 19 ++++--- bizon/destinations/file/src/destination.py | 15 +++++- bizon/engine/pipeline/consumer.py | 6 ++- bizon/engine/pipeline/models.py | 6 ++- bizon/engine/pipeline/producer.py | 10 +++- .../queue/adapters/python_queue/consumer.py | 51 ++++++++++++++----- bizon/engine/runner/adapters/process.py | 16 +++++- bizon/engine/runner/adapters/thread.py | 35 +++++++++---- bizon/engine/runner/config.py | 14 +++++ bizon/engine/runner/runner.py | 23 ++++++--- bizon/sources/dummy/src/fake_api.py | 7 ++- bizon/sources/dummy/src/source.py | 6 ++- .../dummy/tests/dummy_pipeline_unnest.py | 29 +++++++++++ bizon/sources/kafka/tests/kafka_pipeline.py | 2 +- bizon/transform/transform.py | 7 ++- .../test_bigquery_streaming_client.py | 6 +-- tests/destinations/test_config.py | 42 +++++++++++++++ tests/e2e/test_e2e_stop_pipeline.py | 41 +++++++++++++++ tests/engine/test_producer.py | 4 +- .../{test_config.py => test_kafka_config.py} | 0 24 files changed, 307 insertions(+), 60 deletions(-) create mode 100644 bizon/sources/dummy/tests/dummy_pipeline_unnest.py create mode 100644 tests/destinations/test_config.py create mode 100644 tests/e2e/test_e2e_stop_pipeline.py rename tests/sources/kafka/{test_config.py => test_kafka_config.py} (100%) diff --git a/bizon/destinations/bigquery/src/destination.py b/bizon/destinations/bigquery/src/destination.py index d3f692d..07209b5 100644 --- a/bizon/destinations/bigquery/src/destination.py +++ b/bizon/destinations/bigquery/src/destination.py @@ -126,7 +126,9 @@ def unnest_data(df_destination_records: pl.DataFrame, record_schema: list[BigQue """Unnest the source_data field into separate columns""" # Check if the schema matches the expected schema - source_data_fields = pl.DataFrame(df_destination_records['source_data'].str.json_decode()).schema["source_data"].fields + source_data_fields = ( + pl.DataFrame(df_destination_records["source_data"].str.json_decode()).schema["source_data"].fields + ) record_schema_fields = [col.name for col in record_schema] diff --git a/bizon/destinations/bigquery_streaming/src/config.py b/bizon/destinations/bigquery_streaming/src/config.py index c8d9a7b..b34ea3f 100644 --- a/bizon/destinations/bigquery_streaming/src/config.py +++ b/bizon/destinations/bigquery_streaming/src/config.py @@ -11,13 +11,20 @@ ) -class TimePartitioning(str, Enum): +class TimePartitioningWindow(str, Enum): DAY = "DAY" HOUR = "HOUR" MONTH = "MONTH" YEAR = "YEAR" +class TimePartitioning(BaseModel): + type: TimePartitioningWindow = Field(default=TimePartitioningWindow.DAY, description="Time partitioning type") + field: Optional[str] = Field( + "_bizon_loaded_at", description="Field to partition by. You can use a transformation to create this field." + ) + + class BigQueryAuthentication(BaseModel): service_account_key: str = Field( description="Service Account Key JSON string. 
If empty it will be inferred", default="", ) @@ -33,10 +40,8 @@ class BigQueryStreamingConfigDetails(AbstractDestinationDetailsConfig): default=None, description="Table ID, if not provided it will be inferred from source name" ) time_partitioning: Optional[TimePartitioning] = Field( - default=TimePartitioning.DAY, description="BigQuery Time partitioning type" - ) - time_partitioning_field: Optional[str] = Field( - "_bizon_loaded_at", description="Field to partition by. You can use a transformation to create this field." + default=TimePartitioning(type=TimePartitioningWindow.DAY, field="_bizon_loaded_at"), + description="BigQuery Time partitioning type", ) authentication: Optional[BigQueryAuthentication] = None bq_max_rows_per_request: Optional[int] = Field(30000, description="Max rows per buffer streaming request.") diff --git a/bizon/destinations/bigquery_streaming/src/destination.py b/bizon/destinations/bigquery_streaming/src/destination.py index e40f538..8e777ae 100644 --- a/bizon/destinations/bigquery_streaming/src/destination.py +++ b/bizon/destinations/bigquery_streaming/src/destination.py @@ -115,7 +115,7 @@ def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) - schema = self.get_bigquery_schema() table = bigquery.Table(self.table_id, schema=schema) time_partitioning = TimePartitioning( - field=self.config.time_partitioning_field, type_=self.config.time_partitioning + field=self.config.time_partitioning.field, type_=self.config.time_partitioning.type ) table.time_partitioning = time_partitioning @@ -136,6 +136,11 @@ def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) - for row in df_destination_records["source_data"].str.json_decode().to_list() ] else: + df_destination_records = df_destination_records.with_columns( + pl.col("bizon_extracted_at").dt.strftime("%Y-%m-%d %H:%M:%S").alias("bizon_extracted_at"), + pl.col("bizon_loaded_at").dt.strftime("%Y-%m-%d %H:%M:%S").alias("bizon_loaded_at"), + pl.col("source_timestamp").dt.strftime("%Y-%m-%d %H:%M:%S").alias("source_timestamp"), + ) df_destination_records = df_destination_records.rename( { "bizon_id": "_bizon_id", diff --git a/bizon/destinations/bigquery_streaming/src/proto_utils.py b/bizon/destinations/bigquery_streaming/src/proto_utils.py index 27419e8..3810927 100644 --- a/bizon/destinations/bigquery_streaming/src/proto_utils.py +++ b/bizon/destinations/bigquery_streaming/src/proto_utils.py @@ -25,7 +25,7 @@ def map_bq_type_to_field_descriptor(bq_type: str) -> int: "DATE": FieldDescriptorProto.TYPE_STRING, # DATE -> TYPE_STRING "DATETIME": FieldDescriptorProto.TYPE_STRING, # DATETIME -> TYPE_STRING "TIME": FieldDescriptorProto.TYPE_STRING, # TIME -> TYPE_STRING - "TIMESTAMP": FieldDescriptorProto.TYPE_INT64, # TIMESTAMP -> TYPE_INT64 (Unix epoch time) + "TIMESTAMP": FieldDescriptorProto.TYPE_STRING, # TIMESTAMP -> TYPE_STRING (timestamps are sent as formatted strings) "RECORD": FieldDescriptorProto.TYPE_MESSAGE, # RECORD -> TYPE_MESSAGE (nested message) } diff --git a/bizon/destinations/config.py b/bizon/destinations/config.py index 653d1d0..92eebad 100644 --- a/bizon/destinations/config.py +++ b/bizon/destinations/config.py @@ -2,7 +2,7 @@ from enum import Enum from typing import Optional -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, Field, field_validator class DestinationTypes(str, Enum): @@ -33,19 +33,26 @@ class AbstractDestinationDetailsConfig(BaseModel): description="Maximum time in seconds for buffering after which the records will be written to the
destination. Set to 0 to deactivate the timeout buffer check.", # noqa ) - unnest: Optional[bool] = Field( - default=False, - description="Unnest the data before writing to the destination. Schema should be provided in the model_config.", - ) - record_schema: Optional[list[DestinationColumn]] = Field( default=None, description="Schema for the records. Required if unnest is set to true." ) + unnest: bool = Field( + default=False, + description="Unnest the data before writing to the destination. Schema should be provided in the model_config.", + ) + authentication: Optional[BaseModel] = Field( description="Authentication configuration for the destination, if needed", default=None ) + @field_validator("unnest", mode="before") + def validate_record_schema_if_unnest(cls, value, values): + if bool(value) and values.data.get("record_schema") is None: + raise ValueError("A `record_schema` must be provided if `unnest` is set to True.") + + return value + class AbstractDestinationConfig(BaseModel): # Forbid extra keys in the model diff --git a/bizon/destinations/file/src/destination.py b/bizon/destinations/file/src/destination.py index 89d5847..6c1aea4 100644 --- a/bizon/destinations/file/src/destination.py +++ b/bizon/destinations/file/src/destination.py @@ -1,3 +1,4 @@ +import json from typing import Tuple import polars as pl @@ -23,5 +24,17 @@ def delete_table(self) -> bool: return True def write_records(self, df_destination_records: pl.DataFrame) -> Tuple[bool, str]: - df_destination_records.write_ndjson(self.config.filepath) + + if self.config.unnest: + + schema_keys = set([column.name for column in self.config.record_schema]) + + with open(self.config.filepath, "a") as f: + for value in df_destination_records["source_data"].str.json_decode().to_list(): + assert set(value.keys()) == schema_keys, "Keys do not match the schema" + f.write(f"{json.dumps(value)}\n") + + else: + df_destination_records.write_ndjson(self.config.filepath) + return True, "" diff --git a/bizon/engine/pipeline/consumer.py b/bizon/engine/pipeline/consumer.py index c0fbba8..fb889a9 100644 --- a/bizon/engine/pipeline/consumer.py +++ b/bizon/engine/pipeline/consumer.py @@ -1,6 +1,10 @@ +import multiprocessing +import threading from abc import ABC, abstractmethod +from typing import Union from bizon.destinations.destination import AbstractDestination +from bizon.engine.pipeline.models import PipelineReturnStatus from bizon.engine.queue.config import AbstractQueueConfig from bizon.transform.transform import Transform @@ -12,5 +16,5 @@ def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination self.transform = transform @abstractmethod - def run(self): + def run(self, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus: pass diff --git a/bizon/engine/pipeline/models.py b/bizon/engine/pipeline/models.py index 1a1a83a..399470f 100644 --- a/bizon/engine/pipeline/models.py +++ b/bizon/engine/pipeline/models.py @@ -1,10 +1,14 @@ from enum import Enum -class PipelineReturnStatus(Enum): +class PipelineReturnStatus(str, Enum): """Producer error types""" SUCCESS = "success" + ERROR = "error" + KILLED_BY_RUNNER = "killed_by_runner" QUEUE_ERROR = "queue_error" SOURCE_ERROR = "source_error" BACKEND_ERROR = "backend_error" + TRANSFORM_ERROR = "transform_error" + DESTINATION_ERROR = "destination_error" diff --git a/bizon/engine/pipeline/producer.py b/bizon/engine/pipeline/producer.py index 772cb08..bf1c200 100644 --- a/bizon/engine/pipeline/producer.py +++ b/bizon/engine/pipeline/producer.py 
@@ -1,8 +1,10 @@ import ast +import multiprocessing +import threading import traceback from datetime import datetime from time import sleep -from typing import Tuple +from typing import Tuple, Union from loguru import logger from pytz import UTC @@ -99,7 +101,7 @@ def is_queue_full(self, cursor: Cursor) -> Tuple[bool, int, int]: return False, queue_size, approximate_nb_records_in_queue - def run(self, job_id: int): + def run(self, job_id: int, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus: return_value: PipelineReturnStatus = PipelineReturnStatus.SUCCESS @@ -128,6 +130,10 @@ def run(self, job_id: int): while not cursor.is_finished: + if stop_event.is_set(): + logger.info("Stop event is set, terminating producer ...") + return PipelineReturnStatus.KILLED_BY_RUNNER + timestamp_start_iteration = datetime.now(tz=UTC) # Handle the case where last cursor already reach max_iterations diff --git a/bizon/engine/queue/adapters/python_queue/consumer.py b/bizon/engine/queue/adapters/python_queue/consumer.py index 01461c0..eeb0761 100644 --- a/bizon/engine/queue/adapters/python_queue/consumer.py +++ b/bizon/engine/queue/adapters/python_queue/consumer.py @@ -1,7 +1,13 @@ +import multiprocessing +import threading +import traceback +from typing import Union + from loguru import logger from bizon.destinations.destination import AbstractDestination from bizon.engine.pipeline.consumer import AbstractQueueConsumer +from bizon.engine.pipeline.models import PipelineReturnStatus from bizon.engine.queue.queue import QUEUE_TERMINATION, AbstractQueue, QueueMessage from bizon.transform.transform import Transform @@ -15,30 +21,47 @@ def __init__( super().__init__(config, destination=destination, transform=transform) self.queue = queue - def run(self) -> None: + def run(self, stop_event: Union[threading.Event, multiprocessing.Event]) -> PipelineReturnStatus: while True: - + if stop_event.is_set(): + logger.info("Stop event is set, closing consumer ...") + return PipelineReturnStatus.KILLED_BY_RUNNER # Retrieve the message from the queue queue_message: QueueMessage = self.queue.get() # Apply the transformation - df_source_records = self.transform.apply_transforms(df_source_records=queue_message.df_source_records) + try: + df_source_records = self.transform.apply_transforms(df_source_records=queue_message.df_source_records) + except Exception as e: + logger.error(f"Error applying transformation: {e}") + logger.error(traceback.format_exc()) + return PipelineReturnStatus.TRANSFORM_ERROR - if queue_message.signal == QUEUE_TERMINATION: - logger.info("Received termination signal, waiting for destination to close gracefully ...") + try: + if queue_message.signal == QUEUE_TERMINATION: + logger.info("Received termination signal, waiting for destination to close gracefully ...") + self.destination.write_records_and_update_cursor( + df_source_records=df_source_records, + iteration=queue_message.iteration, + extracted_at=queue_message.extracted_at, + pagination=queue_message.pagination, + last_iteration=True, + ) + break + except Exception as e: + logger.error(f"Error writing records to destination: {e}") + return PipelineReturnStatus.DESTINATION_ERROR + + try: self.destination.write_records_and_update_cursor( df_source_records=df_source_records, iteration=queue_message.iteration, extracted_at=queue_message.extracted_at, pagination=queue_message.pagination, - last_iteration=True, ) - break - - self.destination.write_records_and_update_cursor( - df_source_records=df_source_records, - 
iteration=queue_message.iteration, - extracted_at=queue_message.extracted_at, - pagination=queue_message.pagination, - ) + except Exception as e: + logger.error(f"Error writing records to destination: {e}") + return PipelineReturnStatus.DESTINATION_ERROR + self.queue.task_done() + return PipelineReturnStatus.SUCCESS diff --git a/bizon/engine/runner/adapters/process.py b/bizon/engine/runner/adapters/process.py index b0d4db0..e2ad774 100644 --- a/bizon/engine/runner/adapters/process.py +++ b/bizon/engine/runner/adapters/process.py @@ -1,5 +1,7 @@ import concurrent.futures import time +import traceback from loguru import logger +from bizon.engine.pipeline.models import PipelineReturnStatus @@ -68,8 +70,19 @@ def run(self): result_producer = future_producer.result() logger.info(f"Producer process stopped running with result: {result_producer}") + if result_producer == PipelineReturnStatus.SUCCESS: + logger.info("Producer process has finished successfully, will wait for consumer to finish ...") + else: + logger.error("Producer process failed, stopping consumer ...") + executor.shutdown(wait=False) + if not future_consumer.running(): - result_consumer = future_consumer.result() - logger.info(f"Consumer process stopped running with result: {result_consumer}") + try: + future_consumer.result() + except Exception as e: + logger.error(f"Consumer process stopped running with error {e}") + logger.error(traceback.format_exc()) + finally: + executor.shutdown(wait=False) return True diff --git a/bizon/engine/runner/adapters/thread.py b/bizon/engine/runner/adapters/thread.py index b6e3fe8..5e4ec3c 100644 --- a/bizon/engine/runner/adapters/thread.py +++ b/bizon/engine/runner/adapters/thread.py @@ -1,10 +1,12 @@ import concurrent.futures import time -import traceback +from threading import Event from loguru import logger from bizon.common.models import BizonConfig +from bizon.engine.pipeline.models import PipelineReturnStatus +from bizon.engine.runner.config import RunnerStatus from bizon.engine.runner.runner import AbstractRunner @@ -25,7 +27,7 @@ def get_kwargs(self): return extra_kwargs - def run(self) -> bool: + def run(self) -> RunnerStatus: """Run the pipeline with dedicated threads for source and destination""" extra_kwargs = self.get_kwargs() @@ -35,6 +37,10 @@ def run(self) -> bool: result_producer = None result_consumer = None + # Start the producer and consumer events + producer_stop_event = Event() + consumer_stop_event = Event() + extra_kwargs = self.get_kwargs() with concurrent.futures.ThreadPoolExecutor( @@ -46,6 +52,7 @@ def run(self) -> bool: self.bizon_config, self.config, job.id, + producer_stop_event, **extra_kwargs, ) logger.info("Producer thread has started ...") @@ -56,6 +63,7 @@ def run(self) -> bool: AbstractRunner.instanciate_and_run_consumer, self.bizon_config, job.id, + consumer_stop_event, **extra_kwargs, ) logger.info("Consumer thread has started ...") @@ -68,14 +76,23 @@ def run(self) -> bool: self._is_running = False if not future_producer.running(): - result_producer = future_producer.result() + result_producer: PipelineReturnStatus = future_producer.result() logger.info(f"Producer thread stopped running with result: {result_producer}") + if result_producer == PipelineReturnStatus.SUCCESS: + logger.info("Producer thread has finished successfully, will wait for consumer to finish ...") + else: + logger.error("Producer thread failed, stopping consumer ...") + consumer_stop_event.set() + if not future_consumer.running(): - try: - future_consumer.result() - except Exception as e: - logger.error(f"Consumer thread stopped running with error {e}") - logger.error(traceback.format_exc()) +
result_consumer = future_consumer.result() + logger.info(f"Consumer thread stopped running with result: {result_consumer}") + + if result_consumer == PipelineReturnStatus.SUCCESS: + logger.info("Consumer thread has finished successfully") + else: + logger.error("Consumer thread failed, stopping producer ...") + producer_stop_event.set() - return True + return RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result()) diff --git a/bizon/engine/runner/config.py b/bizon/engine/runner/config.py index c954d84..8569f19 100644 --- a/bizon/engine/runner/config.py +++ b/bizon/engine/runner/config.py @@ -3,6 +3,8 @@ from pydantic import BaseModel, Field +from bizon.engine.pipeline.models import PipelineReturnStatus + class RunnerTypes(str, Enum): THREAD = "thread" @@ -49,3 +51,15 @@ class RunnerConfig(BaseModel): description="Logging level", default=LoggerLevel.INFO, ) + + +class RunnerStatus(BaseModel): + producer: PipelineReturnStatus + consumer: PipelineReturnStatus + + @property + def is_success(self): + if self.producer == PipelineReturnStatus.SUCCESS and self.consumer == PipelineReturnStatus.SUCCESS: + return True + else: + return False diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py index 4811335..0a0cb49 100644 --- a/bizon/engine/runner/runner.py +++ b/bizon/engine/runner/runner.py @@ -1,6 +1,8 @@ -import os +import multiprocessing import sys +import threading from abc import ABC, abstractmethod +from typing import Union from loguru import logger @@ -11,6 +13,7 @@ from bizon.engine.backend.models import JobStatus, StreamJob from bizon.engine.pipeline.producer import Producer from bizon.engine.queue.queue import AbstractQueue, QueueFactory +from bizon.engine.runner.config import RunnerStatus from bizon.source.discover import get_source_instance_by_source_and_stream from bizon.source.source import AbstractSource from bizon.transform.transform import Transform @@ -176,7 +179,13 @@ def init_job(bizon_config: BizonConfig, config: dict, **kwargs) -> StreamJob: return job @staticmethod - def instanciate_and_run_producer(bizon_config: BizonConfig, config: dict, job_id: str, **kwargs): + def instanciate_and_run_producer( + bizon_config: BizonConfig, + config: dict, + job_id: str, + stop_event: Union[multiprocessing.Event, threading.Event], + **kwargs, + ): source = AbstractRunner.get_source(bizon_config=bizon_config, config=config) queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs) @@ -189,11 +198,13 @@ def instanciate_and_run_producer(bizon_config: BizonConfig, config: dict, job_id backend=backend, ) - status = producer.run(job_id) + status = producer.run(job_id, stop_event) return status @staticmethod - def instanciate_and_run_consumer(bizon_config: BizonConfig, job_id: str, **kwargs): + def instanciate_and_run_consumer( + bizon_config: BizonConfig, job_id: str, stop_event: Union[multiprocessing.Event, threading.Event], **kwargs + ): queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs) backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs) @@ -202,10 +213,10 @@ def instanciate_and_run_consumer(bizon_config: BizonConfig, job_id: str, **kwarg consumer = queue.get_consumer(destination=destination, transform=transform) - status = consumer.run() + status = consumer.run(stop_event) return status @abstractmethod - def run(self) -> bool: + def run(self) -> RunnerStatus: """Run the pipeline with dedicated adapter for source and destination""" pass diff --git a/bizon/sources/dummy/src/fake_api.py 
b/bizon/sources/dummy/src/fake_api.py index 65eedf8..30a7771 100644 --- a/bizon/sources/dummy/src/fake_api.py +++ b/bizon/sources/dummy/src/fake_api.py @@ -1,5 +1,10 @@ +import time + + # Function emulating an API call to a source endpoint -def fake_api_call(url: str, cursor: str = None) -> dict: +def fake_api_call(url: str, cursor: str = None, sleep: int = None) -> dict: + if sleep: + time.sleep(sleep) if url == "https://api.dummy.com/v1/creatures": return fake_api_call_creatures(cursor) diff --git a/bizon/sources/dummy/src/source.py b/bizon/sources/dummy/src/source.py index e753b3b..f293081 100644 --- a/bizon/sources/dummy/src/source.py +++ b/bizon/sources/dummy/src/source.py @@ -23,12 +23,14 @@ class DummyAuthConfig(AuthConfig): class DummySourceConfig(SourceConfig): authentication: DummyAuthConfig + sleep: int = Field(0, description="Sleep time in seconds between API calls") class DummySource(AbstractSource): def __init__(self, config: DummySourceConfig): super().__init__(config) + self.config = config @staticmethod def streams() -> List[str]: @@ -71,11 +73,11 @@ def get(self, pagination: dict = None) -> SourceIteration: # If no pagination data is passed, we want to reach first page if not pagination: - response = fake_api_call(url=self.url_entity) + response = fake_api_call(url=self.url_entity, sleep=self.config.sleep) # If we have pagination data we pass it to the API else: - response = fake_api_call(url=self.url_entity, cursor=pagination.get("cursor")) + response = fake_api_call(url=self.url_entity, cursor=pagination.get("cursor"), sleep=self.config.sleep) # Now we process the response to: # - allow bizon to process the records and write them to destination diff --git a/bizon/sources/dummy/tests/dummy_pipeline_unnest.py b/bizon/sources/dummy/tests/dummy_pipeline_unnest.py new file mode 100644 index 0000000..6786982 --- /dev/null +++ b/bizon/sources/dummy/tests/dummy_pipeline_unnest.py @@ -0,0 +1,29 @@ +from yaml import safe_load + +from bizon.engine.engine import RunnerFactory + +config_yaml = """ +name: test_job + +source: + source_name: dummy + stream_name: creatures + authentication: + type: api_key + params: + token: dummy_key + +destination: + name: logger + config: + dummy: dummy + +transforms: + - label: failure_transform + python: | + data['cookies'] = data['key_that_does_not_exist'].upper() +""" + +config = safe_load(config_yaml) +runner = RunnerFactory.create_from_config_dict(config=config) +runner.run() diff --git a/bizon/sources/kafka/tests/kafka_pipeline.py b/bizon/sources/kafka/tests/kafka_pipeline.py index 680d212..86a144e 100644 --- a/bizon/sources/kafka/tests/kafka_pipeline.py +++ b/bizon/sources/kafka/tests/kafka_pipeline.py @@ -4,6 +4,6 @@ if __name__ == "__main__": runner = RunnerFactory.create_from_yaml( - filepath=os.path.abspath("bizon/sources/kafka/config/kafka_teams_users_eu_west3_86c1.yml") + filepath=os.path.abspath("bizon/sources/kafka/config/kafka_teams_users.yml") ) runner.run() diff --git a/bizon/transform/transform.py b/bizon/transform/transform.py index 3e1bb65..4deacbd 100644 --- a/bizon/transform/transform.py +++ b/bizon/transform/transform.py @@ -14,7 +14,6 @@ def __init__(self, transforms: list[TransformModel]): def apply_transforms(self, df_source_records: pl.DataFrame) -> pl.DataFrame: """Apply transformation on df_source_records""" - # Process the transformations for transform in self.transforms: @@ -34,8 +33,12 @@ def my_transform(data: dict) -> str: # Stop writing here return json.dumps(local_vars["data"]) + transformed_source_records = [ + 
my_transform(row) for row in df_source_records["data"].str.json_decode().to_list() + ] + df_source_records = df_source_records.with_columns( - pl.col("data").str.json_decode().map_elements(my_transform, return_dtype=pl.String).alias("data") + pl.Series("data", transformed_source_records, dtype=pl.String).alias("data") ) return df_source_records diff --git a/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py b/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py index 1308f73..201cd73 100644 --- a/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py +++ b/tests/destinations/bigquery_streaming/test_bigquery_streaming_client.py @@ -98,7 +98,7 @@ def test_streaming_unnested_records(my_backend_config, sync_metadata_stream): dataset_id=TEST_DATASET_ID, table_id=TEST_TABLE_ID, unnest=True, - time_partitioning_field="created_at", + time_partitioning={"type": "DAY", "field": "created_at"}, record_schema=[ { "name": "id", @@ -165,7 +165,7 @@ def test_error_on_added_column(my_backend_config, sync_metadata_stream): dataset_id=TEST_DATASET_ID, table_id=TEST_TABLE_ID, unnest=True, - time_partitioning_field="created_at", + time_partitioning={"type": "DAY", "field": "created_at"}, record_schema=[ { "name": "id", @@ -242,7 +242,7 @@ def test_error_on_deleted_column(my_backend_config, sync_metadata_stream): dataset_id=TEST_DATASET_ID, table_id=TEST_TABLE_ID, unnest=True, - time_partitioning_field="created_at", + time_partitioning={"type": "DAY", "field": "created_at"}, record_schema=[ { "name": "id", diff --git a/tests/destinations/test_config.py b/tests/destinations/test_config.py new file mode 100644 index 0000000..d952c68 --- /dev/null +++ b/tests/destinations/test_config.py @@ -0,0 +1,42 @@ +import pytest +from pydantic import ValidationError + +from bizon.destinations.config import ( + AbstractDestinationConfig, + AbstractDestinationDetailsConfig, +) + + +def test_config(): + config = AbstractDestinationConfig( + name="file", + config=AbstractDestinationDetailsConfig( + unnest=False, + ), + ) + assert config + + +def test_config_no_record_schema_provided(): + + with pytest.raises(ValidationError) as e: + AbstractDestinationConfig( + name="file", + config=AbstractDestinationDetailsConfig( + unnest=True, + ), + ) + + +def test_config_with_unnest_provided_schema(): + config = AbstractDestinationConfig( + name="file", + config=AbstractDestinationDetailsConfig( + unnest=True, + record_schema=[ + {"name": "name", "type": "string", "description": "Name of the user"}, + {"name": "age", "type": "int", "description": "Age of the user"}, + ], + ), + ) + assert config diff --git a/tests/e2e/test_e2e_stop_pipeline.py b/tests/e2e/test_e2e_stop_pipeline.py new file mode 100644 index 0000000..76e0ce6 --- /dev/null +++ b/tests/e2e/test_e2e_stop_pipeline.py @@ -0,0 +1,41 @@ +import tempfile + +import yaml + +from bizon.engine.engine import RunnerFactory +from bizon.engine.pipeline.models import PipelineReturnStatus + + +def test_e2e_pipeline_should_stop(): + + with tempfile.NamedTemporaryFile(delete=False) as temp: + + BIZON_CONFIG_DUMMY_TO_FILE = f""" + name: test_job_3 + + source: + source_name: dummy + stream_name: creatures + authentication: + type: api_key + params: + token: dummy_key + sleep: 2 + destination: + name: file + config: + filepath: {temp.name} + + transforms: + - label: transform_data + python: | + if 'name' in data: + data['name'] = data['this_key_doesnt_exist'].upper() + """ + + runner = 
RunnerFactory.create_from_config_dict(yaml.safe_load(BIZON_CONFIG_DUMMY_TO_FILE)) + + status = runner.run() + + assert status.producer == PipelineReturnStatus.KILLED_BY_RUNNER + assert status.consumer == PipelineReturnStatus.TRANSFORM_ERROR diff --git a/tests/engine/test_producer.py b/tests/engine/test_producer.py index f4cf870..d2dfd31 100644 --- a/tests/engine/test_producer.py +++ b/tests/engine/test_producer.py @@ -10,6 +10,7 @@ from bizon.engine.engine import RunnerFactory from bizon.engine.pipeline.producer import Producer from bizon.source.models import SourceIteration, SourceRecord +import threading @pytest.fixture(scope="function") @@ -38,7 +39,8 @@ def test_cursor_recovery_after_iteration(my_producer: Producer, sqlite_db_sessio cursor = my_producer.get_or_create_cursor(job_id=my_job.id, session=sqlite_db_session) assert cursor is not None - my_producer.run(job_id=my_job.id) + stop_event = threading.Event() + my_producer.run(job_id=my_job.id, stop_event=stop_event) # Here we did not run the job, so the cursor should be None cursor_from_db = my_producer.backend.get_last_cursor_by_job_id(job_id=my_job.id, session=sqlite_db_session) diff --git a/tests/sources/kafka/test_config.py b/tests/sources/kafka/test_kafka_config.py similarity index 100% rename from tests/sources/kafka/test_config.py rename to tests/sources/kafka/test_kafka_config.py
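
Usage sketch (not part of the diff): a minimal example of how a caller might consume the RunnerStatus / PipelineReturnStatus surface this patch introduces, using the RunnerFactory entrypoint exercised in the tests above. The config filename is hypothetical, and the RunnerStatus return value assumes the thread runner.

import yaml

from bizon.engine.engine import RunnerFactory
from bizon.engine.pipeline.models import PipelineReturnStatus

# Hypothetical pipeline config file; any dict accepted by RunnerFactory.create_from_config_dict works
with open("pipeline.yml") as f:
    config = yaml.safe_load(f)

runner = RunnerFactory.create_from_config_dict(config=config)
status = runner.run()  # the thread runner now returns a RunnerStatus instead of a bare bool

if not status.is_success:
    # Each side reports its own terminal state via the PipelineReturnStatus enum
    print(f"producer={status.producer.value}, consumer={status.consumer.value}")
    if status.consumer == PipelineReturnStatus.TRANSFORM_ERROR:
        print("A transform raised; the producer was asked to stop by the runner.")
    raise SystemExit(1)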