Skip to content

Commit

Permalink
python(feature): timeout-based flushing for buffered ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis committed Aug 23, 2024
1 parent 7e29667 commit 720f678
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 10 deletions.
70 changes: 70 additions & 0 deletions python/lib/sift_py/ingestion/_service_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import random
from contextlib import contextmanager
from datetime import datetime, timezone
from time import sleep

import pytest
from pytest_mock import MockFixture
Expand Down Expand Up @@ -353,3 +354,72 @@ def test_ingestion_service_register_new_flow(mocker: MockFixture):
ingestion_service.create_flow(new_flow_config_name_collision)
assert ingestion_service.flow_configs_by_name["my_new_flow"] == new_flow_config_name_collision
assert ingestion_service.flow_configs_by_name["my_new_flow"] != new_flow_config


def test_ingestion_service_buffered_ingestion_flush_timeout(mocker: MockFixture):
"""
Test for timeout based flush mechanism in buffered ingestion. If buffer hasn't been flushed
after a certain time then the buffer will be automatically flushed.
"""

mock_ingest = mocker.patch.object(IngestionService, "ingest")
mock_ingest.return_value = None

readings_flow = FlowConfig(
name="readings",
channels=[
ChannelConfig(
name="my-channel",
data_type=ChannelDataType.DOUBLE,
),
],
)

telemetry_config = TelemetryConfig(
asset_name="my-asset",
ingestion_client_key="ingestion-client-key",
flows=[readings_flow],
)

mock_ingestion_config = IngestionConfigPb(
ingestion_config_id="ingestion-config-id",
asset_id="asset-id",
client_key="client-key",
)

mock_get_ingestion_config_by_client_key = mocker.patch(
_mock_path(get_ingestion_config_by_client_key)
)
mock_get_ingestion_config_by_client_key.return_value = mock_ingestion_config

mock_get_ingestion_config_flows = mocker.patch(_mock_path(get_ingestion_config_flows))
mock_get_ingestion_config_flows.return_value = [readings_flow.as_pb(FlowConfigPb)]

ingestion_service = IngestionService(MockChannel(), telemetry_config)

@contextmanager
def mock_ctx_manager():
yield
mock_ingest.reset_mock()

with mock_ctx_manager():
with ingestion_service.buffered_ingestion(flush_interval_sec=2) as buffered_ingestion:
assert buffered_ingestion._buffer_size == 1_000

for _ in range(1_500):
buffered_ingestion.try_ingest_flows(
{
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [
{"channel_name": "my-channel", "value": double_value(random.random())}
],
}
)
assert mock_ingest.call_count == 1
assert len(buffered_ingestion._buffer) == 500

# This will cause the flush timer to flush based on provided interval
sleep(5)
assert mock_ingest.call_count == 2
assert len(buffered_ingestion._buffer) == 0
50 changes: 49 additions & 1 deletion python/lib/sift_py/ingestion/buffer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from types import TracebackType
from typing import Generic, List, Optional, Type, TypeVar

Expand All @@ -13,14 +14,36 @@


class BufferedIngestionService(Generic[T]):
"""
See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
for more information and how to leverage buffered ingestion.
"""

_buffer: List[IngestWithConfigDataStreamRequest]
_buffer_size: int
_ingestion_service: T
_flush_interval_sec: Optional[float]
_flush_timer: Optional[threading.Timer]
_lock: Optional[threading.Lock]

def __init__(self, ingestion_service: T, buffer_size: Optional[int]):
def __init__(
self,
ingestion_service: T,
buffer_size: Optional[int],
flush_interval_sec: Optional[float],
):
self._buffer = []
self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
self._ingestion_service = ingestion_service
self._flush_timer = None

if flush_interval_sec:
self._flush_interval_sec = flush_interval_sec
self._lock = threading.Lock()
self._start_flush_timer()
else:
self._flush_interval_sec = None
self._lock = None

def __enter__(self) -> Self:
return self
Expand All @@ -31,6 +54,7 @@ def __exit__(
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
self._cancel_flush_timer()
self.flush()

if exc_val is not None:
Expand Down Expand Up @@ -110,6 +134,30 @@ def flush(self):
"""
Flush and ingest all requests in buffer.
"""

# TODO: Make sure that this is the correct way to ensure that this is the only
# Python thread running. Buffer is written to in the ingest flow methods..
if self._flush_timer and self._lock:
with self._lock:
self._flush()
self._restart_flush_timer()
else:
self._flush()

def _flush(self):
if len(self._buffer) > 0:
self._ingestion_service.ingest(*self._buffer)
self._buffer.clear()

def _start_flush_timer(self):
if self._flush_interval_sec:
self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush)
self._flush_timer.start()

def _cancel_flush_timer(self):
if self._flush_timer:
self._flush_timer = self._flush_timer.cancel()

def _restart_flush_timer(self):
self._cancel_flush_timer()
self._start_flush_timer()
33 changes: 24 additions & 9 deletions python/lib/sift_py/ingestion/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,23 @@ def try_ingest_flows(self, *flows: Flow):
"""
return super().try_ingest_flows(*flows)

def buffered_ingestion(self, buffer_size: Optional[int] = None) -> BufferedIngestionService:
def buffered_ingestion(
self, buffer_size: Optional[int] = None, flush_interval_sec: Optional[float] = None
) -> BufferedIngestionService:
"""
This method automates buffering requests and streams them in batches and is meant to be used
This method automates buffering requests and streams them in batches. It is recommended to be used
in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush`.
the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data.
Once the with-block is exited then a final call to the aforementioned `flush` method will be made
to ingest the remaining data. If a `buffer_size` is not provided then it will default to
`sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
occur once the buffer is filled and at the end of the scope of the with-block.
Example usage:
Expand All @@ -171,8 +179,6 @@ def buffered_ingestion(self, buffer_size: Optional[int] = None) -> BufferedInges
"channel_values": [
{
"channel_name": "my-channel",
"value": double_value(3)
}
],
})
Expand All @@ -184,9 +190,18 @@ def buffered_ingestion(self, buffer_size: Optional[int] = None) -> BufferedInges
"timestamp": datetime.now(timezone.utc),
"channel_values": [double_value(3)]
})
# With default buffer size and periodic flushes of 3.2 seconds
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
for _ in range(6_000):
buffered_ingestion.ingest_flows({
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [double_value(3)]
})
```
"""
return BufferedIngestionService(self, buffer_size)
return BufferedIngestionService(self, buffer_size, flush_interval_sec)

def create_flow(self, flow_config: FlowConfig):
"""
Expand Down

0 comments on commit 720f678

Please sign in to comment.