Skip to content

Commit

Permalink
Replace sync Kafka consumers with confluent_kafka one
Browse files Browse the repository at this point in the history
This change replaces all synchronous Kafka consumers (from the
kafka-python library) with a new implementation based on
confluent-kafka-python's `Consumer`, keeping the same interface as much
as possible.

The PyTest timeout is raised from 60s to 90s to accomodate for the
default poll timeout for backups consumers (otherwise the tests would
time out while still waiting for messages to arrive).o

Since the `conluent_kafka.Consumer` implementation does not allow for
consumers to be without a group ID, if the new `KafkaConsumer` client is
not given one, we'll generate one on the fly to mimic a groupless
behaviour.

Resources:
* confluent-kafka-python documentation:
  https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation:
  https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
  • Loading branch information
Mátyás Kuti committed Dec 13, 2023
1 parent da85083 commit 640a9d0
Show file tree
Hide file tree
Showing 29 changed files with 459 additions and 208 deletions.
27 changes: 15 additions & 12 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@
from .poll_timeout import PollTimeout
from .topic_configurations import ConfigSource, get_topic_configurations
from concurrent.futures import Future
from confluent_kafka import Message, TopicPartition
from enum import Enum
from functools import partial
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
from karapace.config import Config
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
from karapace.key_format import KeyFormatter
from karapace.utils import assert_never
from pathlib import Path
from rich.console import Console
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar
from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar

import contextlib
import datetime
Expand Down Expand Up @@ -282,9 +282,8 @@ def _consume_records(
consumer: KafkaConsumer,
topic_partition: TopicPartition,
poll_timeout: PollTimeout,
) -> Iterator[ConsumerRecord]:
start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition]
end_offset: int = consumer.end_offsets([topic_partition])[topic_partition]
) -> Iterator[Message]:
start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
last_offset = start_offset

LOG.info(
Expand All @@ -301,12 +300,15 @@ def _consume_records(
end_offset -= 1 # high watermark to actual end offset

while True:
records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, [])
if len(records) == 0:
record: Message | None = consumer.poll(timeout=poll_timeout.seconds)
if record is None:
raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout)
for record in records:
yield record
last_offset = record.offset # pylint: disable=undefined-loop-variable
error = record.error()
if error is not None:
raise translate_from_kafkaerror(error)

yield record
last_offset = record.offset()
if last_offset >= end_offset:
break

