Skip to content

Commit

Permalink
feat: pipeline error handling and CLI return status (#4)
Browse files Browse the repository at this point in the history
* feat: implement failure handling for pipeline (#1)

* wip

* wip

* Update gorgias.yml

* Update gorgias.yml

* wip

* feat: add destination unnesting for file

* feat: assert schema is provided if unnest set to True

* add test for config parsing unnest

* Update process.py

* chore: test shutdown wait true pending

* chore(fix): cast bizon date columns as string in bq_streaming

* feat: implement stop events, runner status, refacto time partitioning

* fix test

* delete gha

---------

Co-authored-by: Anas El Mhamdi <[email protected]>

* feat(cli): raise python error if pipeline error (#3)

* fix: multiprocessing event type

* feat: cli return exist code 1 if pipeline error

* cli raises clickException

* always run pytest on PR

---------

Co-authored-by: Anas El Mhamdi <[email protected]>
  • Loading branch information
aballiet and anaselmhamdi authored Dec 20, 2024
1 parent 19127b5 commit 034925b
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 11 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: Pytest bizon

on:
pull_request:
branches: [ "main" ]

permissions:
contents: read
Expand Down
8 changes: 6 additions & 2 deletions bizon/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,13 @@ def run(
set_runner_in_config(config=config, runner=runner)

runner = RunnerFactory.create_from_config_dict(config=config)
runner.run()
result = runner.run()

click.echo("Pipeline finished.")
if result.is_success:
click.secho("Pipeline finished successfully.", fg="green")

else:
raise click.exceptions.ClickException(result.to_string())


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing
import multiprocessing.synchronize
import threading
from abc import ABC, abstractmethod
from typing import Union
Expand All @@ -16,5 +17,5 @@ def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination
self.transform = transform

@abstractmethod
def run(self, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus:
def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus:
pass
5 changes: 4 additions & 1 deletion bizon/engine/pipeline/producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ast
import multiprocessing
import multiprocessing.synchronize
import threading
import traceback
from datetime import datetime
Expand Down Expand Up @@ -101,7 +102,9 @@ def is_queue_full(self, cursor: Cursor) -> Tuple[bool, int, int]:

return False, queue_size, approximate_nb_records_in_queue

def run(self, job_id: int, stop_event: Union[multiprocessing.Event, threading.Event]) -> PipelineReturnStatus:
def run(
self, job_id: int, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]
) -> PipelineReturnStatus:

return_value: PipelineReturnStatus = PipelineReturnStatus.SUCCESS

Expand Down
3 changes: 2 additions & 1 deletion bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing
import multiprocessing.synchronize
import threading
import traceback
from typing import Union
Expand All @@ -21,7 +22,7 @@ def __init__(
super().__init__(config, destination=destination, transform=transform)
self.queue = queue

def run(self, stop_event: Union[threading.Event, multiprocessing.Event]) -> PipelineReturnStatus:
def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus:
while True:
if stop_event.is_set():
logger.info("Stop event is set, closing consumer ...")
Expand Down
7 changes: 6 additions & 1 deletion bizon/engine/runner/adapters/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ def run(self) -> RunnerStatus:
logger.error("Consumer thread failed, stopping producer ...")
producer_stop_event.set()

return RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result())
runner_status = RunnerStatus(producer=future_producer.result(), consumer=future_consumer.result())

if not runner_status.is_success:
logger.error(runner_status.to_string())

return runner_status
6 changes: 6 additions & 0 deletions bizon/engine/runner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ def is_success(self):
return True
else:
return False

def to_string(self):
return (
f"Pipeline finished with status {'Success' if self.is_success else 'Failure'} "
f"(Producer: {self.producer.value}, Consumer: {self.consumer.value})"
)
8 changes: 6 additions & 2 deletions bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import multiprocessing
import multiprocessing.synchronize
import sys
import threading
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -183,7 +184,7 @@ def instanciate_and_run_producer(
bizon_config: BizonConfig,
config: dict,
job_id: str,
stop_event: Union[multiprocessing.Event, threading.Event],
stop_event: Union[multiprocessing.synchronize.Event, threading.Event],
**kwargs,
):

Expand All @@ -203,7 +204,10 @@ def instanciate_and_run_producer(

@staticmethod
def instanciate_and_run_consumer(
bizon_config: BizonConfig, job_id: str, stop_event: Union[multiprocessing.Event, threading.Event], **kwargs
bizon_config: BizonConfig,
job_id: str,
stop_event: Union[multiprocessing.synchronize.Event, threading.Event],
**kwargs,
):

queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_e2e_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ def test_e2e_run_command_dummy_to_file():

result = runner.invoke(cli, ["run", "config.yml"], catch_exceptions=True)
assert result.exit_code == 0
assert result.output == "Pipeline finished.\n"
assert result.output == "Pipeline finished successfully.\n"
49 changes: 49 additions & 0 deletions tests/cli/test_e2e_cli_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import tempfile

from click.testing import CliRunner

from bizon.cli.main import cli

BIZON_CONFIG_DUMMY_TO_FILE = f"""
name: test_job_3
source:
source_name: dummy
stream_name: creatures
authentication:
type: api_key
params:
token: dummy_key
sleep: 2
destination:
name: file
config:
filepath: test.jsonl
transforms:
- label: transform_data
python: |
if 'name' in data:
data['name'] = data['this_key_doesnt_exist'].upper()
"""


def test_e2e_run_command_dummy_to_file():

runner = CliRunner(mix_stderr=False)

with tempfile.NamedTemporaryFile(delete=False) as temp:
# Write config in temp file
with open(temp.name, "w") as f:
f.write(BIZON_CONFIG_DUMMY_TO_FILE)

runner = CliRunner(mix_stderr=False)
result = runner.invoke(cli, ["run", temp.name])

assert result.exit_code == 1
assert (
"Pipeline finished with status Failure (Producer: killed_by_runner, Consumer: transform_error)"
in result.stderr
)
2 changes: 1 addition & 1 deletion tests/engine/test_producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import threading
from datetime import datetime
from queue import Queue

Expand All @@ -10,7 +11,6 @@
from bizon.engine.engine import RunnerFactory
from bizon.engine.pipeline.producer import Producer
from bizon.source.models import SourceIteration, SourceRecord
import threading


@pytest.fixture(scope="function")
Expand Down

0 comments on commit 034925b

Please sign in to comment.