Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pipeline error handling and CLI return status #4

Merged
3 commits merged on Dec 20, 2024
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Pytest bizon

on:
pull_request:
branches: [ "main" ]

permissions:
contents: read
Expand Down
8 changes: 6 additions & 2 deletions bizon/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,13 @@ def run(
set_runner_in_config(config=config, runner=runner)

runner = RunnerFactory.create_from_config_dict(config=config)
runner.run()
result = runner.run()

click.echo("Pipeline finished.")
if result.is_success:
click.secho("Pipeline finished successfully.", fg="green")

else:
raise click.exceptions.ClickException(result.to_string())


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing
import multiprocessing.synchronize
import threading
from abc import ABC, abstractmethod
from typing import Union
Expand All @@ -16,5 +17,5 @@ def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination
self.transform = transform

@abstractmethod
def run(self, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus:
def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus:
pass
5 changes: 4 additions & 1 deletion bizon/engine/pipeline/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ast
import multiprocessing
import multiprocessing.synchronize
import threading
import traceback
from datetime import datetime
Expand Down Expand Up @@ -101,7 +102,9 @@ def is_queue_full(self, cursor: Cursor) -> Tuple[bool, int, int]:

return False, queue_size, approximate_nb_records_in_queue

def run(self, job_id: int, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus:
def run(
self, job_id: int, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]
) -> PipelineReturnStatus:

return_value: PipelineReturnStatus = PipelineReturnStatus.SUCCESS

Expand Down
3 changes: 2 additions & 1 deletion bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing
import multiprocessing.synchronize
import threading
import traceback
from typing import Union
Expand All @@ -21,7 +22,7 @@ def __init__(
super().__init__(config, destination=destination, transform=transform)
self.queue = queue

def run(self, stop_event: Union[threading.Event, multiprocessing.Event]) -> PipelineReturnStatus:
def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus:
while True:
if stop_event.is_set():
logger.info("Stop event is set, closing consumer ...")
Expand Down
7 changes: 6 additions & 1 deletion bizon/engine/runner/adapters/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ def run(self) -> RunnerStatus:
logger.error("Consumer thread failed, stopping producer ...")
producer_stop_event.set()

return RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result())
runner_status = RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result())

if not runner_status.is_success:
logger.error(runner_status.to_string())

return runner_status
6 changes: 6 additions & 0 deletions bizon/engine/runner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ def is_success(self):
return True
else:
return False

def to_string(self) -> str:
    """Return a one-line, human-readable summary of the run outcome.

    The summary states the overall status ("Success" or "Failure", taken
    from ``self.is_success``) followed by the ``.value`` of the producer
    and consumer statuses.
    """
    # Resolve the label first so the f-string below stays readable.
    outcome = "Success" if self.is_success else "Failure"
    return (
        f"Pipeline finished with status {outcome} "
        f"(Producer: {self.producer.value}, Consumer: {self.consumer.value})"
    )
8 changes: 6 additions & 2 deletions bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing
import multiprocessing.synchronize
import sys
import threading
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -183,7 +184,7 @@ def instanciate_and_run_producer(
bizon_config: BizonConfig,
config: dict,
job_id: str,
stop_event: Union[multiprocessing.Event, threading.Event],
stop_event: Union[multiprocessing.synchronize.Event, threading.Event],
**kwargs,
):

Expand All @@ -203,7 +204,10 @@ def instanciate_and_run_producer(

@staticmethod
def instanciate_and_run_consumer(
bizon_config: BizonConfig, job_id: str, stop_event: Union[multiprocessing.Event, threading.Event], **kwargs
bizon_config: BizonConfig,
job_id: str,
stop_event: Union[multiprocessing.synchronize.Event, threading.Event],
**kwargs,
):

queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_e2e_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ def test_e2e_run_command_dummy_to_file():

result = runner.invoke(cli, ["run", "config.yml"], catch_exceptions=True)
assert result.exit_code == 0
assert result.output == "Pipeline finished.\n"
assert result.output == "Pipeline finished successfully.\n"
49 changes: 49 additions & 0 deletions tests/cli/test_e2e_cli_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import tempfile

from click.testing import CliRunner

from bizon.cli.main import cli

# Pipeline config used by the failing end-to-end CLI test in this file.
# The transform step reads data['this_key_doesnt_exist']; presumably that key
# is absent from the dummy source records, so the transform is expected to
# fail at runtime (the test asserts a "transform_error" consumer status).
BIZON_CONFIG_DUMMY_TO_FILE = f"""
name: test_job_3

source:
source_name: dummy
stream_name: creatures

authentication:
type: api_key
params:
token: dummy_key
sleep: 2

destination:
name: file
config:
filepath: test.jsonl

transforms:
- label: transform_data
python: |
if 'name' in data:
data['name'] = data['this_key_doesnt_exist'].upper()
"""


def test_e2e_run_command_dummy_to_file():
    """Check that a failing pipeline makes the ``run`` command exit with code 1.

    The config is written to a temporary file and passed to ``bizon run``.
    The test asserts a non-zero exit code and that the failure summary
    (producer and consumer statuses) is reported on stderr.
    """
    import os

    # delete=False lets the CLI reopen the file by name after this handle is
    # closed; the file is removed explicitly in the ``finally`` below.
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
        temp.write(BIZON_CONFIG_DUMMY_TO_FILE)

    try:
        # stderr is captured separately so the error message can be asserted on it.
        runner = CliRunner(mix_stderr=False)
        result = runner.invoke(cli, ["run", temp.name])
    finally:
        os.remove(temp.name)

    assert result.exit_code == 1
    assert (
        "Pipeline finished with status Failure (Producer: killed_by_runner, Consumer: transform_error)"
        in result.stderr
    )
2 changes: 1 addition & 1 deletion tests/engine/test_producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import threading
from datetime import datetime
from queue import Queue

Expand All @@ -10,7 +11,6 @@
from bizon.engine.engine import RunnerFactory
from bizon.engine.pipeline.producer import Producer
from bizon.source.models import SourceIteration, SourceRecord
import threading


@pytest.fixture(scope="function")
Expand Down
Loading