diff --git a/src/sentry/sentry_metrics/consumers/indexer/common.py b/src/sentry/sentry_metrics/consumers/indexer/common.py
index 96736fa3ca12de..b1b8caa218290a 100644
--- a/src/sentry/sentry_metrics/consumers/indexer/common.py
+++ b/src/sentry/sentry_metrics/consumers/indexer/common.py
@@ -1,17 +1,13 @@
 import logging
-import time
-from typing import Any, List, MutableMapping, MutableSequence, Optional, Union
+from typing import Any, List, MutableMapping, MutableSequence, Union
 
 from arroyo.backends.kafka import KafkaPayload
 from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
-from arroyo.processing.strategies import MessageRejected
-from arroyo.processing.strategies import ProcessingStrategy
-from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
-from arroyo.types import Message, Value
+from arroyo.types import Message
 from django.conf import settings
 
 from sentry.sentry_metrics.consumers.indexer.routing_producer import RoutingPayload
-from sentry.utils import kafka_config, metrics
+from sentry.utils import kafka_config
 
 MessageBatch = List[Message[KafkaPayload]]
 IndexerOutputMessageBatch = MutableSequence[Message[Union[RoutingPayload, KafkaPayload]]]
@@ -37,123 +33,3 @@ def get_config(
         queued_min_messages=DEFAULT_QUEUED_MIN_MESSAGES,
     )
     return consumer_config
-
-
-class MetricsBatchBuilder:
-    """
-    Batches up individual messages - type: Message[KafkaPayload] - into a
-    list, which will later be the payload for the big outer message
-    that gets passed through to the ParallelTransformStep.
-
-    See `__flush` method of BatchMessages for when that happens.
-    """
-
-    def __init__(self, max_batch_size: int, max_batch_time: float) -> None:
-        self.__messages: MessageBatch = []
-        self.__max_batch_size = max_batch_size
-        self.__deadline = time.time() + max_batch_time / 1000.0
-
-    def __len__(self) -> int:
-        return len(self.__messages)
-
-    @property
-    def messages(self) -> MessageBatch:
-        return self.__messages
-
-    def append(self, message: Message[KafkaPayload]) -> None:
-        self.__messages.append(message)
-
-    def ready(self) -> bool:
-        if len(self.messages) >= self.__max_batch_size:
-            return True
-        elif time.time() >= self.__deadline:
-            return True
-        else:
-            return False
-
-
-class BatchMessages(ProcessingStep[KafkaPayload]):
-    """
-    First processing step in the MetricsConsumerStrategyFactory.
-    Keeps track of a batch of messages (using the MetricsBatchBuilder)
-    and then when at capacity, either max_batch_time or max_batch_size,
-    flushes the batch.
-
-    Flushing the batch here means wrapping the batch in a Message, the batch
-    itself being the payload. This is what the ParallelTransformStep will
-    process in the process_message function.
-    """
-
-    def __init__(
-        self,
-        next_step: ProcessingStrategy[MessageBatch],
-        max_batch_time: float,
-        max_batch_size: int,
-    ):
-        self.__max_batch_size = max_batch_size
-        self.__max_batch_time = max_batch_time
-
-        self.__next_step = next_step
-        self.__batch: Optional[MetricsBatchBuilder] = None
-        self.__closed = False
-        self.__batch_start: Optional[float] = None
-        # If we received MessageRejected from subsequent steps, this is set to true.
-        # It is reset to false upon the next successful submit.
-        self.__apply_backpressure = False
-
-    def poll(self) -> None:
-        assert not self.__closed
-
-        self.__next_step.poll()
-
-        if self.__batch and self.__batch.ready():
-            self.__flush()
-
-    def submit(self, message: Message[KafkaPayload]) -> None:
-        if self.__apply_backpressure is True:
-            raise MessageRejected
-
-        if self.__batch is None:
-            self.__batch_start = time.time()
-            self.__batch = MetricsBatchBuilder(self.__max_batch_size, self.__max_batch_time)
-
-        self.__batch.append(message)
-
-        if self.__batch and self.__batch.ready():
-            self.__flush()
-
-    def __flush(self) -> None:
-        if not self.__batch:
-            return
-        last = self.__batch.messages[-1]
-
-        new_message = Message(Value(self.__batch.messages, last.committable))
-        if self.__batch_start is not None:
-            elapsed_time = time.time() - self.__batch_start
-            metrics.timing("batch_messages.build_time", elapsed_time)
-
-        try:
-            self.__next_step.submit(new_message)
-            if self.__apply_backpressure is True:
-                self.__apply_backpressure = False
-            self.__batch_start = None
-            self.__batch = None
-        except MessageRejected:
-            self.__apply_backpressure = True
-
-    def terminate(self) -> None:
-        self.__closed = True
-        self.__next_step.terminate()
-
-    def close(self) -> None:
-        self.__closed = True
-
-    def join(self, timeout: Optional[float] = None) -> None:
-        if self.__batch:
-            last = self.__batch.messages[-1]
-            logger.debug(
-                f"Abandoning batch of {len(self.__batch)} messages...latest offset: {last.committable}"
-            )
-
-        self.__next_step.close()
-        self.__next_step.join(timeout)
diff --git a/src/sentry/sentry_metrics/consumers/indexer/parallel.py b/src/sentry/sentry_metrics/consumers/indexer/parallel.py
index 1edc5b03aa5e98..9d38efa6b6f331 100644
--- a/src/sentry/sentry_metrics/consumers/indexer/parallel.py
+++ b/src/sentry/sentry_metrics/consumers/indexer/parallel.py
@@ -7,11 +7,9 @@
 from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
 from arroyo.commit import CommitPolicy
 from arroyo.processing import StreamProcessor
-from arroyo.processing.strategies import ProcessingStrategy
-from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
-from arroyo.processing.strategies import ProcessingStrategyFactory
+from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory, Reduce
 from arroyo.processing.strategies.transform import ParallelTransformStep
-from arroyo.types import Commit, Message, Partition, Topic
+from arroyo.types import BaseValue, Commit, Message, Partition, Topic
 from django.conf import settings
 
 from sentry.sentry_metrics.configuration import (
@@ -19,8 +17,8 @@
     initialize_sentry_and_global_consumer_state,
 )
 from sentry.sentry_metrics.consumers.indexer.common import (
-    BatchMessages,
     IndexerOutputMessageBatch,
+    MessageBatch,
     get_config,
 )
 from sentry.sentry_metrics.consumers.indexer.multiprocess import SimpleProduceStep
@@ -35,10 +33,10 @@
 logger = logging.getLogger(__name__)
 
 
-class Unbatcher(ProcessingStep[IndexerOutputMessageBatch]):
+class Unbatcher(ProcessingStrategy[IndexerOutputMessageBatch]):
     def __init__(
         self,
-        next_step: ProcessingStep[Union[KafkaPayload, RoutingPayload]],
+        next_step: ProcessingStrategy[Union[KafkaPayload, RoutingPayload]],
     ) -> None:
         self.__next_step = next_step
         self.__closed = False
@@ -146,8 +144,19 @@ def create_with_partitions(
             ),
         )
 
-        strategy = BatchMessages(
-            parallel_strategy, self.__max_msg_batch_time, self.__max_msg_batch_size
+        def accumulator(result: MessageBatch, value: BaseValue[KafkaPayload]) -> MessageBatch:
+            result.append(Message(value))
+            return result
+
+        def initializer() -> MessageBatch:
+            return []
+
+        strategy = Reduce(
+            self.__max_msg_batch_size,
+            self.__max_msg_batch_time,
+            accumulator,
+            initializer,
+            parallel_strategy,
         )
 
         return strategy
@@ -218,4 +227,5 @@ def get_parallel_metrics_consumer(
             min_commit_frequency_sec=max_batch_time / 1000,
             min_commit_messages=max_batch_size,
         ),
+        join_timeout=0.0,
    )
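For reference, here is a minimal, self-contained sketch of the batching pattern the parallel.py change adopts: arroyo's Reduce folds each incoming value into an accumulator and flushes the accumulated batch downstream once either max_batch_size or max_batch_time is reached, which is the job the hand-rolled MetricsBatchBuilder/BatchMessages pair performed before this diff. The build_batching_step helper is hypothetical (not part of this PR); the Reduce argument order mirrors the call in create_with_partitions above.

from typing import List

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy, Reduce
from arroyo.types import BaseValue, Message

MessageBatch = List[Message[KafkaPayload]]


def build_batching_step(
    next_step: ProcessingStrategy[MessageBatch],
    max_batch_size: int,
    max_batch_time: float,
) -> ProcessingStrategy[KafkaPayload]:
    # Hypothetical helper (not in the PR) wiring up the same Reduce step.
    def accumulator(batch: MessageBatch, value: BaseValue[KafkaPayload]) -> MessageBatch:
        # Re-wrap each folded value as a Message; Reduce itself tracks the
        # committable offsets of everything accumulated into the batch.
        batch.append(Message(value))
        return batch

    # Argument order as used in the diff above:
    # Reduce(max_batch_size, max_batch_time, accumulator, initial_value_factory, next_step)
    return Reduce(max_batch_size, max_batch_time, accumulator, list, next_step)

Moving this bookkeeping into arroyo also appears to be what permits join_timeout=0.0 on the StreamProcessor: a partially built batch is simply abandoned at shutdown or rebalance rather than flushed, mirroring what BatchMessages.join already did (log and skip the flush), with unconsumed offsets left uncommitted for redelivery.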
diff --git a/tests/sentry/sentry_metrics/test_multiprocess_steps.py b/tests/sentry/sentry_metrics/test_multiprocess_steps.py
index bf41d067c276e8..49db5f986d2c52 100644
--- a/tests/sentry/sentry_metrics/test_multiprocess_steps.py
+++ b/tests/sentry/sentry_metrics/test_multiprocess_steps.py
@@ -1,20 +1,16 @@
 import logging
 import pickle
-import time
 from copy import deepcopy
 from datetime import datetime, timezone
 from typing import Dict, List, MutableMapping, Sequence, Union
-from unittest.mock import Mock, call
 
 import pytest
 from arroyo.backends.kafka import KafkaPayload
-from arroyo.processing.strategies import MessageRejected
 from arroyo.types import BrokerValue, Message, Partition, Topic, Value
 
 from sentry.ratelimits.cardinality import CardinalityLimiter
 from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
 from sentry.sentry_metrics.consumers.indexer.batch import invalid_metric_tags, valid_metric_name
-from sentry.sentry_metrics.consumers.indexer.common import BatchMessages, MetricsBatchBuilder
 from sentry.sentry_metrics.consumers.indexer.processing import MessageProcessor
 from sentry.sentry_metrics.indexer.limiters.cardinality import (
     TimeseriesCardinalityLimiter,
@@ -61,156 +57,6 @@ def compare_message_batches_ignoring_metadata(
         compare_messages_ignoring_mapping_metadata(a, e)
 
 
-def _batch_message_set_up(next_step: Mock, max_batch_time: float = 100.0, max_batch_size: int = 2):
-    # batch time is in seconds
-    batch_messages_step = BatchMessages(
-        next_step=next_step, max_batch_time=max_batch_time, max_batch_size=max_batch_size
-    )
-
-    message1 = Message(
-        BrokerValue(
-            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
-        )
-    )
-    message2 = Message(
-        BrokerValue(
-            KafkaPayload(None, b"another value", []),
-            Partition(Topic("topic"), 0),
-            2,
-            datetime.now(),
-        )
-    )
-    return (batch_messages_step, message1, message2)
-
-
-def test_batch_messages() -> None:
-    next_step = Mock()
-
-    batch_messages_step, message1, message2 = _batch_message_set_up(next_step)
-
-    # submit the first message, batch builder should should be created
-    # and the messaged added to the batch
-    batch_messages_step.submit(message=message1)
-
-    assert len(batch_messages_step._BatchMessages__batch) == 1
-
-    # neither batch_size or batch_time as been met so poll shouldn't
-    # do anything yet (aka shouldn't flush and call next_step.submit)
-    batch_messages_step.poll()
-
-    assert len(batch_messages_step._BatchMessages__batch) == 1
-    assert not next_step.submit.called
-
-    # submit the second message, message should be added to the batch
-    # which will now saturate the batch_size (2). This will trigger
-    # __flush which in turn calls next.submit and reset the batch to None
-    batch_messages_step.submit(message=message2)
-
-    assert next_step.submit.call_args == call(
-        Message(Value([message1, message2], message2.committable)),
-    )
-
-    assert batch_messages_step._BatchMessages__batch is None
-
-
-def test_batch_messages_rejected_message():
-    next_step = Mock()
-    next_step.submit.side_effect = MessageRejected()
-
-    batch_messages_step, message1, message2 = _batch_message_set_up(next_step)
-
-    batch_messages_step.poll()
-    batch_messages_step.submit(message=message1)
-
-    # if we try to submit a batch when the next step is
-    # not ready to accept more messages we'll get a
-    # MessageRejected error. This will be reraised for
-    # to the stream processor on the subsequent call to submit
-    batch_messages_step.submit(message=message2)
-
-    with pytest.raises(MessageRejected):
-        batch_messages_step.submit(message=message2)
-
-    # when poll is called, we still try to flush the batch
-    # caust its full but we handled the MessageRejected error
-    batch_messages_step.poll()
-    assert next_step.submit.called
-
-
-def test_batch_messages_join():
-    next_step = Mock()
-
-    batch_messages_step, message1, _ = _batch_message_set_up(next_step)
-
-    batch_messages_step.poll()
-    batch_messages_step.submit(message=message1)
-    # A rebalance, restart, scale up or any other event
-    # that causes partitions to be revoked will call join
-    batch_messages_step.join(timeout=3)
-    # we don't flush the batch
-    assert not next_step.submit.called
-
-
-def test_metrics_batch_builder():
-    max_batch_time = 3.0  # seconds
-    max_batch_size = 2
-
-    # 1. Ready when max_batch_size is reached
-    batch_builder_size = MetricsBatchBuilder(
-        max_batch_size=max_batch_size, max_batch_time=max_batch_time
-    )
-
-    assert not batch_builder_size.ready()
-
-    message1 = Message(
-        BrokerValue(
-            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
-        )
-    )
-    batch_builder_size.append(message1)
-    assert not batch_builder_size.ready()
-
-    message2 = Message(
-        BrokerValue(
-            KafkaPayload(None, b"another value", []),
-            Partition(Topic("topic"), 0),
-            2,
-            datetime.now(),
-        )
-    )
-    batch_builder_size.append(message2)
-    assert batch_builder_size.ready()
-
-    # 2. Ready when max_batch_time is reached
-    batch_builder_time = MetricsBatchBuilder(
-        max_batch_size=max_batch_size, max_batch_time=max_batch_time
-    )
-
-    assert not batch_builder_time.ready()
-
-    message1 = Message(
-        BrokerValue(
-            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
-        )
-    )
-    batch_builder_time.append(message1)
-    assert not batch_builder_time.ready()
-
-    time.sleep(3)
-    assert batch_builder_time.ready()
-
-    # 3. Adding the same message twice to the same batch
-    batch_builder_time = MetricsBatchBuilder(
-        max_batch_size=max_batch_size, max_batch_time=max_batch_time
-    )
-    message1 = Message(
-        BrokerValue(
-            KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
-        )
-    )
-    batch_builder_time.append(message1)
-
-
 ts = int(datetime.now(tz=timezone.utc).timestamp())
 counter_payload = {
     "name": SessionMRI.SESSION.value,
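The BatchMessages/MetricsBatchBuilder tests removed above get no direct replacement in this diff. If equivalent coverage is wanted, something along the following lines could exercise the Reduce step directly. This is a sketch, not part of the PR: the test name and inline accumulator are ours, and it assumes Reduce hands a full batch downstream by the time a submit plus a follow-up poll have run.

from datetime import datetime
from unittest.mock import Mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import Reduce
from arroyo.types import BrokerValue, Message, Partition, Topic


def test_reduce_flushes_at_max_batch_size() -> None:
    next_step = Mock()
    strategy = Reduce(
        2,  # max_batch_size
        100.0,  # max_batch_time; large enough that only size triggers the flush
        lambda batch, value: batch + [Message(value)],
        list,
        next_step,
    )

    partition = Partition(Topic("topic"), 0)
    message1 = Message(
        BrokerValue(KafkaPayload(None, b"some value", []), partition, 1, datetime.now())
    )
    message2 = Message(
        BrokerValue(KafkaPayload(None, b"another value", []), partition, 2, datetime.now())
    )

    strategy.submit(message1)
    strategy.poll()
    assert not next_step.submit.called  # one message is below max_batch_size

    strategy.submit(message2)
    strategy.poll()
    assert next_step.submit.call_count == 1
    # The batch arrives as a single Message whose payload is the accumulated list.
    (batch_message,), _ = next_step.submit.call_args
    assert len(batch_message.payload) == 2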