From c49940ed01c21e9d0381adc36223f4de0d0c8273 Mon Sep 17 00:00:00 2001 From: Ezequiel Keimel Date: Mon, 28 Oct 2024 09:53:13 +0100 Subject: [PATCH 1/2] BE: Closes #71 Messages: Show headers duplicates --- .../io/kafbat/ui/emitter/MessageFilters.java | 2 +- .../ui/serdes/ConsumerRecordDeserializer.java | 18 ++++--- .../ui/serdes/ProducerRecordCreator.java | 21 ++++++-- .../ConsumerRecordDeserializerTest.java | 48 ++++++++++++++++- .../ui/serdes/ProducerRecordCreatorTest.java | 51 +++++++++++++++++++ .../main/resources/swagger/kafbat-ui-api.yaml | 9 ++-- .../Topic/Messages/Filters/InfoModal.tsx | 7 +++ .../MessageContent/MessageContent.tsx | 2 +- .../__tests__/MessageContent.spec.tsx | 2 +- 9 files changed, 143 insertions(+), 17 deletions(-) create mode 100644 api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java index 402b5a43e..c7a4f4ba9 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java @@ -123,7 +123,7 @@ private static CelCompiler createCompiler() { "timestampMs", SimpleType.INT, "keyAsText", SimpleType.STRING, "valueAsText", SimpleType.STRING, - "headers", MapType.create(SimpleType.STRING, SimpleType.STRING), + "headers", MapType.create(SimpleType.STRING, SimpleType.DYN), "key", SimpleType.DYN, "value", SimpleType.DYN ); diff --git a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java index b4eb51493..3c8e0dedb 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ConsumerRecordDeserializer.java @@ -6,8 +6,10 @@ import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.UnaryOperator; import lombok.RequiredArgsConstructor; @@ -63,14 +65,16 @@ private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) } private void fillHeaders(TopicMessageDTO message, ConsumerRecord rec) { - Map headers = new HashMap<>(); + Map> headersMap = new HashMap<>(); rec.headers().iterator() - .forEachRemaining(header -> - headers.put( - header.key(), - header.value() != null ? new String(header.value()) : null - )); - message.setHeaders(headers); + .forEachRemaining(header -> { + String key = header.key(); + String value = header.value() != null ? new String(header.value()) : null; + headersMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value); + }); + Map finalHeadersMap = new HashMap<>(); + headersMap.forEach((key, values) -> finalHeadersMap.put(key, values.size() == 1 ? values.get(0) : values)); + message.setHeaders(finalHeadersMap); } private void fillKey(TopicMessageDTO message, ConsumerRecord rec) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java index 359c871d6..cf230cc85 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java @@ -1,6 +1,8 @@ package io.kafbat.ui.serdes; +import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.serde.api.Serde; +import java.util.List; import java.util.Map; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; @@ -19,7 +21,7 @@ public ProducerRecord create(String topic, @Nullable Integer partition, @Nullable String key, @Nullable String value, - @Nullable Map headers) { + @Nullable Map headers) { return new ProducerRecord<>( topic, partition, @@ -29,10 +31,23 @@ public ProducerRecord create(String topic, ); } - private Iterable
createHeaders(Map clientHeaders) { + private Iterable
createHeaders(Map clientHeaders) { RecordHeaders headers = new RecordHeaders(); - clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes()))); + clientHeaders.forEach((k, v) -> { + if (v instanceof List valueList) { + valueList.forEach(value -> headers.add(new RecordHeader(k, valueToBytes(value)))); + } else { + headers.add(new RecordHeader(k, valueToBytes(v))); + } + }); return headers; } + private byte[] valueToBytes(Object value) { + if (value instanceof List || value instanceof Map) { + throw new ValidationException("Header values can only be string or list of strings"); + } + return value != null ? String.valueOf(value).getBytes() : null; + } + } diff --git a/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java b/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java index 649ac203c..9d853e823 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/ConsumerRecordDeserializerTest.java @@ -1,13 +1,17 @@ package io.kafbat.ui.serdes; import static io.kafbat.ui.serde.api.DeserializeResult.Type.STRING; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.kafbat.ui.model.TopicMessageDTO; import io.kafbat.ui.serde.api.DeserializeResult; import io.kafbat.ui.serde.api.Serde; +import java.util.List; import java.util.Map; import java.util.function.UnaryOperator; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -22,9 +26,51 @@ void dataMaskingAppliedOnDeserializedMessage() { Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of()); var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock); - recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes()))); + recordDeser.deserialize(record()); verify(maskerMock).apply(any(TopicMessageDTO.class)); } + @Test + void deserializeWithMultipleHeaderValues() { + UnaryOperator maskerMock = mock(); + when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0)); + Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of()); + + var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock); + ConsumerRecord record = record(); + record.headers().add("headerKey", "headerValue1".getBytes()); + record.headers().add("headerKey", "headerValue2".getBytes()); + TopicMessageDTO message = recordDeser.deserialize(record); + + Map headers = message.getHeaders(); + assertEquals(1, headers.size()); + assertTrue(headers.get("headerKey") instanceof List); + assertEquals(List.of("headerValue1", "headerValue2"), headers.get("headerKey")); + } + + @Test + void deserializeWithMixedSingleAndMultipleHeaderValues() { + UnaryOperator maskerMock = mock(); + when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0)); + Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of()); + + var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock); + ConsumerRecord record = record(); + record.headers().add("headerKey1", "singleValue".getBytes()); + record.headers().add("headerKey2", "multiValue1".getBytes()); + record.headers().add("headerKey2", "multiValue2".getBytes()); + TopicMessageDTO message = recordDeser.deserialize(record); + + Map headers = message.getHeaders(); + assertEquals("singleValue", headers.get("headerKey1")); + assertTrue(headers.get("headerKey2") instanceof List); + assertEquals(2, ((List) headers.get("headerKey2")).size()); + assertEquals(List.of("multiValue1", "multiValue2"), headers.get("headerKey2")); + } + + private ConsumerRecord record() { + return new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())); + } + } diff --git a/api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java b/api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java new file mode 100644 index 000000000..81b449343 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/ProducerRecordCreatorTest.java @@ -0,0 +1,51 @@ +package io.kafbat.ui.serdes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import io.kafbat.ui.exception.ValidationException; +import io.kafbat.ui.serde.api.Serde; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.Test; + +class ProducerRecordCreatorTest { + + @Test + void createWithHeaders() { + Serde.Serializer keySerializer = mock(Serde.Serializer.class); + Serde.Serializer valueSerializer = mock(Serde.Serializer.class); + + ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer); + Map headersMap = Map.of( + "headerKey1", "headerValue1", + "headerKey2", List.of("headerValue2", "headerValue3") + ); + ProducerRecord record = recordCreator.create("topic", 1, "key", "value", headersMap); + + assertNotNull(record.headers()); + assertEquals(3, record.headers().toArray().length); + assertThat(record.headers()).containsExactlyInAnyOrder( + new RecordHeader("headerKey1", "headerValue1".getBytes()), + new RecordHeader("headerKey2", "headerValue2".getBytes()), + new RecordHeader("headerKey2", "headerValue3".getBytes()) + ); + } + + @Test + void createWithInvalidHeaderValue() { + Serde.Serializer keySerializer = mock(Serde.Serializer.class); + Serde.Serializer valueSerializer = mock(Serde.Serializer.class); + + ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer); + Map invalidHeaders = Map.of("headerKey", Map.of("invalid", "value")); + + assertThrows(ValidationException.class, () -> + recordCreator.create("topic", 1, "key", "value", invalidHeaders)); + } +} diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 5eede6cef..37fc18584 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -2919,8 +2919,9 @@ components: type: string headers: type: object + description: header values can be string or list of strings (for keys with multiple values) additionalProperties: - type: string + type: object partition: type: integer offset: @@ -2948,8 +2949,9 @@ components: nullable: true headers: type: object + description: header values can be string or list of strings (for keys with multiple values) additionalProperties: - type: string + type: object content: type: string nullable: true @@ -3038,8 +3040,9 @@ components: type: string headers: type: object + description: header values can be string or list of strings (for keys with multiple values) additionalProperties: - type: string + type: object content: type: string keyFormat: diff --git a/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx b/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx index bb134862f..39c8868ef 100644 --- a/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx +++ b/frontend/src/components/Topics/Topic/Messages/Filters/InfoModal.tsx @@ -54,6 +54,13 @@ const InfoModal: React.FC = ({ toggleIsOpen }) => { record.headers['k2'] == 'v2' + + + "v1" in (type(record.headers.k1) == list ? + record.headers.k1 : [record.headers.k1]) + - in case a header can hold either a single value (string) or + multiple values (list of strings) +