Expand Down Expand Up @@ -528,6 +530,7 @@ def create_backup(
with _consumer(config, topic_name) as consumer:
(partition,) = consumer.partitions_for_topic(topic_name)
topic_partition = TopicPartition(topic_name, partition)
consumer.assign([topic_partition])

try:
data_file = _write_partition(
Expand Down
22 changes: 13 additions & 9 deletions karapace/backup/backends/v3/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from .readers import read_metadata, read_records
from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
from .writers import write_metadata, write_record
from confluent_kafka import Message
from dataclasses import dataclass
from kafka.consumer.fetcher import ConsumerRecord
from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic
from karapace.backup.backends.writer import BytesBackupWriter, StdOut
from karapace.backup.safe_writer import bytes_writer, staging_directory
Expand Down Expand Up @@ -334,27 +334,31 @@ def store_metadata(
def store_record(
self,
buffer: IO[bytes],
record: ConsumerRecord,
record: Message,
) -> None:
stats: Final = self._partition_stats[record.partition]
stats: Final = self._partition_stats[record.partition()]
checksum_checkpoint: Final = stats.get_checkpoint(
records_threshold=self._max_records_per_checkpoint,
bytes_threshold=self._max_bytes_per_checkpoint,
)
offset_start: Final = buffer.tell()

record_key = record.key()
record_value = record.value()

write_record(
buffer,
record=Record(
key=record.key,
value=record.value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers),
offset=record.offset,
timestamp=record.timestamp,
key=record_key.encode() if isinstance(record_key, str) else record_key,
value=record_value.encode() if isinstance(record_value, str) else record_value,
headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []),
offset=record.offset(),
timestamp=record.timestamp()[1],
checksum_checkpoint=checksum_checkpoint,
),
running_checksum=stats.running_checksum,
)
stats.update(
bytes_offset=buffer.tell() - offset_start,
record_offset=record.offset,
record_offset=record.offset(),
)
15 changes: 11 additions & 4 deletions karapace/backup/backends/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import annotations

from kafka.consumer.fetcher import ConsumerRecord
from confluent_kafka import Message
from karapace.backup.safe_writer import bytes_writer, str_writer
from pathlib import Path
from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar
Expand Down Expand Up @@ -98,7 +98,7 @@ def store_metadata(
def store_record(
self,
buffer: IO[B],
record: ConsumerRecord,
record: Message,
) -> None:
"""
Called in order for each record read from a topic to be backed up. It's safe to
Expand Down Expand Up @@ -154,9 +154,16 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC):
def store_record(
self,
buffer: IO[str],
record: ConsumerRecord,
record: Message,
) -> None:
buffer.write(self.serialize_record(record.key, record.value))
record_key = record.key()
record_value = record.value()
buffer.write(
self.serialize_record(
record_key.encode() if isinstance(record_key, str) else record_key,
record_value.encode() if isinstance(record_value, str) else record_value,
)
)

@staticmethod
@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion karapace/backup/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka.structs import TopicPartition
from confluent_kafka import TopicPartition
from karapace.backup.poll_timeout import PollTimeout

__all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"]
Expand Down
5 changes: 5 additions & 0 deletions karapace/backup/poll_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ def __repr__(self) -> str:
def milliseconds(self) -> int:
"""Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding)."""
return self.__value // timedelta(milliseconds=1)

@cached_property
def seconds(self) -> float:
"""Returns this poll timeout in seconds."""
return self.__value / timedelta(seconds=1)
3 changes: 2 additions & 1 deletion karapace/backup/topic_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"""
from __future__ import annotations

from karapace.kafka.admin import ConfigSource, KafkaAdminClient
from confluent_kafka.admin import ConfigSource
from karapace.kafka.admin import KafkaAdminClient
from typing import Container, Final

ALL_CONFIG_SOURCES: Final = ConfigSource
Expand Down
14 changes: 13 additions & 1 deletion karapace/kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from concurrent.futures import Future
from confluent_kafka.error import KafkaError, KafkaException
from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar
from typing import Any, Callable, Literal, NoReturn, Protocol, TypedDict, TypeVar
from typing_extensions import Unpack

import logging
Expand Down Expand Up @@ -85,6 +85,12 @@ class KafkaClientParams(TypedDict, total=False):
ssl_certfile: str | None
ssl_keyfile: str | None
sasl_oauth_token_provider: TokenWithExpiryProvider
# Consumer-only
auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
enable_auto_commit: bool
fetch_max_wait_ms: int
group_id: str
session_timeout_ms: int


class _KafkaConfigMixin:
Expand Down Expand Up @@ -128,6 +134,12 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
"ssl.certificate.location": params.get("ssl_certfile"),
"ssl.key.location": params.get("ssl_keyfile"),
"error_cb": self._error_callback,
# Consumer-only
"auto.offset.reset": params.get("auto_offset_reset"),
"enable.auto.commit": params.get("enable_auto_commit"),
"fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
"group.id": params.get("group_id"),
"session.timeout.ms": params.get("session_timeout_ms"),
}
config = {key: value for key, value in config.items() if value is not None}

Expand Down
68 changes: 68 additions & 0 deletions karapace/kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from __future__ import annotations

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.error import KafkaException
from kafka.errors import KafkaTimeoutError
from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
from typing import Iterable
from typing_extensions import Unpack

import secrets


class KafkaConsumer(_KafkaConfigMixin, Consumer):
def __init__(
self,
topic: str,
bootstrap_servers: Iterable[str] | str,
verify_connection: bool = True,
**params: Unpack[KafkaClientParams],
) -> None:
# The `confluent_kafka.Consumer` does not allow for a missing group id
# if the client of this class does not provide one, we'll generate a
# unique group id to achieve the groupless behaviour
if "group_id" not in params:
params["group_id"] = self._create_group_id()

