From c9f1492b5c3d3f5deb2884b1abfe83e3035f1d30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?= Date: Wed, 6 Dec 2023 14:47:19 +0100 Subject: [PATCH] Do not skip None keyed headers on restore Previously in https://github.com/Aiven-Open/karapace/pull/765 skipping None header keys was added. While now it's not causing any problems, in the future it could be an issue if we suppress/skip error-cases instead of explicit failure. --- karapace/backup/api.py | 2 +- karapace/kafka/producer.py | 2 +- stubs/confluent_kafka/cimpl.pyi | 2 +- tests/integration/kafka/test_producer.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 04cc8f989..4bcc72131 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -390,7 +390,7 @@ def _handle_producer_send( key=instruction.key, value=instruction.value, partition=instruction.partition_index, - headers=[(key.decode(), value) for key, value in instruction.headers if key is not None], + headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers], timestamp=instruction.timestamp, ).add_done_callback(producer_callback) except (KafkaError, AssertionError) as exc: diff --git a/karapace/kafka/producer.py b/karapace/kafka/producer.py index 0caecb4c2..2ee677c3b 100644 --- a/karapace/kafka/producer.py +++ b/karapace/kafka/producer.py @@ -32,7 +32,7 @@ class ProducerSendParams(TypedDict, total=False): key: str | bytes | None partition: int timestamp: int | None - headers: dict[str, bytes | None] | list[tuple[str, bytes | None]] | None + headers: dict[str | None, bytes | None] | list[tuple[str | None, bytes | None]] | None class KafkaProducer(_KafkaConfigMixin, Producer): diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 9b573c5b9..ecf6d4420 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -51,7 +51,7 @@ class Producer: partition: int = -1, on_delivery: Callable[[KafkaError, Message], Any] | None = None, timestamp: int | None = -1, - headers: dict[str, bytes | None] | list[tuple[str, bytes | None]] | None = None, + headers: dict[str | None, bytes | None] | list[tuple[str | None, bytes | None]] | None = None, ) -> None: ... def flush(self, timeout: float = -1) -> None: ... def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ... diff --git a/tests/integration/kafka/test_producer.py b/tests/integration/kafka/test_producer.py index 69e06ea08..bd31e83f8 100644 --- a/tests/integration/kafka/test_producer.py +++ b/tests/integration/kafka/test_producer.py @@ -19,7 +19,7 @@ def test_send(self, producer: KafkaProducer, new_topic: NewTopic) -> None: value = b"value" partition = 0 timestamp = int(time.time() * 1000) - headers = [(b"something", b"123")] + headers = [("something", b"123"), (None, "foobar")] fut = producer.send( new_topic.topic,