ref(indexer): Retry indexer refactor (#45018)
Brings back #44721 with a small difference.

Now we pass a join_timeout of 0.0 to the stream processor, so it shuts
down immediately, dropping any in-process batches.

This matches the current behaviour in the batching step, which
immediately drops any in-flight batches during shutdown:
https://github.com/getsentry/sentry/blob/be6286e1d40e445760dd6f5c209574090cc05add/src/sentry/sentry_metrics/consumers/indexer/common.py#L151-L156

The previous version of the PR took much longer to revoke partitions,
as it attempted to finish processing all in-flight batches. This caused
timeouts and other weird behaviour during deployment / rebalancing.
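
As a rough illustration of the new shutdown behaviour, the sketch below builds a stream processor the way this PR does, with `join_timeout=0.0`. It is a minimal sketch assuming arroyo's `StreamProcessor`, `KafkaConsumer`, and `CommitPolicy` APIs as used in this diff; the topic name, consumer config, and factory are illustrative placeholders, not part of this change.

```python
# Minimal sketch (not part of this commit) of building the processor with
# join_timeout=0.0; topic name, config, and factory are placeholders.
from arroyo.backends.kafka import KafkaConsumer
from arroyo.commit import CommitPolicy
from arroyo.processing import StreamProcessor
from arroyo.types import Topic


def build_metrics_processor(factory, consumer_config) -> StreamProcessor:
    consumer = KafkaConsumer(consumer_config)
    return StreamProcessor(
        consumer,
        Topic("ingest-metrics"),  # placeholder topic
        factory,
        CommitPolicy(min_commit_frequency_sec=1.0, min_commit_messages=100),
        # join_timeout=0.0: on shutdown / rebalance, join() returns
        # immediately instead of waiting for in-flight batches to finish.
        join_timeout=0.0,
    )
```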
lynnagara authored Feb 23, 2023
1 parent a88f766 commit cda307a
Showing 3 changed files with 22 additions and 290 deletions.
130 changes: 3 additions & 127 deletions src/sentry/sentry_metrics/consumers/indexer/common.py
@@ -1,17 +1,13 @@
import logging
import time
from typing import Any, List, MutableMapping, MutableSequence, Optional, Union
from typing import Any, List, MutableMapping, MutableSequence, Union

from arroyo.backends.kafka import KafkaPayload
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
from arroyo.processing.strategies import MessageRejected
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
from arroyo.types import Message, Value
from arroyo.types import Message
from django.conf import settings

from sentry.sentry_metrics.consumers.indexer.routing_producer import RoutingPayload
from sentry.utils import kafka_config, metrics
from sentry.utils import kafka_config

MessageBatch = List[Message[KafkaPayload]]
IndexerOutputMessageBatch = MutableSequence[Message[Union[RoutingPayload, KafkaPayload]]]
@@ -37,123 +33,3 @@ def get_config(
queued_min_messages=DEFAULT_QUEUED_MIN_MESSAGES,
)
return consumer_config


class MetricsBatchBuilder:
"""
Batches up individual messages - type: Message[KafkaPayload] - into a
list, which will later be the payload for the big outer message
that gets passed through to the ParallelTransformStep.
See `__flush` method of BatchMessages for when that happens.
"""

def __init__(self, max_batch_size: int, max_batch_time: float) -> None:
self.__messages: MessageBatch = []
self.__max_batch_size = max_batch_size
self.__deadline = time.time() + max_batch_time / 1000.0

def __len__(self) -> int:
return len(self.__messages)

@property
def messages(self) -> MessageBatch:
return self.__messages

def append(self, message: Message[KafkaPayload]) -> None:
self.__messages.append(message)

def ready(self) -> bool:
if len(self.messages) >= self.__max_batch_size:
return True
elif time.time() >= self.__deadline:
return True
else:
return False


class BatchMessages(ProcessingStep[KafkaPayload]):
"""
First processing step in the MetricsConsumerStrategyFactory.
Keeps track of a batch of messages (using the MetricsBatchBuilder)
and then, once at capacity (either max_batch_time or max_batch_size is reached),
flushes the batch.
Flushing the batch here means wrapping the batch in a Message, the batch
itself being the payload. This is what the ParallelTransformStep will
process in the process_message function.
"""

def __init__(
self,
next_step: ProcessingStrategy[MessageBatch],
max_batch_time: float,
max_batch_size: int,
):
self.__max_batch_size = max_batch_size
self.__max_batch_time = max_batch_time

self.__next_step = next_step
self.__batch: Optional[MetricsBatchBuilder] = None
self.__closed = False
self.__batch_start: Optional[float] = None
# If we received MessageRejected from subsequent steps, this is set to true.
# It is reset to false upon the next successful submit.
self.__apply_backpressure = False

def poll(self) -> None:
assert not self.__closed

self.__next_step.poll()

if self.__batch and self.__batch.ready():
self.__flush()

def submit(self, message: Message[KafkaPayload]) -> None:
if self.__apply_backpressure is True:
raise MessageRejected

if self.__batch is None:
self.__batch_start = time.time()
self.__batch = MetricsBatchBuilder(self.__max_batch_size, self.__max_batch_time)

self.__batch.append(message)

if self.__batch and self.__batch.ready():
self.__flush()

def __flush(self) -> None:
if not self.__batch:
return
last = self.__batch.messages[-1]

new_message = Message(Value(self.__batch.messages, last.committable))
if self.__batch_start is not None:
elapsed_time = time.time() - self.__batch_start
metrics.timing("batch_messages.build_time", elapsed_time)

try:
self.__next_step.submit(new_message)
if self.__apply_backpressure is True:
self.__apply_backpressure = False
self.__batch_start = None
self.__batch = None
except MessageRejected:
self.__apply_backpressure = True

def terminate(self) -> None:
self.__closed = True
self.__next_step.terminate()

def close(self) -> None:
self.__closed = True

def join(self, timeout: Optional[float] = None) -> None:
if self.__batch:
last = self.__batch.messages[-1]
logger.debug(
f"Abandoning batch of {len(self.__batch)} messages...latest offset: {last.committable}"
)

self.__next_step.close()
self.__next_step.join(timeout)
28 changes: 19 additions & 9 deletions src/sentry/sentry_metrics/consumers/indexer/parallel.py
@@ -7,20 +7,18 @@
from arroyo.backends.kafka import KafkaConsumer, KafkaPayload
from arroyo.commit import CommitPolicy
from arroyo.processing import StreamProcessor
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.processing.strategies import ProcessingStrategy as ProcessingStep
from arroyo.processing.strategies import ProcessingStrategyFactory
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory, Reduce
from arroyo.processing.strategies.transform import ParallelTransformStep
from arroyo.types import Commit, Message, Partition, Topic
from arroyo.types import BaseValue, Commit, Message, Partition, Topic
from django.conf import settings

from sentry.sentry_metrics.configuration import (
MetricsIngestConfiguration,
initialize_sentry_and_global_consumer_state,
)
from sentry.sentry_metrics.consumers.indexer.common import (
BatchMessages,
IndexerOutputMessageBatch,
MessageBatch,
get_config,
)
from sentry.sentry_metrics.consumers.indexer.multiprocess import SimpleProduceStep
@@ -35,10 +33,10 @@
logger = logging.getLogger(__name__)


class Unbatcher(ProcessingStep[IndexerOutputMessageBatch]):
class Unbatcher(ProcessingStrategy[IndexerOutputMessageBatch]):
def __init__(
self,
next_step: ProcessingStep[Union[KafkaPayload, RoutingPayload]],
next_step: ProcessingStrategy[Union[KafkaPayload, RoutingPayload]],
) -> None:
self.__next_step = next_step
self.__closed = False
@@ -146,8 +144,19 @@ def create_with_partitions(
),
)

strategy = BatchMessages(
parallel_strategy, self.__max_msg_batch_time, self.__max_msg_batch_size
def accumulator(result: MessageBatch, value: BaseValue[KafkaPayload]) -> MessageBatch:
result.append(Message(value))
return result

def initializer() -> MessageBatch:
return []

strategy = Reduce(
self.__max_msg_batch_size,
self.__max_msg_batch_time,
accumulator,
initializer,
parallel_strategy,
)

return strategy
@@ -218,4 +227,5 @@ def get_parallel_metrics_consumer(
min_commit_frequency_sec=max_batch_time / 1000,
min_commit_messages=max_batch_size,
),
join_timeout=0.0,
)
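
For context, here is a minimal, self-contained sketch of the `Reduce`-based batching that replaces `BatchMessages` above, assuming arroyo's `Reduce(max_batch_size, max_batch_time, accumulator, initial_value, next_step)` signature as used in this diff; the `Mock` next step and the sample message are illustrative only.

```python
# Standalone sketch of batching with arroyo's Reduce (illustrative, not part
# of this commit). A Mock stands in for the downstream strategy.
from datetime import datetime
from typing import List
from unittest.mock import Mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import Reduce
from arroyo.types import BaseValue, BrokerValue, Message, Partition, Topic

MessageBatch = List[Message[KafkaPayload]]


def accumulator(result: MessageBatch, value: BaseValue[KafkaPayload]) -> MessageBatch:
    # Re-wrap each incoming value as a Message and append it to the batch.
    result.append(Message(value))
    return result


next_step = Mock()
strategy = Reduce(
    2,       # max_batch_size: forward the batch once it holds two messages
    100.0,   # max_batch_time before a partial batch is forwarded
    accumulator,
    list,    # initial_value: each batch starts as an empty list
    next_step,
)

message = Message(
    BrokerValue(
        KafkaPayload(None, b"some value", []),
        Partition(Topic("topic"), 0),
        1,
        datetime.now(),
    )
)
strategy.submit(message)
strategy.submit(message)  # batch reaches max_batch_size
strategy.poll()           # a ready batch is forwarded to next_step as a single Message
strategy.close()
strategy.join()
```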
154 changes: 0 additions & 154 deletions tests/sentry/sentry_metrics/test_multiprocess_steps.py
@@ -1,20 +1,16 @@
import logging
import pickle
import time
from copy import deepcopy
from datetime import datetime, timezone
from typing import Dict, List, MutableMapping, Sequence, Union
from unittest.mock import Mock, call

import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import MessageRejected
from arroyo.types import BrokerValue, Message, Partition, Topic, Value

from sentry.ratelimits.cardinality import CardinalityLimiter
from sentry.sentry_metrics.configuration import IndexerStorage, UseCaseKey, get_ingest_config
from sentry.sentry_metrics.consumers.indexer.batch import invalid_metric_tags, valid_metric_name
from sentry.sentry_metrics.consumers.indexer.common import BatchMessages, MetricsBatchBuilder
from sentry.sentry_metrics.consumers.indexer.processing import MessageProcessor
from sentry.sentry_metrics.indexer.limiters.cardinality import (
TimeseriesCardinalityLimiter,
@@ -61,156 +57,6 @@ def compare_message_batches_ignoring_metadata(
compare_messages_ignoring_mapping_metadata(a, e)


def _batch_message_set_up(next_step: Mock, max_batch_time: float = 100.0, max_batch_size: int = 2):
# max_batch_time is in milliseconds (MetricsBatchBuilder divides it by 1000)
batch_messages_step = BatchMessages(
next_step=next_step, max_batch_time=max_batch_time, max_batch_size=max_batch_size
)

message1 = Message(
BrokerValue(
KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
)
)
message2 = Message(
BrokerValue(
KafkaPayload(None, b"another value", []),
Partition(Topic("topic"), 0),
2,
datetime.now(),
)
)
return (batch_messages_step, message1, message2)


def test_batch_messages() -> None:
next_step = Mock()

batch_messages_step, message1, message2 = _batch_message_set_up(next_step)

# submit the first message, the batch builder should be created
# and the message added to the batch
batch_messages_step.submit(message=message1)

assert len(batch_messages_step._BatchMessages__batch) == 1

# neither batch_size nor batch_time has been met, so poll shouldn't
# do anything yet (aka shouldn't flush and call next_step.submit)
batch_messages_step.poll()

assert len(batch_messages_step._BatchMessages__batch) == 1
assert not next_step.submit.called

# submit the second message, the message should be added to the batch,
# which will now saturate the batch_size (2). This will trigger
# __flush, which in turn calls next_step.submit and resets the batch to None
batch_messages_step.submit(message=message2)

assert next_step.submit.call_args == call(
Message(Value([message1, message2], message2.committable)),
)

assert batch_messages_step._BatchMessages__batch is None


def test_batch_messages_rejected_message():
next_step = Mock()
next_step.submit.side_effect = MessageRejected()

batch_messages_step, message1, message2 = _batch_message_set_up(next_step)

batch_messages_step.poll()
batch_messages_step.submit(message=message1)

# if we try to submit a batch when the next step is
# not ready to accept more messages, we'll get a
# MessageRejected error. This will be reraised to
# the stream processor on the subsequent call to submit
batch_messages_step.submit(message=message2)

with pytest.raises(MessageRejected):
batch_messages_step.submit(message=message2)

# when poll is called, we still try to flush the batch
# because it's full, but we handled the MessageRejected error
batch_messages_step.poll()
assert next_step.submit.called


def test_batch_messages_join():
next_step = Mock()

batch_messages_step, message1, _ = _batch_message_set_up(next_step)

batch_messages_step.poll()
batch_messages_step.submit(message=message1)
# A rebalance, restart, scale up or any other event
# that causes partitions to be revoked will call join
batch_messages_step.join(timeout=3)
# we don't flush the batch
assert not next_step.submit.called


def test_metrics_batch_builder():
max_batch_time = 3.0 # seconds
max_batch_size = 2

# 1. Ready when max_batch_size is reached
batch_builder_size = MetricsBatchBuilder(
max_batch_size=max_batch_size, max_batch_time=max_batch_time
)

assert not batch_builder_size.ready()

message1 = Message(
BrokerValue(
KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
)
)
batch_builder_size.append(message1)
assert not batch_builder_size.ready()

message2 = Message(
BrokerValue(
KafkaPayload(None, b"another value", []),
Partition(Topic("topic"), 0),
2,
datetime.now(),
)
)
batch_builder_size.append(message2)
assert batch_builder_size.ready()

# 2. Ready when max_batch_time is reached
batch_builder_time = MetricsBatchBuilder(
max_batch_size=max_batch_size, max_batch_time=max_batch_time
)

assert not batch_builder_time.ready()

message1 = Message(
BrokerValue(
KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
)
)
batch_builder_time.append(message1)
assert not batch_builder_time.ready()

time.sleep(3)
assert batch_builder_time.ready()

# 3. Adding the same message twice to the same batch
batch_builder_time = MetricsBatchBuilder(
max_batch_size=max_batch_size, max_batch_time=max_batch_time
)
message1 = Message(
BrokerValue(
KafkaPayload(None, b"some value", []), Partition(Topic("topic"), 0), 1, datetime.now()
)
)
batch_builder_time.append(message1)


ts = int(datetime.now(tz=timezone.utc).timestamp())
counter_payload = {
"name": SessionMRI.SESSION.value,
