diff --git a/python/lib/sift_py/ingestion/_service_test.py b/python/lib/sift_py/ingestion/_service_test.py
index 34b110ec..109b7d24 100644
--- a/python/lib/sift_py/ingestion/_service_test.py
+++ b/python/lib/sift_py/ingestion/_service_test.py
@@ -1,9 +1,12 @@
 import random
 from contextlib import contextmanager
 from datetime import datetime, timezone
+from time import sleep
+from typing import Callable, List

 import pytest
 from pytest_mock import MockFixture
+from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest
 from sift.ingestion_configs.v1.ingestion_configs_pb2 import FlowConfig as FlowConfigPb
 from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig as IngestionConfigPb
@@ -184,6 +187,57 @@ def mock_ctx_manager():
         assert len(buffered_ingestion._buffer) == 0
         assert mock_ingest.call_count == 7

+    with mock_ctx_manager():
+        on_error_spy = mocker.stub()
+
+        def on_error(
+            err: BaseException, requests: List[IngestWithConfigDataStreamRequest], _flush: Callable
+        ):
+            on_error_spy()
+            pass
+
+        with pytest.raises(Exception):
+            with ingestion_service.buffered_ingestion(on_error=on_error) as buffered_ingestion:
+                for _ in range(6_600):
+                    buffered_ingestion.ingest_flows(
+                        {
+                            "flow_name": "readings",
+                            "timestamp": datetime.now(timezone.utc),
+                            "channel_values": [double_value(random.random())],
+                        }
+                    )
+                raise
+
+        on_error_spy.assert_called_once()
+        assert len(buffered_ingestion._buffer) == 600
+        assert mock_ingest.call_count == 6
+
+    with mock_ctx_manager():
+        on_error_flush_spy = mocker.stub()
+
+        def on_error(
+            err: BaseException, requests: List[IngestWithConfigDataStreamRequest], _flush: Callable
+        ):
+            on_error_flush_spy()
+            _flush()
+            pass
+
+        with pytest.raises(Exception):
+            with ingestion_service.buffered_ingestion(on_error=on_error) as buffered_ingestion:
+                for _ in range(6_600):
+                    buffered_ingestion.ingest_flows(
+                        {
+                            "flow_name": "readings",
+                            "timestamp": datetime.now(timezone.utc),
+                            "channel_values": [double_value(random.random())],
+                        }
+                    )
+                raise
+
+        on_error_flush_spy.assert_called_once()
+        assert len(buffered_ingestion._buffer) == 0
+        assert mock_ingest.call_count == 7
+

 def test_ingestion_service_modify_existing_channel_configs(mocker: MockFixture):
     """
@@ -353,3 +407,72 @@ def test_ingestion_service_register_new_flow(mocker: MockFixture):
     ingestion_service.create_flow(new_flow_config_name_collision)
     assert ingestion_service.flow_configs_by_name["my_new_flow"] == new_flow_config_name_collision
     assert ingestion_service.flow_configs_by_name["my_new_flow"] != new_flow_config
+
+
+def test_ingestion_service_buffered_ingestion_flush_timeout(mocker: MockFixture):
+    """
+    Test for the timeout-based flush mechanism in buffered ingestion. If the buffer hasn't been
+    flushed after a certain amount of time, then it will be flushed automatically.
+ """ + + mock_ingest = mocker.patch.object(IngestionService, "ingest") + mock_ingest.return_value = None + + readings_flow = FlowConfig( + name="readings", + channels=[ + ChannelConfig( + name="my-channel", + data_type=ChannelDataType.DOUBLE, + ), + ], + ) + + telemetry_config = TelemetryConfig( + asset_name="my-asset", + ingestion_client_key="ingestion-client-key", + flows=[readings_flow], + ) + + mock_ingestion_config = IngestionConfigPb( + ingestion_config_id="ingestion-config-id", + asset_id="asset-id", + client_key="client-key", + ) + + mock_get_ingestion_config_by_client_key = mocker.patch( + _mock_path(get_ingestion_config_by_client_key) + ) + mock_get_ingestion_config_by_client_key.return_value = mock_ingestion_config + + mock_get_ingestion_config_flows = mocker.patch(_mock_path(get_ingestion_config_flows)) + mock_get_ingestion_config_flows.return_value = [readings_flow.as_pb(FlowConfigPb)] + + ingestion_service = IngestionService(MockChannel(), telemetry_config) + + @contextmanager + def mock_ctx_manager(): + yield + mock_ingest.reset_mock() + + with mock_ctx_manager(): + with ingestion_service.buffered_ingestion(flush_interval_sec=2) as buffered_ingestion: + assert buffered_ingestion._buffer_size == 1_000 + + for _ in range(1_500): + buffered_ingestion.try_ingest_flows( + { + "flow_name": "readings", + "timestamp": datetime.now(timezone.utc), + "channel_values": [ + {"channel_name": "my-channel", "value": double_value(random.random())} + ], + } + ) + assert mock_ingest.call_count == 1 + assert len(buffered_ingestion._buffer) == 500 + + # This will cause the flush timer to flush based on provided interval + sleep(5) + assert mock_ingest.call_count == 2 + assert len(buffered_ingestion._buffer) == 0 diff --git a/python/lib/sift_py/ingestion/buffer.py b/python/lib/sift_py/ingestion/buffer.py index b2d5a4d4..193d5aec 100644 --- a/python/lib/sift_py/ingestion/buffer.py +++ b/python/lib/sift_py/ingestion/buffer.py @@ -1,8 +1,10 @@ +import threading +from contextlib import contextmanager from types import TracebackType -from typing import Generic, List, Optional, Type, TypeVar +from typing import Callable, Generic, List, Optional, Type, TypeVar from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest -from typing_extensions import Self +from typing_extensions import Self, TypeAlias from sift_py.ingestion._internal.ingest import _IngestionServiceImpl from sift_py.ingestion.flow import Flow, FlowOrderedChannelValues @@ -11,16 +13,46 @@ T = TypeVar("T", bound=_IngestionServiceImpl) +FlushCallback: TypeAlias = Callable[[], None] +OnErrorCallback: TypeAlias = Callable[ + [BaseException, List[IngestWithConfigDataStreamRequest], FlushCallback], None +] + class BufferedIngestionService(Generic[T]): + """ + See `sift_py.ingestion.service.IngestionService.buffered_ingestion` + for more information and how to leverage buffered ingestion. 
+ """ + _buffer: List[IngestWithConfigDataStreamRequest] _buffer_size: int _ingestion_service: T + _flush_interval_sec: Optional[float] + _flush_timer: Optional[threading.Timer] + _lock: Optional[threading.Lock] + _on_error: Optional[OnErrorCallback] - def __init__(self, ingestion_service: T, buffer_size: Optional[int]): + def __init__( + self, + ingestion_service: T, + buffer_size: Optional[int], + flush_interval_sec: Optional[float], + on_error: Optional[OnErrorCallback], + ): self._buffer = [] self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE self._ingestion_service = ingestion_service + self._on_error = on_error + self._flush_timer = None + + if flush_interval_sec: + self._flush_interval_sec = flush_interval_sec + self._lock = threading.Lock() + self._start_flush_timer() + else: + self._flush_interval_sec = None + self._lock = None def __enter__(self) -> Self: return self @@ -31,10 +63,17 @@ def __exit__( exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> bool: - self.flush() + self._cancel_flush_timer() if exc_val is not None: + if self._on_error is not None: + self._on_error(exc_val, self._buffer, self.flush) + else: + self.flush() + raise exc_val + else: + self.flush() return True @@ -44,72 +83,107 @@ def ingest_flows(self, *flows: FlowOrderedChannelValues): See `sift_py.ingestion.service.IngestionService.create_ingestion_request` for more information. """ - lhs_cursor = 0 - rhs_cursor = min( - self._buffer_size - len(self._buffer), - len(flows), - ) - - while lhs_cursor < len(flows): - for flow in flows[lhs_cursor:rhs_cursor]: - flow_name = flow["flow_name"] - timestamp = flow["timestamp"] - channel_values = flow["channel_values"] - - req = self._ingestion_service.create_ingestion_request( - flow_name=flow_name, - timestamp=timestamp, - channel_values=channel_values, - ) - self._buffer.append(req) - - if len(self._buffer) >= self._buffer_size: - self.flush() - - lhs_cursor = rhs_cursor + with self._use_lock(): + lhs_cursor = 0 rhs_cursor = min( - rhs_cursor + (self._buffer_size - len(self._buffer)), + self._buffer_size - len(self._buffer), len(flows), ) + while lhs_cursor < len(flows): + for flow in flows[lhs_cursor:rhs_cursor]: + flow_name = flow["flow_name"] + timestamp = flow["timestamp"] + channel_values = flow["channel_values"] + + req = self._ingestion_service.create_ingestion_request( + flow_name=flow_name, + timestamp=timestamp, + channel_values=channel_values, + ) + self._buffer.append(req) + + if len(self._buffer) >= self._buffer_size: + self._flush() + + lhs_cursor = rhs_cursor + rhs_cursor = min( + rhs_cursor + (self._buffer_size - len(self._buffer)), + len(flows), + ) + def try_ingest_flows(self, *flows: Flow): """ Ingests flows in batches and performs client-side validations for each request generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request` for more information. 
""" - lhs_cursor = 0 - rhs_cursor = min( - self._buffer_size - len(self._buffer), - len(flows), - ) - - while lhs_cursor < len(flows): - for flow in flows[lhs_cursor:rhs_cursor]: - flow_name = flow["flow_name"] - timestamp = flow["timestamp"] - channel_values = flow["channel_values"] - - req = self._ingestion_service.try_create_ingestion_request( - flow_name=flow_name, - timestamp=timestamp, - channel_values=channel_values, - ) - self._buffer.append(req) - - if len(self._buffer) >= self._buffer_size: - self.flush() - - lhs_cursor = rhs_cursor + with self._use_lock(): + lhs_cursor = 0 rhs_cursor = min( - rhs_cursor + (self._buffer_size - len(self._buffer)), + self._buffer_size - len(self._buffer), len(flows), ) + while lhs_cursor < len(flows): + for flow in flows[lhs_cursor:rhs_cursor]: + flow_name = flow["flow_name"] + timestamp = flow["timestamp"] + channel_values = flow["channel_values"] + + req = self._ingestion_service.try_create_ingestion_request( + flow_name=flow_name, + timestamp=timestamp, + channel_values=channel_values, + ) + self._buffer.append(req) + + if len(self._buffer) >= self._buffer_size: + self._flush() + + lhs_cursor = rhs_cursor + rhs_cursor = min( + rhs_cursor + (self._buffer_size - len(self._buffer)), + len(flows), + ) + def flush(self): """ Flush and ingest all requests in buffer. """ + + if self._flush_timer and self._lock: + with self._lock: + self._flush() + self._restart_flush_timer() + else: + self._flush() + + def _flush(self): if len(self._buffer) > 0: self._ingestion_service.ingest(*self._buffer) self._buffer.clear() + + def _start_flush_timer(self): + if self._flush_interval_sec: + self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush) + self._flush_timer.start() + + def _cancel_flush_timer(self): + if self._flush_timer: + self._flush_timer.cancel() + self._flush_timer = None + + def _restart_flush_timer(self): + self._cancel_flush_timer() + self._start_flush_timer() + + @contextmanager + def _use_lock(self): + try: + if self._lock: + self._lock.acquire() + yield + finally: + if self._lock: + self._lock.release() diff --git a/python/lib/sift_py/ingestion/service.py b/python/lib/sift_py/ingestion/service.py index 3f725ea1..0a1f37f7 100644 --- a/python/lib/sift_py/ingestion/service.py +++ b/python/lib/sift_py/ingestion/service.py @@ -11,7 +11,7 @@ from sift_py.grpc.transport import SiftChannel from sift_py.ingestion._internal.ingest import _IngestionServiceImpl -from sift_py.ingestion.buffer import BufferedIngestionService +from sift_py.ingestion.buffer import BufferedIngestionService, OnErrorCallback from sift_py.ingestion.channel import ChannelValue from sift_py.ingestion.config.telemetry import TelemetryConfig from sift_py.ingestion.flow import Flow, FlowConfig, FlowOrderedChannelValues @@ -149,15 +149,32 @@ def try_ingest_flows(self, *flows: Flow): """ return super().try_ingest_flows(*flows) - def buffered_ingestion(self, buffer_size: Optional[int] = None) -> BufferedIngestionService: + def buffered_ingestion( + self, + buffer_size: Optional[int] = None, + flush_interval_sec: Optional[float] = None, + on_error: Optional[OnErrorCallback] = None, + ) -> BufferedIngestionService: """ - This method automates buffering requests and streams them in batches and is meant to be used + This method automates buffering requests and streams them in batches. It is recommended to be used in a with-block. 

         Failure to put this in a with-block may result in some data not being ingested unless
-        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush`.
-        Once the with-block is exited then a final call to the aforementioned `flush` method will be made
-        to ingest the remaining data. If a `buffer_size` is not provided then it will default to
-        `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
+        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
+        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
+        is exited, a final call to the aforementioned `flush` method will be made to ingest the remaining data.
+
+        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
+        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
+
+        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
+        is filled. The interval between flushes is set via the `flush_interval_sec` argument, which is the number of seconds between each flush.
+        If a flush occurs because the buffer is filled, the timer restarts. If `flush_interval_sec` is `None`, then flushes will only
+        occur once the buffer is filled and at the end of the with-block's scope.
+
+        If an error occurs that causes the context manager's `__exit__` method to be called, one last attempt to flush the buffer will be made
+        before the error is re-raised for the caller to handle. If the caller would instead like to customize the `__exit__` behavior in the case
+        of an error, they can use the `on_error` argument, which accepts a function whose first argument is the error,
+        whose second argument is the buffer containing the uningested requests, and whose third argument is a function that, when called,
+        attempts to flush the buffer.

         Example usage:

@@ -171,8 +188,6 @@ def buffered_ingestion(self, buffer_size: Optional[int] = None) -> BufferedInges
                     "channel_values": [
                         {
                             "channel_name": "my-channel",
-                            "value": double_value(3)
-                        }
                     ],
                 })
@@ -184,9 +199,28 @@ def buffered_ingestion(self, buffer_size: Optional[int] = None) -> BufferedInges
                     "timestamp": datetime.now(timezone.utc),
                     "channel_values": [double_value(3)]
                 })
+
+        # With default buffer size and periodic flushes of 3.2 seconds
+        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
+            for _ in range(6_000):
+                buffered_ingestion.ingest_flows({
+                    "flow_name": "readings",
+                    "timestamp": datetime.now(timezone.utc),
+                    "channel_values": [double_value(3)]
+                })
+
+        # Custom code to run when an error occurs
+        def on_error_callback(err, buffer, flush):
+            # Save contents of buffer to disk
+            ...
+            # Try once more to flush the buffer
+            flush()
+
+        with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
+            ...
         ```
         """
-        return BufferedIngestionService(self, buffer_size)
+        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)

     def create_flow(self, flow_config: FlowConfig):
         """
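
The two new keyword arguments compose naturally. The sketch below is illustrative only and not part of this change: it assumes an existing `ingestion_service` and the `double_value` helper shown in the docstring examples above, and the spill-file path and callback name are hypothetical. The callback retries the flush once and, if that also fails, persists the serialized protobuf requests so they can be re-ingested later.

```python
import random
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable, List

from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest

# Hypothetical location for requests that could not be ingested.
SPILL_FILE = Path("unsent_requests.bin")


def on_error(
    err: BaseException,
    requests: List[IngestWithConfigDataStreamRequest],
    flush: Callable[[], None],
):
    try:
        # Retry the flush once before giving up.
        flush()
    except Exception:
        # Persist the serialized requests for later re-ingestion. A real
        # implementation would likely length-prefix each message or use one
        # file per request so the stream can be split apart again.
        with SPILL_FILE.open("ab") as spill:
            for request in requests:
                spill.write(request.SerializeToString())


# `ingestion_service` is an existing `IngestionService`, as in the docstring examples.
with ingestion_service.buffered_ingestion(
    flush_interval_sec=3.2, on_error=on_error
) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {"channel_name": "my-channel", "value": double_value(random.random())}
            ],
        }
    )
```

Because `__exit__` passes the live buffer and `IngestionService.flush` to the callback, a successful retry clears the buffer before the original error is re-raised; a failed retry leaves the requests intact for the spill path above.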