From 034925b3284934e187f5954cfd4fabfde59f9a86 Mon Sep 17 00:00:00 2001 From: Antoine Balliet Date: Fri, 20 Dec 2024 16:03:31 +0100 Subject: [PATCH] feat: pipeline error handling and CLI return status (#4) * feat: implement failure handling for pipeline (#1) * wip * wip * Update gorgias.yml * Update gorgias.yml * wip * feat: add destination unnesting for file * feat: assert schema is provided if unnest set to True * add test for config parsing unnest * Update process.py * chore: test shutdown wait true pending * chore(fix): cast bizon date columns as string in bq_streaming * feat: implement stop events, runner status, refactor time partitioning * fix test * delete gha --------- Co-authored-by: Anas El Mhamdi * feat(cli): raise python error if pipeline error (#3) * fix: multiprocessing event type * feat: cli return exit code 1 if pipeline error * cli raises clickException * always run pytest on PR --------- Co-authored-by: Anas El Mhamdi --- .github/workflows/pytest.yml | 1 - bizon/cli/main.py | 8 ++- bizon/engine/pipeline/consumer.py | 3 +- bizon/engine/pipeline/producer.py | 5 +- .../queue/adapters/python_queue/consumer.py | 3 +- bizon/engine/runner/adapters/thread.py | 7 ++- bizon/engine/runner/config.py | 6 +++ bizon/engine/runner/runner.py | 8 ++- tests/cli/test_e2e_cli.py | 2 +- tests/cli/test_e2e_cli_error.py | 49 +++++++++++++++++++ tests/engine/test_producer.py | 2 +- 11 files changed, 83 insertions(+), 11 deletions(-) create mode 100644 tests/cli/test_e2e_cli_error.py diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index c8d6962..9c57170 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -2,7 +2,6 @@ name: Pytest bizon on: pull_request: - branches: [ "main" ] permissions: contents: read diff --git a/bizon/cli/main.py b/bizon/cli/main.py index 7f2bac1..454372d 100644 --- a/bizon/cli/main.py +++ b/bizon/cli/main.py @@ -117,9 +117,13 @@ def run( set_runner_in_config(config=config, runner=runner) runner = 
RunnerFactory.create_from_config_dict(config=config) - runner.run() + result = runner.run() - click.echo("Pipeline finished.") + if result.is_success: + click.secho("Pipeline finished successfully.", fg="green") + + else: + raise click.exceptions.ClickException(result.to_string()) if __name__ == "__main__": diff --git a/bizon/engine/pipeline/consumer.py b/bizon/engine/pipeline/consumer.py index fb889a9..e7e6e58 100644 --- a/bizon/engine/pipeline/consumer.py +++ b/bizon/engine/pipeline/consumer.py @@ -1,4 +1,5 @@ import multiprocessing +import multiprocessing.synchronize import threading from abc import ABC, abstractmethod from typing import Union @@ -16,5 +17,5 @@ def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination self.transform = transform @abstractmethod - def run(self, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus: + def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus: pass diff --git a/bizon/engine/pipeline/producer.py b/bizon/engine/pipeline/producer.py index bf1c200..aa373cb 100644 --- a/bizon/engine/pipeline/producer.py +++ b/bizon/engine/pipeline/producer.py @@ -1,5 +1,6 @@ import ast import multiprocessing +import multiprocessing.synchronize import threading import traceback from datetime import datetime @@ -101,7 +102,9 @@ def is_queue_full(self, cursor: Cursor) -> Tuple[bool, int, int]: return False, queue_size, approximate_nb_records_in_queue - def run(self, job_id: int, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus: + def run( + self, job_id: int, stop_event: Union[multiprocessing.synchronize.Event, threading.Event] + ) -> PipelineReturnStatus: return_value: PipelineReturnStatus = PipelineReturnStatus.SUCCESS diff --git a/bizon/engine/queue/adapters/python_queue/consumer.py b/bizon/engine/queue/adapters/python_queue/consumer.py index eeb0761..66eb3f2 100644 --- 
a/bizon/engine/queue/adapters/python_queue/consumer.py +++ b/bizon/engine/queue/adapters/python_queue/consumer.py @@ -1,4 +1,5 @@ import multiprocessing +import multiprocessing.synchronize import threading import traceback from typing import Union @@ -21,7 +22,7 @@ def __init__( super().__init__(config, destination=destination, transform=transform) self.queue = queue - def run(self, stop_event: Union[threading.Event, multiprocessing.Event]) -> PipelineReturnStatus: + def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus: while True: if stop_event.is_set(): logger.info("Stop event is set, closing consumer ...") diff --git a/bizon/engine/runner/adapters/thread.py b/bizon/engine/runner/adapters/thread.py index 5e4ec3c..a7330c8 100644 --- a/bizon/engine/runner/adapters/thread.py +++ b/bizon/engine/runner/adapters/thread.py @@ -95,4 +95,9 @@ def run(self) -> RunnerStatus: logger.error("Consumer thread failed, stopping producer ...") producer_stop_event.set() - return RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result()) + runner_status = RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result()) + + if not runner_status.is_success: + logger.error(runner_status.to_string()) + + return runner_status diff --git a/bizon/engine/runner/config.py b/bizon/engine/runner/config.py index 8569f19..487e52f 100644 --- a/bizon/engine/runner/config.py +++ b/bizon/engine/runner/config.py @@ -63,3 +63,9 @@ def is_success(self): return True else: return False + + def to_string(self): + return ( + f"Pipeline finished with status {'Success' if self.is_success else 'Failure'} " + f"(Producer: {self.producer.value}, Consumer: {self.consumer.value})" + ) diff --git a/bizon/engine/runner/runner.py b/bizon/engine/runner/runner.py index 0a0cb49..1a2b571 100644 --- a/bizon/engine/runner/runner.py +++ b/bizon/engine/runner/runner.py @@ -1,4 +1,5 @@ import multiprocessing +import 
multiprocessing.synchronize import sys import threading from abc import ABC, abstractmethod @@ -183,7 +184,7 @@ def instanciate_and_run_producer( bizon_config: BizonConfig, config: dict, job_id: str, - stop_event: Union[multiprocessing.Event, threading.Event], + stop_event: Union[multiprocessing.synchronize.Event, threading.Event], **kwargs, ): @@ -203,7 +204,10 @@ def instanciate_and_run_producer( @staticmethod def instanciate_and_run_consumer( - bizon_config: BizonConfig, job_id: str, stop_event: Union[multiprocessing.Event, threading.Event], **kwargs + bizon_config: BizonConfig, + job_id: str, + stop_event: Union[multiprocessing.synchronize.Event, threading.Event], + **kwargs, ): queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs) diff --git a/tests/cli/test_e2e_cli.py b/tests/cli/test_e2e_cli.py index 43daca7..b2201a2 100644 --- a/tests/cli/test_e2e_cli.py +++ b/tests/cli/test_e2e_cli.py @@ -48,4 +48,4 @@ def test_e2e_run_command_dummy_to_file(): result = runner.invoke(cli, ["run", "config.yml"], catch_exceptions=True) assert result.exit_code == 0 - assert result.output == "Pipeline finished.\n" + assert result.output == "Pipeline finished successfully.\n" diff --git a/tests/cli/test_e2e_cli_error.py b/tests/cli/test_e2e_cli_error.py new file mode 100644 index 0000000..eacd29e --- /dev/null +++ b/tests/cli/test_e2e_cli_error.py @@ -0,0 +1,49 @@ +import tempfile + +from click.testing import CliRunner + +from bizon.cli.main import cli + +BIZON_CONFIG_DUMMY_TO_FILE = f""" +name: test_job_3 + +source: + source_name: dummy + stream_name: creatures + + authentication: + type: api_key + params: + token: dummy_key + sleep: 2 + +destination: + name: file + config: + filepath: test.jsonl + +transforms: +- label: transform_data + python: | + if 'name' in data: + data['name'] = data['this_key_doesnt_exist'].upper() +""" + + +def test_e2e_run_command_dummy_to_file(): + + runner = CliRunner(mix_stderr=False) + + with 
tempfile.NamedTemporaryFile(delete=False) as temp: + # Write config in temp file + with open(temp.name, "w") as f: + f.write(BIZON_CONFIG_DUMMY_TO_FILE) + + runner = CliRunner(mix_stderr=False) + result = runner.invoke(cli, ["run", temp.name]) + + assert result.exit_code == 1 + assert ( + "Pipeline finished with status Failure (Producer: killed_by_runner, Consumer: transform_error)" + in result.stderr + ) diff --git a/tests/engine/test_producer.py b/tests/engine/test_producer.py index d2dfd31..b07f284 100644 --- a/tests/engine/test_producer.py +++ b/tests/engine/test_producer.py @@ -1,4 +1,5 @@ import os +import threading from datetime import datetime from queue import Queue @@ -10,7 +11,6 @@ from bizon.engine.engine import RunnerFactory from bizon.engine.pipeline.producer import Producer from bizon.source.models import SourceIteration, SourceRecord -import threading @pytest.fixture(scope="function")