Skip to content

Commit

Permalink
python(feature): timeout-based flushing for buffered ingestion (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis authored Sep 4, 2024
1 parent aa8e262 commit 6f934ec
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 62 deletions.
123 changes: 123 additions & 0 deletions python/lib/sift_py/ingestion/_service_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import random
from contextlib import contextmanager
from datetime import datetime, timezone
from time import sleep
from typing import Callable, List

import pytest
from pytest_mock import MockFixture
from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest
from sift.ingestion_configs.v1.ingestion_configs_pb2 import FlowConfig as FlowConfigPb
from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig as IngestionConfigPb

Expand Down Expand Up @@ -184,6 +187,57 @@ def mock_ctx_manager():
assert len(buffered_ingestion._buffer) == 0
assert mock_ingest.call_count == 7

with mock_ctx_manager():
on_error_spy = mocker.stub()

def on_error(
err: BaseException, requests: List[IngestWithConfigDataStreamRequest], _flush: Callable
):
on_error_spy()
pass

with pytest.raises(Exception):
with ingestion_service.buffered_ingestion(on_error=on_error) as buffered_ingestion:
for _ in range(6_600):
buffered_ingestion.ingest_flows(
{
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [double_value(random.random())],
}
)
raise

on_error_spy.assert_called_once()
assert len(buffered_ingestion._buffer) == 600
assert mock_ingest.call_count == 6

with mock_ctx_manager():
on_error_flush_spy = mocker.stub()

def on_error(
err: BaseException, requests: List[IngestWithConfigDataStreamRequest], _flush: Callable
):
on_error_flush_spy()
_flush()
pass

with pytest.raises(Exception):
with ingestion_service.buffered_ingestion(on_error=on_error) as buffered_ingestion:
for _ in range(6_600):
buffered_ingestion.ingest_flows(
{
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [double_value(random.random())],
}
)
raise

on_error_spy.assert_called_once()
assert len(buffered_ingestion._buffer) == 0
assert mock_ingest.call_count == 7


def test_ingestion_service_modify_existing_channel_configs(mocker: MockFixture):
"""
Expand Down Expand Up @@ -353,3 +407,72 @@ def test_ingestion_service_register_new_flow(mocker: MockFixture):
ingestion_service.create_flow(new_flow_config_name_collision)
assert ingestion_service.flow_configs_by_name["my_new_flow"] == new_flow_config_name_collision
assert ingestion_service.flow_configs_by_name["my_new_flow"] != new_flow_config


def test_ingestion_service_buffered_ingestion_flush_timeout(mocker: MockFixture):
"""
Test for timeout based flush mechanism in buffered ingestion. If buffer hasn't been flushed
after a certain time then the buffer will be automatically flushed.
"""

mock_ingest = mocker.patch.object(IngestionService, "ingest")
mock_ingest.return_value = None

readings_flow = FlowConfig(
name="readings",
channels=[
ChannelConfig(
name="my-channel",
data_type=ChannelDataType.DOUBLE,
),
],
)

telemetry_config = TelemetryConfig(
asset_name="my-asset",
ingestion_client_key="ingestion-client-key",
flows=[readings_flow],
)

mock_ingestion_config = IngestionConfigPb(
ingestion_config_id="ingestion-config-id",
asset_id="asset-id",
client_key="client-key",
)

mock_get_ingestion_config_by_client_key = mocker.patch(
_mock_path(get_ingestion_config_by_client_key)
)
mock_get_ingestion_config_by_client_key.return_value = mock_ingestion_config

mock_get_ingestion_config_flows = mocker.patch(_mock_path(get_ingestion_config_flows))
mock_get_ingestion_config_flows.return_value = [readings_flow.as_pb(FlowConfigPb)]

ingestion_service = IngestionService(MockChannel(), telemetry_config)

@contextmanager
def mock_ctx_manager():
yield
mock_ingest.reset_mock()

with mock_ctx_manager():
with ingestion_service.buffered_ingestion(flush_interval_sec=2) as buffered_ingestion:
assert buffered_ingestion._buffer_size == 1_000

for _ in range(1_500):
buffered_ingestion.try_ingest_flows(
{
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [
{"channel_name": "my-channel", "value": double_value(random.random())}
],
}
)
assert mock_ingest.call_count == 1
assert len(buffered_ingestion._buffer) == 500

# This will cause the flush timer to flush based on provided interval
sleep(5)
assert mock_ingest.call_count == 2
assert len(buffered_ingestion._buffer) == 0
178 changes: 126 additions & 52 deletions python/lib/sift_py/ingestion/buffer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import threading
from contextlib import contextmanager
from types import TracebackType
from typing import Generic, List, Optional, Type, TypeVar
from typing import Callable, Generic, List, Optional, Type, TypeVar

from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest
from typing_extensions import Self
from typing_extensions import Self, TypeAlias

from sift_py.ingestion._internal.ingest import _IngestionServiceImpl
from sift_py.ingestion.flow import Flow, FlowOrderedChannelValues
Expand All @@ -11,16 +13,46 @@

T = TypeVar("T", bound=_IngestionServiceImpl)

FlushCallback: TypeAlias = Callable[[], None]
OnErrorCallback: TypeAlias = Callable[
[BaseException, List[IngestWithConfigDataStreamRequest], FlushCallback], None
]


class BufferedIngestionService(Generic[T]):
"""
See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
for more information and how to leverage buffered ingestion.
"""

_buffer: List[IngestWithConfigDataStreamRequest]
_buffer_size: int
_ingestion_service: T
_flush_interval_sec: Optional[float]
_flush_timer: Optional[threading.Timer]
_lock: Optional[threading.Lock]
_on_error: Optional[OnErrorCallback]

def __init__(self, ingestion_service: T, buffer_size: Optional[int]):
def __init__(
self,
ingestion_service: T,
buffer_size: Optional[int],
flush_interval_sec: Optional[float],
on_error: Optional[OnErrorCallback],
):
self._buffer = []
self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
self._ingestion_service = ingestion_service
self._on_error = on_error
self._flush_timer = None

if flush_interval_sec:
self._flush_interval_sec = flush_interval_sec
self._lock = threading.Lock()
self._start_flush_timer()
else:
self._flush_interval_sec = None
self._lock = None

def __enter__(self) -> Self:
return self
Expand All @@ -31,10 +63,17 @@ def __exit__(
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
self.flush()
self._cancel_flush_timer()

if exc_val is not None:
if self._on_error is not None:
self._on_error(exc_val, self._buffer, self.flush)
else:
self.flush()

raise exc_val
else:
self.flush()

return True

Expand All @@ -44,72 +83,107 @@ def ingest_flows(self, *flows: FlowOrderedChannelValues):
See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
for more information.
"""
lhs_cursor = 0
rhs_cursor = min(
self._buffer_size - len(self._buffer),
len(flows),
)

while lhs_cursor < len(flows):
for flow in flows[lhs_cursor:rhs_cursor]:
flow_name = flow["flow_name"]
timestamp = flow["timestamp"]
channel_values = flow["channel_values"]

req = self._ingestion_service.create_ingestion_request(
flow_name=flow_name,
timestamp=timestamp,
channel_values=channel_values,
)
self._buffer.append(req)

if len(self._buffer) >= self._buffer_size:
self.flush()

lhs_cursor = rhs_cursor
with self._use_lock():
lhs_cursor = 0
rhs_cursor = min(
rhs_cursor + (self._buffer_size - len(self._buffer)),
self._buffer_size - len(self._buffer),
len(flows),
)

while lhs_cursor < len(flows):
for flow in flows[lhs_cursor:rhs_cursor]:
flow_name = flow["flow_name"]
timestamp = flow["timestamp"]
channel_values = flow["channel_values"]

req = self._ingestion_service.create_ingestion_request(
flow_name=flow_name,
timestamp=timestamp,
channel_values=channel_values,
)
self._buffer.append(req)

if len(self._buffer) >= self._buffer_size:
self._flush()

lhs_cursor = rhs_cursor
rhs_cursor = min(
rhs_cursor + (self._buffer_size - len(self._buffer)),
len(flows),
)

def try_ingest_flows(self, *flows: Flow):
"""
Ingests flows in batches and performs client-side validations for each request
generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
for more information.
"""
lhs_cursor = 0
rhs_cursor = min(
self._buffer_size - len(self._buffer),
len(flows),
)

while lhs_cursor < len(flows):
for flow in flows[lhs_cursor:rhs_cursor]:
flow_name = flow["flow_name"]
timestamp = flow["timestamp"]
channel_values = flow["channel_values"]

req = self._ingestion_service.try_create_ingestion_request(
flow_name=flow_name,
timestamp=timestamp,
channel_values=channel_values,
)
self._buffer.append(req)

if len(self._buffer) >= self._buffer_size:
self.flush()

lhs_cursor = rhs_cursor
with self._use_lock():
lhs_cursor = 0
rhs_cursor = min(
rhs_cursor + (self._buffer_size - len(self._buffer)),
self._buffer_size - len(self._buffer),
len(flows),
)

while lhs_cursor < len(flows):
for flow in flows[lhs_cursor:rhs_cursor]:
flow_name = flow["flow_name"]
timestamp = flow["timestamp"]
channel_values = flow["channel_values"]

req = self._ingestion_service.try_create_ingestion_request(
flow_name=flow_name,
timestamp=timestamp,
channel_values=channel_values,
)
self._buffer.append(req)

if len(self._buffer) >= self._buffer_size:
self._flush()

lhs_cursor = rhs_cursor
rhs_cursor = min(
rhs_cursor + (self._buffer_size - len(self._buffer)),
len(flows),
)

def flush(self):
"""
Flush and ingest all requests in buffer.
"""

if self._flush_timer and self._lock:
with self._lock:
self._flush()
self._restart_flush_timer()
else:
self._flush()

def _flush(self):
if len(self._buffer) > 0:
self._ingestion_service.ingest(*self._buffer)
self._buffer.clear()

def _start_flush_timer(self):
if self._flush_interval_sec:
self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush)
self._flush_timer.start()

def _cancel_flush_timer(self):
if self._flush_timer:
self._flush_timer.cancel()
self._flush_timer = None

def _restart_flush_timer(self):
self._cancel_flush_timer()
self._start_flush_timer()

@contextmanager
def _use_lock(self):
try:
if self._lock:
self._lock.acquire()
yield
finally:
if self._lock:
self._lock.release()
Loading

0 comments on commit 6f934ec

Please sign in to comment.