diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index 6b4047227..88600f70c 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -58,6 +58,16 @@
 KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS: Final = 2.0
 SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS: Final = 5.0
 
+# For good startup performance, consuming multiple records
+# per consume round is essential.
+# The consumer default is 1 message per consume call, which is
+# a good value after startup. If the consumer expected more
+# messages, it would return control only after the timeout,
+# making schema storing latency `processing time + timeout`.
+MAX_MESSAGES_TO_CONSUME_ON_STARTUP: Final = 1000
+MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1
+MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2
+
 # Metric names
 METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed"
 METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE: Final = "karapace_schema_reader_records_per_keymode"
@@ -120,11 +130,8 @@ def __init__(
     ) -> None:
         Thread.__init__(self, name="schema-reader")
         self.master_coordinator = master_coordinator
-        self.timeout_s = 0.2
-        # Consumer default is 1 message for each consume call
-        # For good startup performance the consumption of multiple
-        # records for each consume round is essential.
-        self.max_messages_to_process = 1000
+        self.timeout_s = MESSAGE_CONSUME_TIMEOUT_SECONDS
+        self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_ON_STARTUP
         self.config = config
         self.database = database
 
@@ -301,6 +308,7 @@ def _is_ready(self) -> bool:
         self.startup_previous_processed_offset = self.offset
         ready = self.offset >= self._highest_offset
         if ready:
+            self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
             LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
         return ready
 
diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py
index 12a404ac5..052c3ef7f 100644
--- a/tests/unit/test_schema_reader.py
+++ b/tests/unit/test_schema_reader.py
@@ -10,7 +10,13 @@
 from karapace.config import DEFAULTS
 from karapace.in_memory_database import InMemoryDatabase
 from karapace.offset_watcher import OffsetWatcher
-from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED
+from karapace.schema_reader import (
+    KafkaSchemaReader,
+    MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP,
+    MAX_MESSAGES_TO_CONSUME_ON_STARTUP,
+    OFFSET_EMPTY,
+    OFFSET_UNINITIALIZED,
+)
 from tests.base_testcase import BaseTestCase
 from unittest.mock import Mock
 
@@ -154,3 +160,27 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None:
     schema_reader.handle_messages()
 
     assert schema_reader.ready is testcase.expected
+
+
+def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
+    key_formatter_mock = Mock()
+    consumer_mock = Mock()
+    consumer_mock.consume.return_value = []
+    # Returns a (beginning, end) tuple; the end offset is the next upcoming record's offset
+    consumer_mock.get_watermark_offsets.return_value = (0, 1)
+
+    offset_watcher = OffsetWatcher()
+    schema_reader = KafkaSchemaReader(
+        config=DEFAULTS,
+        offset_watcher=offset_watcher,
+        key_formatter=key_formatter_mock,
+        master_coordinator=None,
+        database=InMemoryDatabase(),
+    )
+    schema_reader.consumer = consumer_mock
+    schema_reader.offset = 0
+    assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP
+
+    schema_reader.handle_messages()
+    assert schema_reader.ready is True
+    assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
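
For context when reviewing: `handle_messages` itself is not part of this diff, so the sketch below reconstructs the loop these constants tune. It is a minimal approximation assuming `confluent_kafka.Consumer.consume(num_messages, timeout)` semantics (the call returns early only when the batch fills; otherwise it returns whatever arrived once the timeout expires). The class name, the `highest_offset` parameter, and the per-message handling are hypothetical, not Karapace's actual implementation.

from typing import Any, Final

# Values mirrored from the diff above.
MAX_MESSAGES_TO_CONSUME_ON_STARTUP: Final = 1000
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1
MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2


class ConsumeLoopSketch:
    """Illustrative stand-in for KafkaSchemaReader's consume loop."""

    def __init__(self, consumer: Any, highest_offset: int) -> None:
        self.consumer = consumer
        self.timeout_s = MESSAGE_CONSUME_TIMEOUT_SECONDS
        # Large batches while replaying the schemas topic on startup.
        self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_ON_STARTUP
        self._highest_offset = highest_offset
        self.offset = -1
        self.ready = False

    def handle_messages(self) -> None:
        # consume() hands back up to num_messages records, returning early
        # only when the batch is full.  Keeping num_messages=1000 after
        # startup would make a lone new schema wait out the full timeout
        # before being processed, i.e. latency = processing time + timeout.
        messages = self.consumer.consume(
            num_messages=self.max_messages_to_process, timeout=self.timeout_s
        )
        for message in messages:
            self.offset = message.offset()  # the real code also stores the record

        if not self.ready and self.offset >= self._highest_offset:
            # Caught up: shrink the batch so each new record is delivered
            # and processed immediately instead of after the timeout.
            self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
            self.ready = True

The unit test in the diff exercises exactly this transition: with the watermark end offset at 1 and `schema_reader.offset` at 0, a single `handle_messages()` call flips `ready` to `True` and drops the batch size from 1000 to 1.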