From dd0ec0f22db4d75e9588d4f7791a4a6d54b7878d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Wed, 29 Nov 2023 16:30:39 +0100 Subject: [PATCH] Replace sync Kafka consumers with confluent_kafka one This change replaces all synchronous Kafka consumers (from the kafka-python library) with a new implementation based on confluent-kafka-python's `Consumer`, keeping the same interface as much as possible. The PyTest timeout is raised from 60s to 90s to accomodate for the default poll timeout for backups consumers (otherwise the tests would time out while still waiting for messages to arrive).o Since the `conluent_kafka.Consumer` implementation does not allow for consumers to be without a group ID, if the new `KafkaConsumer` client is not given one, we'll generate one on the fly to mimic a groupless behaviour. Resources: * confluent-kafka-python documentation: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html# * librdkafka configuration documentation: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md --- karapace/backup/api.py | 27 +++-- karapace/backup/backends/v3/backend.py | 22 ++-- karapace/backup/backends/writer.py | 15 ++- karapace/backup/cli.py | 9 ++ karapace/backup/errors.py | 2 +- karapace/backup/poll_timeout.py | 5 + karapace/backup/topic_configurations.py | 3 +- karapace/kafka/common.py | 14 ++- karapace/kafka/consumer.py | 68 +++++++++++ karapace/kafka/types.py | 19 +++ karapace/kafka_rest_apis/__init__.py | 3 +- karapace/kafka_utils.py | 4 +- karapace/master_coordinator.py | 4 +- karapace/schema_reader.py | 112 +++++++++--------- pytest.ini | 2 +- stubs/confluent_kafka/__init__.pyi | 21 +++- stubs/confluent_kafka/cimpl.pyi | 28 ++++- .../backup/test_get_topic_configurations.py | 3 +- .../integration/backup/test_legacy_backup.py | 53 ++++++--- tests/integration/backup/test_v3_backup.py | 43 +++---- tests/integration/conftest.py | 3 +- tests/integration/kafka/test_admin.py | 3 +- tests/integration/kafka/test_consumer.py | 79 ++++++++++++ tests/integration/kafka/test_producer.py | 4 +- tests/unit/backup/backends/test_v2.py | 47 +++----- tests/unit/backup/backends/v3/test_backend.py | 50 +++----- tests/unit/backup/test_api.py | 14 ++- tests/unit/backup/test_poll_timeout.py | 3 + tests/unit/test_schema_reader.py | 6 +- tests/utils.py | 19 ++- 30 files changed, 477 insertions(+), 208 deletions(-) create mode 100644 karapace/kafka/consumer.py create mode 100644 karapace/kafka/types.py create mode 100644 tests/integration/kafka/test_consumer.py diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 4bcc72131..0d9248fbd 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -22,18 +22,18 @@ from .poll_timeout import PollTimeout from .topic_configurations import ConfigSource, get_topic_configurations from concurrent.futures import Future +from confluent_kafka import Message, TopicPartition from enum import Enum from functools import partial -from kafka import KafkaConsumer -from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import KafkaError, TopicAlreadyExistsError -from kafka.structs import TopicPartition from karapace import constants from karapace.backup.backends.v1 import SchemaBackupV1Reader from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess from karapace.config import Config from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.common import translate_from_kafkaerror +from karapace.kafka.consumer import KafkaConsumer from karapace.kafka.producer import KafkaProducer from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.key_format import KeyFormatter @@ -41,7 +41,7 @@ from pathlib import Path from rich.console import Console from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed -from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar +from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar import contextlib import datetime @@ -282,9 +282,8 @@ def _consume_records( consumer: KafkaConsumer, topic_partition: TopicPartition, poll_timeout: PollTimeout, -) -> Iterator[ConsumerRecord]: - start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition] - end_offset: int = consumer.end_offsets([topic_partition])[topic_partition] +) -> Iterator[Message]: + start_offset, end_offset = consumer.get_watermark_offsets(topic_partition) last_offset = start_offset LOG.info( @@ -301,12 +300,15 @@ def _consume_records( end_offset -= 1 # high watermark to actual end offset while True: - records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, []) - if len(records) == 0: + record: Message | None = consumer.poll(timeout=poll_timeout.seconds) + if record is None: raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout) - for record in records: - yield record - last_offset = record.offset # pylint: disable=undefined-loop-variable + error = record.error() + if error is not None: + raise translate_from_kafkaerror(error) + + yield record + last_offset = record.offset() if last_offset >= end_offset: break @@ -528,6 +530,7 @@ def create_backup( with _consumer(config, topic_name) as consumer: (partition,) = consumer.partitions_for_topic(topic_name) topic_partition = TopicPartition(topic_name, partition) + consumer.assign([topic_partition]) try: data_file = _write_partition( diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py index bd9b35dbd..25e08cf42 100644 --- a/karapace/backup/backends/v3/backend.py +++ b/karapace/backup/backends/v3/backend.py @@ -9,8 +9,8 @@ from .readers import read_metadata, read_records from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record from .writers import write_metadata, write_record +from confluent_kafka import Message from dataclasses import dataclass -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic from karapace.backup.backends.writer import BytesBackupWriter, StdOut from karapace.backup.safe_writer import bytes_writer, staging_directory @@ -334,27 +334,31 @@ def store_metadata( def store_record( self, buffer: IO[bytes], - record: ConsumerRecord, + record: Message, ) -> None: - stats: Final = self._partition_stats[record.partition] + stats: Final = self._partition_stats[record.partition()] checksum_checkpoint: Final = stats.get_checkpoint( records_threshold=self._max_records_per_checkpoint, bytes_threshold=self._max_bytes_per_checkpoint, ) offset_start: Final = buffer.tell() + + record_key = record.key() + record_value = record.value() + write_record( buffer, record=Record( - key=record.key, - value=record.value, - headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers), - offset=record.offset, - timestamp=record.timestamp, + key=record_key.encode() if isinstance(record_key, str) else record_key, + value=record_value.encode() if isinstance(record_value, str) else record_value, + headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []), + offset=record.offset(), + timestamp=record.timestamp()[1], checksum_checkpoint=checksum_checkpoint, ), running_checksum=stats.running_checksum, ) stats.update( bytes_offset=buffer.tell() - offset_start, - record_offset=record.offset, + record_offset=record.offset(), ) diff --git a/karapace/backup/backends/writer.py b/karapace/backup/backends/writer.py index c2079eb04..7d5ddc287 100644 --- a/karapace/backup/backends/writer.py +++ b/karapace/backup/backends/writer.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from kafka.consumer.fetcher import ConsumerRecord +from confluent_kafka import Message from karapace.backup.safe_writer import bytes_writer, str_writer from pathlib import Path from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar @@ -98,7 +98,7 @@ def store_metadata( def store_record( self, buffer: IO[B], - record: ConsumerRecord, + record: Message, ) -> None: """ Called in order for each record read from a topic to be backed up. It's safe to @@ -154,9 +154,16 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC): def store_record( self, buffer: IO[str], - record: ConsumerRecord, + record: Message, ) -> None: - buffer.write(self.serialize_record(record.key, record.value)) + record_key = record.key() + record_value = record.value() + buffer.write( + self.serialize_record( + record_key.encode() if isinstance(record_key, str) else record_key, + record_value.encode() if isinstance(record_value, str) else record_value, + ) + ) @staticmethod @abc.abstractmethod diff --git a/karapace/backup/cli.py b/karapace/backup/cli.py index 2aeaafc2d..d8af4eaba 100644 --- a/karapace/backup/cli.py +++ b/karapace/backup/cli.py @@ -9,6 +9,7 @@ from . import api from .errors import BackupDataRestorationError, StaleConsumerError from .poll_timeout import PollTimeout +from kafka.errors import BrokerResponseError from karapace.backup.api import VerifyLevel from karapace.config import Config, read_config from typing import Iterator @@ -163,6 +164,14 @@ def main() -> None: "Try increasing --poll-timeout to give the broker more time.", ) raise SystemExit(1) from e + except BrokerResponseError as exc: + logger.exception( + "An unexpected Kafka error occurred during consuming messages with error code %s: %s - %s", + exc.errno, + exc.message, + exc.description, + ) + raise SystemExit(1) from exc if __name__ == "__main__": diff --git a/karapace/backup/errors.py b/karapace/backup/errors.py index 364052cc5..78d406626 100644 --- a/karapace/backup/errors.py +++ b/karapace/backup/errors.py @@ -2,7 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from kafka.structs import TopicPartition +from confluent_kafka import TopicPartition from karapace.backup.poll_timeout import PollTimeout __all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"] diff --git a/karapace/backup/poll_timeout.py b/karapace/backup/poll_timeout.py index 0a1b9e157..91d5871f1 100644 --- a/karapace/backup/poll_timeout.py +++ b/karapace/backup/poll_timeout.py @@ -49,3 +49,8 @@ def __repr__(self) -> str: def milliseconds(self) -> int: """Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding).""" return self.__value // timedelta(milliseconds=1) + + @cached_property + def seconds(self) -> float: + """Returns this poll timeout in seconds.""" + return self.__value / timedelta(seconds=1) diff --git a/karapace/backup/topic_configurations.py b/karapace/backup/topic_configurations.py index 93b9ceacf..320e2e6ee 100644 --- a/karapace/backup/topic_configurations.py +++ b/karapace/backup/topic_configurations.py @@ -4,7 +4,8 @@ """ from __future__ import annotations -from karapace.kafka.admin import ConfigSource, KafkaAdminClient +from confluent_kafka.admin import ConfigSource +from karapace.kafka.admin import KafkaAdminClient from typing import Container, Final ALL_CONFIG_SOURCES: Final = ConfigSource diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py index cb38165c8..78dd1b78a 100644 --- a/karapace/kafka/common.py +++ b/karapace/kafka/common.py @@ -9,7 +9,7 @@ from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError -from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar +from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar from typing_extensions import Unpack import logging @@ -85,6 +85,12 @@ class KafkaClientParams(TypedDict, total=False): ssl_certfile: str | None ssl_keyfile: str | None sasl_oauth_token_provider: TokenWithExpiryProvider + # Consumer-only + auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"] + enable_auto_commit: bool + fetch_max_wait_ms: int + group_id: str + session_timeout_ms: int class _KafkaConfigMixin: @@ -128,6 +134,12 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para "ssl.certificate.location": params.get("ssl_certfile"), "ssl.key.location": params.get("ssl_keyfile"), "error_cb": self._error_callback, + # Consumer-only + "auto.offset.reset": params.get("auto_offset_reset"), + "enable.auto.commit": params.get("enable_auto_commit"), + "fetch.wait.max.ms": params.get("fetch_max_wait_ms"), + "group.id": params.get("group_id"), + "session.timeout.ms": params.get("session_timeout_ms"), } config = {key: value for key, value in config.items() if value is not None} diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py new file mode 100644 index 000000000..d5174c737 --- /dev/null +++ b/karapace/kafka/consumer.py @@ -0,0 +1,68 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from confluent_kafka import Consumer, TopicPartition +from confluent_kafka.admin import PartitionMetadata +from confluent_kafka.error import KafkaException +from kafka.errors import KafkaTimeoutError +from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception +from typing import Iterable +from typing_extensions import Unpack + +import secrets + + +class KafkaConsumer(_KafkaConfigMixin, Consumer): + def __init__( + self, + topic: str, + bootstrap_servers: Iterable[str] | str, + verify_connection: bool = True, + **params: Unpack[KafkaClientParams], + ) -> None: + # The `confluent_kafka.Consumer` does not allow for a missing group id + # if the client of this class does not provide one, we'll generate a + # unique group id to achieve the groupless behaviour + if "group_id" not in params: + params["group_id"] = self._create_group_id() + + super().__init__(bootstrap_servers, verify_connection, **params) + + self.subscribe([topic]) + + @staticmethod + def _create_group_id() -> str: + return f"karapace-autogenerated-{secrets.token_hex(6)}" + + def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]: + """Returns all partition metadata for the given topic.""" + try: + return self.list_topics(topic).topics[topic].partitions + except KafkaException as exc: + raise_from_kafkaexception(exc) + + def get_watermark_offsets( + self, partition: TopicPartition, timeout: float | None = None, cached: bool = False + ) -> tuple[int, int]: + """Wrapper around `Consumer.get_watermark_offsets` to handle error cases and exceptions. + + confluent-kafka is somewhat inconsistent with error-related behaviours, + `get_watermark_offsets` returns `None` on timeouts, so we are translating it to an + exception. + """ + try: + if timeout is not None: + result = super().get_watermark_offsets(partition, timeout, cached) + else: + result = super().get_watermark_offsets(partition, cached=cached) + + if result is None: + raise KafkaTimeoutError() + + return result + except KafkaException as exc: + raise_from_kafkaexception(exc) diff --git a/karapace/kafka/types.py b/karapace/kafka/types.py new file mode 100644 index 000000000..a844e74ba --- /dev/null +++ b/karapace/kafka/types.py @@ -0,0 +1,19 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" + +from confluent_kafka import TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME, TIMESTAMP_NOT_AVAILABLE +from typing import Final + +import enum + +# A constant that corresponds to the default value of request.timeout.ms in +# the librdkafka C library +DEFAULT_REQUEST_TIMEOUT_MS: Final = 30000 + + +class Timestamp(enum.IntEnum): + NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE + CREATE_TIME = TIMESTAMP_CREATE_TIME + LOG_APPEND_TIME = TIMESTAMP_LOG_APPEND_TIME diff --git a/karapace/kafka_rest_apis/__init__.py b/karapace/kafka_rest_apis/__init__.py index b9b7b28aa..d16fcd2ab 100644 --- a/karapace/kafka_rest_apis/__init__.py +++ b/karapace/kafka_rest_apis/__init__.py @@ -2,6 +2,7 @@ from aiokafka.errors import KafkaConnectionError from binascii import Error as B64DecodeError from collections import namedtuple +from confluent_kafka.error import KafkaException from contextlib import AsyncExitStack from http import HTTPStatus from kafka.errors import ( @@ -14,7 +15,7 @@ ) from karapace.config import Config, create_client_ssl_context from karapace.errors import InvalidSchema -from karapace.kafka.admin import KafkaAdminClient, KafkaException +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka_rest_apis.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py index c70cd530c..0e88026d7 100644 --- a/karapace/kafka_utils.py +++ b/karapace/kafka_utils.py @@ -3,9 +3,8 @@ See LICENSE for details """ from .config import Config -from .utils import KarapaceKafkaClient -from kafka import KafkaConsumer from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.kafka.producer import KafkaProducer from typing import Iterator @@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons sasl_plain_password=config["sasl_plain_password"], auto_offset_reset="earliest", metadata_max_age_ms=config["metadata_max_age_ms"], - kafka_client=KarapaceKafkaClient, ) try: yield consumer diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index 45497daa6..9ff823290 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -5,12 +5,12 @@ See LICENSE for details """ from dataclasses import dataclass -from kafka import KafkaConsumer from kafka.coordinator.base import BaseCoordinator from kafka.errors import NoBrokersAvailable, NodeNotReadyError from kafka.metrics import MetricConfig, Metrics from karapace import constants from karapace.config import Config +from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS from karapace.typing import JsonData, JsonObject from karapace.utils import json_decode, json_encode, KarapaceKafkaClient from karapace.version import __version__ @@ -238,7 +238,7 @@ def init_schema_coordinator(self) -> None: election_strategy=self.config.get("master_election_strategy", "lowest"), group_id=self.config["group_id"], session_timeout_ms=session_timeout_ms, - request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]), + request_timeout_ms=max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS), ) self.schema_coordinator_ready.set() diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index b6870fd22..51d10cd4a 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -7,10 +7,10 @@ from __future__ import annotations from avro.schema import Schema as AvroSchema +from confluent_kafka import Message, TopicPartition from contextlib import closing, ExitStack from enum import Enum from jsonschema.validators import Draft7Validator -from kafka import KafkaConsumer, TopicPartition from kafka.errors import ( InvalidReplicationFactorError, KafkaConfigurationError, @@ -25,6 +25,7 @@ from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher @@ -33,7 +34,7 @@ from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject -from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient +from karapace.utils import json_decode, JSONDecodeError from threading import Event, Thread from typing import Final, Mapping, Sequence @@ -69,11 +70,9 @@ class MessageType(Enum): def _create_consumer_from_config(config: Config) -> KafkaConsumer: # Group not set on purpose, all consumers read the same data session_timeout_ms = config["session_timeout_ms"] - request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]) return KafkaConsumer( config["topic_name"], enable_auto_commit=False, - api_version=(1, 0, 0), bootstrap_servers=config["bootstrap_uri"], client_id=config["client_id"], fetch_max_wait_ms=50, @@ -86,8 +85,6 @@ def _create_consumer_from_config(config: Config) -> KafkaConsumer: sasl_plain_password=config["sasl_plain_password"], auto_offset_reset="earliest", session_timeout_ms=session_timeout_ms, - request_timeout_ms=request_timeout_ms, - kafka_client=KarapaceKafkaClient, metadata_max_age_ms=config["metadata_max_age_ms"], ) @@ -117,7 +114,7 @@ def __init__( ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator - self.timeout_ms = 200 + self.timeout_s = 0.2 self.config = config self.database = database @@ -234,11 +231,13 @@ def _get_beginning_offset(self) -> int: assert self.consumer is not None, "Thread must be started" try: - offsets = self.consumer.beginning_offsets([TopicPartition(self.config["topic_name"], 0)]) - # Offset in the response is the offset for last offset. - # Reduce by one for matching on startup. - beginning_offset = list(offsets.values())[0] - 1 - return beginning_offset + beginning_offset, _ = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) + # The `-1` decrement here is due to historical reasons (evolution of schema reader and offset watcher): + # * The first `OffsetWatcher` implementation neeeded this for flagging empty offsets + # * Then synchronization and locking was changed and this remained + # * See https://github.com/Aiven-Open/karapace/pull/364/files + # * See `OFFSET_EMPTY` and `OFFSET_UNINITIALIZED` + return beginning_offset - 1 except KafkaTimeoutError: LOG.exception("Reading begin offsets timed out.") except Exception as e: # pylint: disable=broad-except @@ -253,7 +252,7 @@ def _is_ready(self) -> bool: assert self.consumer is not None, "Thread must be started" try: - offsets = self.consumer.end_offsets([TopicPartition(self.config["topic_name"], 0)]) + _, end_offset = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) except KafkaTimeoutError: LOG.exception("Reading end offsets timed out.") return False @@ -263,7 +262,7 @@ def _is_ready(self) -> bool: return False # Offset in the response is the offset for the next upcoming message. # Reduce by one for actual highest offset. - self._highest_offset = list(offsets.values())[0] - 1 + self._highest_offset = end_offset - 1 cur_time = time.monotonic() time_from_last_check = cur_time - self.last_check progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2) @@ -281,7 +280,7 @@ def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) @staticmethod - def _parse_message_value(raw_value: str) -> JsonObject | None: + def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: value = json_decode(raw_value) if isinstance(value, dict): return value @@ -292,7 +291,7 @@ def _parse_message_value(raw_value: str) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms) + msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s) if self.ready is False: self.ready = self._is_ready() @@ -306,49 +305,52 @@ def handle_messages(self) -> None: if are_we_master is True: watch_offsets = True - for _, msgs in raw_msgs.items(): - schema_records_processed_keymode_canonical = 0 - schema_records_processed_keymode_deprecated_karapace = 0 - for msg in msgs: + schema_records_processed_keymode_canonical = 0 + schema_records_processed_keymode_deprecated_karapace = 0 + for msg in msgs: + try: + message_key = msg.key() + if message_key is None: + continue + key = json_decode(message_key) + except JSONDecodeError: + LOG.exception("Invalid JSON in msg.key() at offset %s", msg.offset()) + continue + + assert isinstance(key, dict) + msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE + # Key mode detection happens on startup. + # Default keymode is CANONICAL and preferred unless any data consumed + # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE + # the subsequent keys are omitted from detection. + if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: + if msg_keymode == KeyMode.DEPRECATED_KARAPACE: + self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) + + value = None + message_value = msg.value() + if message_value: try: - key = json_decode(msg.key) + value = self._parse_message_value(message_value) except JSONDecodeError: - LOG.exception("Invalid JSON in msg.key") + LOG.exception("Invalid JSON in msg.value() at offset %s", msg.offset()) continue - assert isinstance(key, dict) - msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE - # Key mode detection happens on startup. - # Default keymode is CANONICAL and preferred unless any data consumed - # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE - # the subsequent keys are omitted from detection. - if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: - if msg_keymode == KeyMode.DEPRECATED_KARAPACE: - self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) - - value = None - if msg.value: - try: - value = self._parse_message_value(msg.value) - except JSONDecodeError: - LOG.exception("Invalid JSON in msg.value") - continue - - self.handle_msg(key, value) - self.offset = msg.offset - - if msg_keymode == KeyMode.CANONICAL: - schema_records_processed_keymode_canonical += 1 - else: - schema_records_processed_keymode_deprecated_karapace += 1 - - if self.ready and watch_offsets: - self._offset_watcher.offset_seen(self.offset) - - self._report_schema_metrics( - schema_records_processed_keymode_canonical, - schema_records_processed_keymode_deprecated_karapace, - ) + self.handle_msg(key, value) + self.offset = msg.offset() + + if msg_keymode == KeyMode.CANONICAL: + schema_records_processed_keymode_canonical += 1 + else: + schema_records_processed_keymode_deprecated_karapace += 1 + + if self.ready and watch_offsets: + self._offset_watcher.offset_seen(self.offset) + + self._report_schema_metrics( + schema_records_processed_keymode_canonical, + schema_records_processed_keymode_deprecated_karapace, + ) def _report_schema_metrics( self, diff --git a/pytest.ini b/pytest.ini index 0f19813c2..8ed1116d2 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,4 @@ [pytest] addopts = -ra --numprocesses auto --import-mode=importlib -timeout = 60 +timeout = 90 timeout_func_only = true diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi index 5762cb52a..3d26b0393 100644 --- a/stubs/confluent_kafka/__init__.pyi +++ b/stubs/confluent_kafka/__init__.pyi @@ -1,4 +1,21 @@ from ._model import IsolationLevel -from .cimpl import Message, Producer, TopicPartition +from .cimpl import ( + Consumer, + Message, + Producer, + TIMESTAMP_CREATE_TIME, + TIMESTAMP_LOG_APPEND_TIME, + TIMESTAMP_NOT_AVAILABLE, + TopicPartition, +) -__all__ = ("IsolationLevel", "Message", "Producer", "TopicPartition") +__all__ = ( + "Consumer", + "IsolationLevel", + "Message", + "Producer", + "TIMESTAMP_CREATE_TIME", + "TIMESTAMP_LOG_APPEND_TIME", + "TIMESTAMP_NOT_AVAILABLE", + "TopicPartition", +) diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index ecf6d4420..48d67c155 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -1,5 +1,5 @@ from confluent_kafka.admin._metadata import ClusterMetadata -from typing import Any, Callable +from typing import Any, Callable, Final class KafkaError: _NOENT: int @@ -31,8 +31,13 @@ class TopicPartition: partition: int = -1, offset: int = -1001, metadata: str | None = None, - leader_epoc: int | None = None, - ) -> None: ... + leader_epoch: int | None = None, + ) -> None: + self.topic: str + self.partition: int + self.offset: int + self.metadata: str | None + self.leader_epoch: int | None class Message: def offset(self) -> int: ... @@ -41,6 +46,8 @@ class Message: def value(self) -> str | bytes | None: ... def topic(self) -> str: ... def partition(self) -> int: ... + def headers(self) -> list[tuple[str, bytes]] | None: ... + def error(self) -> KafkaError | None: ... class Producer: def produce( @@ -56,3 +63,18 @@ class Producer: def flush(self, timeout: float = -1) -> None: ... def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... def poll(self, timeout: float = -1) -> int: ... + +class Consumer: + def subscribe(self, topics: list[str]) -> None: ... + def get_watermark_offsets( + self, partition: TopicPartition, timeout: float | None = None, cached: bool = False + ) -> tuple[int, int] | None: ... + def close(self) -> None: ... + def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... + def consume(self, num_messages: int = 1, timeout: float = -1) -> list[Message]: ... + def poll(self, timeout: float = -1) -> Message | None: ... + def assign(self, partitions: list[TopicPartition]) -> None: ... + +TIMESTAMP_CREATE_TIME: Final = ... +TIMESTAMP_NOT_AVAILABLE: Final = ... +TIMESTAMP_LOG_APPEND_TIME: Final = ... diff --git a/tests/integration/backup/test_get_topic_configurations.py b/tests/integration/backup/test_get_topic_configurations.py index 16592aed3..688c21b67 100644 --- a/tests/integration/backup/test_get_topic_configurations.py +++ b/tests/integration/backup/test_get_topic_configurations.py @@ -4,8 +4,9 @@ """ from __future__ import annotations +from confluent_kafka.admin import NewTopic from karapace.backup.topic_configurations import ALL_CONFIG_SOURCES, ConfigSource, DEFAULT_CONFIGS, get_topic_configurations -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient import pytest diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py index 3d732bd58..9f7d4a77d 100644 --- a/tests/integration/backup/test_legacy_backup.py +++ b/tests/integration/backup/test_legacy_backup.py @@ -4,8 +4,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from datetime import timedelta -from kafka import KafkaConsumer +from kafka.errors import InvalidTopicError from karapace.backup import api from karapace.backup.api import BackupVersion from karapace.backup.errors import StaleConsumerError @@ -13,12 +12,14 @@ from karapace.client import Client from karapace.config import set_config_defaults from karapace.kafka.admin import KafkaAdminClient +from karapace.kafka.common import KafkaError +from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import is_key_in_canonical_format from karapace.utils import Expiration from pathlib import Path from tests.integration.utils.cluster import RegistryDescription from tests.integration.utils.kafka_server import KafkaServers -from tests.utils import new_random_name +from tests.utils import new_random_name, StubMessage from unittest import mock import asyncio @@ -131,18 +132,16 @@ def _assert_canonical_key_format( schemas_topic, group_id="assert-canonical-key-format-consumer", enable_auto_commit=False, - api_version=(1, 0, 0), bootstrap_servers=bootstrap_servers, auto_offset_reset="earliest", ) - raw_msgs = consumer.poll(timeout_ms=2000) - while raw_msgs: - for _, messages in raw_msgs.items(): - for message in messages: - key = json.loads(message.key) - assert is_key_in_canonical_format(key), f"Not in canonical format: {key}" - raw_msgs = consumer.poll() + messages = consumer.consume(timeout=30) + while messages: + for message in messages: + key = json.loads(message.key()) + assert is_key_in_canonical_format(key), f"Not in canonical format: {key}" + messages = consumer.consume(timeout=30) consumer.close() @@ -174,7 +173,7 @@ async def test_backup_restore( # The restored karapace should have the previously created subject all_subjects = [] - expiration = Expiration.from_timeout(timeout=10) + expiration = Expiration.from_timeout(timeout=30) while subject not in all_subjects: expiration.raise_timeout_if_expired( msg_format="{subject} not in {all_subjects}", @@ -184,7 +183,7 @@ async def test_backup_restore( res = await registry_async_client.get("subjects") assert res.status_code == 200 all_subjects = res.json() - time.sleep(0.1) + time.sleep(1) # Test a few exotic scenarios @@ -260,13 +259,35 @@ async def test_stale_consumer( # The proper way to test this would be with quotas by throttling our client to death while using a very short # poll timeout. However, we have no way to set up quotas because all Kafka clients available to us do not # implement the necessary APIs. - with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}._poll_once") as poll_once_mock: - poll_once_mock.return_value = {} + with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock: + poll_mock.return_value = None api.create_backup( config=config, backup_location=tmp_path / "backup", topic_name=api.normalize_topic_name(None, config), version=BackupVersion.V2, - poll_timeout=PollTimeout(timedelta(seconds=1)), + poll_timeout=PollTimeout.of(seconds=1), ) assert str(e.value) == f"{registry_cluster.schemas_topic}:0#0 (0,0) after PT1S" + + +async def test_message_error( + kafka_servers: KafkaServers, + registry_async_client: Client, + registry_cluster: RegistryDescription, + tmp_path: Path, +) -> None: + await insert_data(registry_async_client) + config = set_config_defaults( + {"bootstrap_uri": kafka_servers.bootstrap_servers, "topic_name": registry_cluster.schemas_topic} + ) + with pytest.raises(InvalidTopicError): + with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock: + poll_mock.return_value = StubMessage(error=KafkaError(KafkaError.TOPIC_EXCEPTION)) + api.create_backup( + config=config, + backup_location=tmp_path / "backup", + topic_name=api.normalize_topic_name(None, config), + version=BackupVersion.V2, + poll_timeout=PollTimeout.of(seconds=1), + ) diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index be336afcf..054be6c7a 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -4,9 +4,9 @@ """ from __future__ import annotations +from confluent_kafka import Message, TopicPartition +from confluent_kafka.admin import NewTopic from dataclasses import fields -from kafka import TopicPartition -from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import api from karapace.backup.api import _consume_records, TopicName @@ -16,8 +16,9 @@ from karapace.backup.poll_timeout import PollTimeout from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import Timestamp from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config from karapace.version import __version__ from pathlib import Path @@ -182,28 +183,28 @@ def test_roundtrip_from_kafka_state( ) # First record. - assert isinstance(first_record, ConsumerRecord) - assert first_record.topic == new_topic.topic - assert first_record.partition == partition + assert isinstance(first_record, Message) + assert first_record.topic() == new_topic.topic + assert first_record.partition() == partition # Note: This might be unreliable due to not using idempotent producer, i.e. we have # no guarantee against duplicates currently. - assert first_record.offset == 0 - assert first_record.timestamp == 1683474641 - assert first_record.timestamp_type == 0 - assert first_record.key == b"bar" - assert first_record.value == b"foo" - assert first_record.headers == [] + assert first_record.offset() == 0 + assert first_record.timestamp()[1] == 1683474641 + assert first_record.timestamp()[0] == Timestamp.CREATE_TIME + assert first_record.key() == b"bar" + assert first_record.value() == b"foo" + assert first_record.headers() is None # Second record. - assert isinstance(second_record, ConsumerRecord) - assert second_record.topic == new_topic.topic - assert second_record.partition == partition - assert second_record.offset == 1 - assert second_record.timestamp == 1683474657 - assert second_record.timestamp_type == 0 - assert second_record.key == b"foo" - assert second_record.value == b"bar" - assert second_record.headers == [ + assert isinstance(second_record, Message) + assert second_record.topic() == new_topic.topic + assert second_record.partition() == partition + assert second_record.offset() == 1 + assert second_record.timestamp()[1] == 1683474657 + assert second_record.timestamp()[0] == Timestamp.CREATE_TIME + assert second_record.key() == b"foo" + assert second_record.value() == b"bar" + assert second_record.headers() == [ ("some-header", b"some header value"), ("other-header", b"some other header value"), ] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d9ab70d40..466e5b5c9 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,12 +7,13 @@ from _pytest.fixtures import SubRequest from aiohttp.pytest_plugin import AiohttpClient from aiohttp.test_utils import TestClient +from confluent_kafka.admin import NewTopic from contextlib import ExitStack from dataclasses import asdict from filelock import FileLock from karapace.client import Client from karapace.config import Config, set_config_defaults, write_config -from karapace.kafka.admin import KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer from karapace.kafka_rest_apis import KafkaRest from pathlib import Path diff --git a/tests/integration/kafka/test_admin.py b/tests/integration/kafka/test_admin.py index d6d586a6c..84d8a3e7f 100644 --- a/tests/integration/kafka/test_admin.py +++ b/tests/integration/kafka/test_admin.py @@ -5,8 +5,9 @@ from __future__ import annotations +from confluent_kafka.admin import ConfigSource, NewTopic from kafka.errors import InvalidReplicationFactorError, TopicAlreadyExistsError, UnknownTopicOrPartitionError -from karapace.kafka.admin import ConfigSource, KafkaAdminClient, NewTopic +from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer from tests.utils import new_topic as create_new_topic diff --git a/tests/integration/kafka/test_consumer.py b/tests/integration/kafka/test_consumer.py new file mode 100644 index 000000000..21c547415 --- /dev/null +++ b/tests/integration/kafka/test_consumer.py @@ -0,0 +1,79 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from confluent_kafka import TopicPartition +from confluent_kafka.admin import NewTopic +from kafka.errors import UnknownTopicOrPartitionError +from karapace.kafka.consumer import KafkaConsumer +from karapace.kafka.producer import KafkaProducer +from tests.integration.utils.kafka_server import KafkaServers + +import pytest + + +class TestPartitionsForTopic: + def test_partitions_for_returns_empty_for_unknown_topic(self, kafka_servers: KafkaServers) -> None: + consumer = KafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + topic="nonexistent", + ) + + assert consumer.partitions_for_topic("nonexistent") == {} + + def test_partitions_for(self, kafka_servers: KafkaServers, new_topic: NewTopic) -> None: + consumer = KafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + topic=new_topic.topic, + ) + + partitions = consumer.partitions_for_topic(new_topic.topic) + + assert len(partitions) == 1 + assert partitions[0].id == 0 + assert partitions[0].replicas == [1] + assert partitions[0].isrs == [1] + + +class TestGetWatermarkOffsets: + def test_get_watermark_offsets_unkown_topic(self, kafka_servers: KafkaServers) -> None: + consumer = KafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + topic="nonexistent", + ) + + with pytest.raises(UnknownTopicOrPartitionError): + _, _ = consumer.get_watermark_offsets(TopicPartition("nonexistent", 0)) + + def test_get_watermark_offsets_empty_topic(self, kafka_servers: KafkaServers, new_topic: NewTopic) -> None: + consumer = KafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + topic=new_topic.topic, + auto_offset_reset="earliest", + ) + + beginning, end = consumer.get_watermark_offsets(TopicPartition(new_topic.topic, 0)) + + assert beginning == 0 + assert end == 0 + + def test_get_watermark_offsets_topic_with_one_message( + self, + kafka_servers: KafkaServers, + producer: KafkaProducer, + new_topic: NewTopic, + ) -> None: + consumer = KafkaConsumer( + bootstrap_servers=kafka_servers.bootstrap_servers, + topic=new_topic.topic, + auto_offset_reset="earliest", + ) + producer.send(new_topic.topic) + producer.flush() + + beginning, end = consumer.get_watermark_offsets(TopicPartition(new_topic.topic, 0)) + + assert beginning == 0 + assert end == 1 diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index bd31e83f8..d82c6cce5 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -5,9 +5,10 @@ from __future__ import annotations +from confluent_kafka.admin import NewTopic from kafka.errors import MessageSizeTooLargeError, UnknownTopicOrPartitionError -from karapace.kafka.admin import NewTopic from karapace.kafka.producer import KafkaProducer +from karapace.kafka.types import Timestamp import pytest import time @@ -37,6 +38,7 @@ def test_send(self, producer: KafkaProducer, new_topic: NewTopic) -> None: assert message.topic() == new_topic.topic assert message.key() == key assert message.value() == value + assert message.timestamp()[0] == Timestamp.CREATE_TIME assert message.timestamp()[1] == timestamp def test_send_raises_for_unknown_topic(self, producer: KafkaProducer) -> None: diff --git a/tests/unit/backup/backends/test_v2.py b/tests/unit/backup/backends/test_v2.py index b38f83469..b2ad273b2 100644 --- a/tests/unit/backup/backends/test_v2.py +++ b/tests/unit/backup/backends/test_v2.py @@ -5,12 +5,13 @@ from __future__ import annotations from functools import partial -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import ProducerSend, RestoreTopicLegacy from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer from karapace.backup.encoders import encode_key, encode_value +from karapace.kafka.types import Timestamp from karapace.key_format import KeyFormatter from pathlib import Path +from tests.utils import StubMessage import datetime import json @@ -29,7 +30,7 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic_name = "a-topic" partition_index = 123 records = ( - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -50,15 +51,10 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -79,13 +75,8 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), ) @@ -166,7 +157,7 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic_name = "a-topic" partition_index = 123 records = ( - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -194,15 +185,10 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=json.dumps( { "keytype": "SCHEMA", @@ -230,13 +216,8 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic=topic_name, partition=partition_index, offset=0, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), ) diff --git a/tests/unit/backup/backends/v3/test_backend.py b/tests/unit/backup/backends/v3/test_backend.py index 4f8562311..7ea6811f7 100644 --- a/tests/unit/backup/backends/v3/test_backend.py +++ b/tests/unit/backup/backends/v3/test_backend.py @@ -3,7 +3,6 @@ See LICENSE for details """ from dataclasses import replace -from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.backends.reader import ProducerSend, RestoreTopic from karapace.backup.backends.v3.backend import _PartitionStats, SchemaBackupV3Reader, SchemaBackupV3Writer from karapace.backup.backends.v3.errors import ( @@ -16,7 +15,9 @@ ) from karapace.backup.backends.v3.readers import read_records from karapace.backup.backends.v3.schema import ChecksumAlgorithm, DataFile +from karapace.kafka.types import Timestamp from pathlib import Path +from tests.utils import StubMessage from unittest import mock import datetime @@ -33,33 +34,23 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: finished_at = datetime.datetime.now(datetime.timezone.utc) records = ( - ConsumerRecord( + StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=10, - timestamp=round(time.time()), - timestamp_type=None, - headers=(), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=None, ), - ConsumerRecord( + StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=14, - timestamp=round(time.time()), - timestamp_type=None, - headers=(("some-key", b"some-value"),), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=[("some-key", b"some-value")], ), ) topic_configurations = {"max.message.bytes": "1024"} @@ -124,36 +115,31 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: ProducerSend( topic_name=topic_name, partition_index=partition_index, - key=records[0].key, - value=records[0].value, + key=records[0].key(), + value=records[0].value(), headers=(), - timestamp=records[0].timestamp, + timestamp=records[0].timestamp()[1], ), ProducerSend( topic_name=topic_name, partition_index=partition_index, - key=records[1].key, - value=records[1].value, + key=records[1].key(), + value=records[1].value(), headers=((b"some-key", b"some-value"),), - timestamp=records[0].timestamp, + timestamp=records[0].timestamp()[1], ), ) -def make_record(topic_name: str, partition_index: int, offset: int) -> ConsumerRecord: - return ConsumerRecord( +def make_record(topic_name: str, partition_index: int, offset: int) -> StubMessage: + return StubMessage( key=b"foo", value=b"bar", topic=topic_name, partition=partition_index, offset=offset, - timestamp=round(time.time()), - timestamp_type=None, - headers=(("some-key", b"some-value"),), - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - serialized_header_size=None, + timestamp=(Timestamp.CREATE_TIME, round(time.time())), + headers=[("some-key", b"some-value")], ) diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index c112d5ffc..820287cc3 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -4,9 +4,7 @@ """ from __future__ import annotations -from kafka import KafkaConsumer from kafka.errors import KafkaError, TopicAlreadyExistsError -from kafka.structs import PartitionMetadata from karapace import config from karapace.backup.api import ( _admin, @@ -24,6 +22,7 @@ from karapace.backup.errors import BackupError, PartitionCountError from karapace.config import Config from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.kafka.consumer import KafkaConsumer, PartitionMetadata from karapace.kafka.producer import KafkaProducer from pathlib import Path from types import FunctionType @@ -148,7 +147,16 @@ def test_skip_topic_creation( class TestClients: @staticmethod def _partition_metadata(c: int = 1) -> set[PartitionMetadata]: - return {PartitionMetadata("topic", i, 0, tuple(), tuple(), None) for i in range(0, c)} + def create(partition) -> PartitionMetadata: + metadata = PartitionMetadata() + metadata.id = partition + metadata.leader = 1 + metadata.replicas = () + metadata.isrs = () + + return metadata + + return {create(i) for i in range(c)} @pytest.mark.parametrize( "ctx_mng,client_class,partitions_method,close_method_name", diff --git a/tests/unit/backup/test_poll_timeout.py b/tests/unit/backup/test_poll_timeout.py index 9a0614038..ecd9bfce4 100644 --- a/tests/unit/backup/test_poll_timeout.py +++ b/tests/unit/backup/test_poll_timeout.py @@ -37,3 +37,6 @@ def test__repr__(self) -> None: def test_milliseconds(self) -> None: assert PollTimeout(timedelta(milliseconds=1000.5)).milliseconds == 1000 + + def test_seconds(self) -> None: + assert PollTimeout(timedelta(milliseconds=1500)).seconds == 1.5 diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index b6566e927..12a404ac5 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -137,9 +137,9 @@ class ReadinessTestCase(BaseTestCase): def test_readiness_check(testcase: ReadinessTestCase) -> None: key_formatter_mock = Mock() consumer_mock = Mock() - consumer_mock.poll.return_value = {} - # Return dict {partition: offsets}, end offset is the next upcoming record offset - consumer_mock.end_offsets.return_value = {0: testcase.end_offset} + consumer_mock.consume.return_value = [] + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, testcase.end_offset) offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( diff --git a/tests/utils.py b/tests/utils.py index 12e544408..bf17d9c8b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -10,7 +10,7 @@ from karapace.utils import Expiration from pathlib import Path from subprocess import Popen -from typing import Callable, IO, List, Union +from typing import Any, Callable, IO, List, Union from urllib.parse import quote import asyncio @@ -307,3 +307,20 @@ def popen_karapace_all(config_path: Union[Path, str], stdout: IO, stderr: IO, ** kwargs["stdout"] = stdout kwargs["stderr"] = stderr return Popen([python_exe(), "-m", "karapace.karapace_all", str(config_path)], **kwargs) + + +class StubMessage: + """A stub to stand-in for `confluent_kafka.Message` in unittests. + + Since that class cannot be instantiated, thus this is a liberal simulation + of its behaviour ie. its attributes are accessible via getter functions: + `message.offset()`.""" + + def __init__(self, **attrs: Any) -> None: + self._attrs = attrs + + def __getattr__(self, key: str) -> None: + try: + return lambda: self._attrs[key] + except KeyError as exc: + raise AttributeError from exc