super().__init__(bootstrap_servers, verify_connection, **params)

self.subscribe([topic])

@staticmethod
def _create_group_id() -> str:
return f"karapace-autogenerated-{secrets.token_hex(6)}"

def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]:
"""Returns all partition metadata for the given topic."""
try:
return self.list_topics(topic).topics[topic].partitions
except KafkaException as exc:
raise_from_kafkaexception(exc)

def get_watermark_offsets(
self, partition: TopicPartition, timeout: float | None = None, cached: bool = False
) -> tuple[int, int]:
"""Wrapper around `Consumer.get_watermark_offsets` to handle error cases and exceptions.
confluent-kafka is somewhat inconsistent with error-related behaviours,
`get_watermark_offsets` returns `None` on timeouts, so we are translating it to an
exception.
"""
try:
if timeout is not None:
result = super().get_watermark_offsets(partition, timeout, cached)
else:
result = super().get_watermark_offsets(partition, cached=cached)

if result is None:
raise KafkaTimeoutError()

return result
except KafkaException as exc:
raise_from_kafkaexception(exc)
19 changes: 19 additions & 0 deletions karapace/kafka/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from confluent_kafka import TIMESTAMP_CREATE_TIME, TIMESTAMP_LOG_APPEND_TIME, TIMESTAMP_NOT_AVAILABLE
from typing import Final

import enum

# A constant that corresponds to the default value of request.timeout.ms in
# the librdkafka C library
DEFAULT_REQUEST_TIMEOUT_MS: Final = 30000


class Timestamp(enum.IntEnum):
NOT_AVAILABLE = TIMESTAMP_NOT_AVAILABLE
CREATE_TIME = TIMESTAMP_CREATE_TIME
LOG_APPEND_TIME = TIMESTAMP_LOG_APPEND_TIME
3 changes: 2 additions & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from aiokafka.errors import KafkaConnectionError
from binascii import Error as B64DecodeError
from collections import namedtuple
from confluent_kafka.error import KafkaException
from contextlib import AsyncExitStack
from http import HTTPStatus
from kafka.errors import (
Expand All @@ -14,7 +15,7 @@
)
from karapace.config import Config, create_client_ssl_context
from karapace.errors import InvalidSchema
from karapace.kafka.admin import KafkaAdminClient, KafkaException
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka_rest_apis.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
Expand Down
4 changes: 1 addition & 3 deletions karapace/kafka_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
See LICENSE for details
"""
from .config import Config
from .utils import KarapaceKafkaClient
from kafka import KafkaConsumer
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import KafkaConsumer
from karapace.kafka.producer import KafkaProducer
from typing import Iterator

Expand Down Expand Up @@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
sasl_plain_password=config["sasl_plain_password"],
auto_offset_reset="earliest",
metadata_max_age_ms=config["metadata_max_age_ms"],
kafka_client=KarapaceKafkaClient,
)
try:
yield consumer
Expand Down
4 changes: 2 additions & 2 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
See LICENSE for details
"""
from dataclasses import dataclass
from kafka import KafkaConsumer
from kafka.coordinator.base import BaseCoordinator
from kafka.errors import NoBrokersAvailable, NodeNotReadyError
from kafka.metrics import MetricConfig, Metrics
from karapace import constants
from karapace.config import Config
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import JsonData, JsonObject
from karapace.utils import json_decode, json_encode, KarapaceKafkaClient
from karapace.version import __version__
Expand Down Expand Up @@ -238,7 +238,7 @@ def init_schema_coordinator(self) -> None:
election_strategy=self.config.get("master_election_strategy", "lowest"),
group_id=self.config["group_id"],
session_timeout_ms=session_timeout_ms,
request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]),
request_timeout_ms=max(session_timeout_ms, DEFAULT_REQUEST_TIMEOUT_MS),
)
self.schema_coordinator_ready.set()

Expand Down
Loading

0 comments on commit 640a9d0

Please sign in to comment.