From a38b4f04919c04bef2aaa9d6ffd4bcb3a653653a Mon Sep 17 00:00:00 2001
From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Date: Mon, 21 Aug 2023 21:32:34 +0530
Subject: [PATCH] Add topic name as a column in the Kafka Input format (#14857)

This PR adds a way to store the topic name in a column. Such a column can be
used to distinguish messages coming from different topics in multi-topic
ingestion. A short usage sketch follows the diff.
---
 .../input/kafkainput/KafkaInputFormat.java   | 21 ++++++--
 .../input/kafkainput/KafkaInputReader.java   |  8 ++-
 .../kafkainput/KafkaInputFormatTest.java     | 50 +++++++++++++++----
 .../indexing/kafka/KafkaIndexTaskTest.java   |  2 +-
 .../indexing/kafka/KafkaSamplerSpecTest.java |  1 +
 5 files changed, 65 insertions(+), 17 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index 8f86ff07ded9..129966705740 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -40,6 +40,7 @@ public class KafkaInputFormat implements InputFormat
 {
   private static final String DEFAULT_HEADER_COLUMN_PREFIX = "kafka.header.";
   private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kafka.timestamp";
+  private static final String DEFAULT_TOPIC_COLUMN_NAME = "kafka.topic";
   private static final String DEFAULT_KEY_COLUMN_NAME = "kafka.key";
   public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
 
@@ -54,6 +55,7 @@ public class KafkaInputFormat implements InputFormat
   private final String headerColumnPrefix;
   private final String keyColumnName;
   private final String timestampColumnName;
+  private final String topicColumnName;
 
   public KafkaInputFormat(
       @JsonProperty("headerFormat") @Nullable KafkaHeaderFormat headerFormat,
@@ -61,7 +63,8 @@ public KafkaInputFormat(
       @JsonProperty("valueFormat") InputFormat valueFormat,
       @JsonProperty("headerColumnPrefix") @Nullable String headerColumnPrefix,
       @JsonProperty("keyColumnName") @Nullable String keyColumnName,
-      @JsonProperty("timestampColumnName") @Nullable String timestampColumnName
+      @JsonProperty("timestampColumnName") @Nullable String timestampColumnName,
+      @JsonProperty("topicColumnName") @Nullable String topicColumnName
   )
   {
     this.headerFormat = headerFormat;
@@ -70,6 +73,7 @@ public KafkaInputFormat(
     this.headerColumnPrefix = headerColumnPrefix != null ? headerColumnPrefix : DEFAULT_HEADER_COLUMN_PREFIX;
     this.keyColumnName = keyColumnName != null ? keyColumnName : DEFAULT_KEY_COLUMN_NAME;
     this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
+    this.topicColumnName = topicColumnName != null ? topicColumnName : DEFAULT_TOPIC_COLUMN_NAME;
   }
 
   @Override
@@ -116,7 +120,8 @@ record ->
             temporaryDirectory
         ),
         keyColumnName,
-        timestampColumnName
+        timestampColumnName,
+        topicColumnName
     );
   }
 
@@ -161,6 +166,13 @@ public String getTimestampColumnName()
     return timestampColumnName;
   }
 
+  @Nullable
+  @JsonProperty
+  public String getTopicColumnName()
+  {
+    return topicColumnName;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -176,14 +188,15 @@ public boolean equals(Object o)
            && Objects.equals(keyFormat, that.keyFormat)
            && Objects.equals(headerColumnPrefix, that.headerColumnPrefix)
            && Objects.equals(keyColumnName, that.keyColumnName)
-           && Objects.equals(timestampColumnName, that.timestampColumnName);
+           && Objects.equals(timestampColumnName, that.timestampColumnName)
+           && Objects.equals(topicColumnName, that.topicColumnName);
   }
 
   @Override
   public int hashCode()
   {
     return Objects.hash(headerFormat, valueFormat, keyFormat,
-        headerColumnPrefix, keyColumnName, timestampColumnName
+        headerColumnPrefix, keyColumnName, timestampColumnName, topicColumnName
     );
   }
 }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 6d43a2e96fef..31b7cf66be19 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -56,6 +56,7 @@ public class KafkaInputReader implements InputEntityReader
   private final InputEntityReader valueParser;
   private final String keyColumnName;
   private final String timestampColumnName;
+  private final String topicColumnName;
 
   /**
    *
@@ -74,7 +75,8 @@ public KafkaInputReader(
       @Nullable Function<KafkaRecordEntity, InputEntityReader> keyParserSupplier,
       InputEntityReader valueParser,
       String keyColumnName,
-      String timestampColumnName
+      String timestampColumnName,
+      String topicColumnName
   )
   {
     this.inputRowSchema = inputRowSchema;
@@ -84,6 +86,7 @@ public KafkaInputReader(
     this.valueParser = valueParser;
     this.keyColumnName = keyColumnName;
     this.timestampColumnName = timestampColumnName;
+    this.topicColumnName = topicColumnName;
   }
 
   @Override
@@ -128,6 +131,9 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record)
     // the header list
     mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
 
+    // Add kafka record topic to the mergelist, only if the key doesn't already exist
+    mergedHeaderMap.putIfAbsent(topicColumnName, record.getRecord().topic());
+
     return mergedHeaderMap;
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
index f65f335df9a1..21a0550f53e7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java
@@ -59,6 +59,7 @@ public class KafkaInputFormatTest
 {
   private KafkaRecordEntity inputEntity;
   private final long timestamp = DateTimes.of("2021-06-24").getMillis();
+  private static final String TOPIC = "sample";
   private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
       new Header()
       {
@@ -126,7 +127,8 @@ public void setUp()
         ),
         "kafka.newheader.",
         "kafka.newkey.key",
-        "kafka.newts.timestamp"
+        "kafka.newts.timestamp",
+        "kafka.newtopic.topic"
     );
   }
 
@@ -166,7 +168,8 @@ public void testSerde() throws JsonProcessingException
         ),
         "kafka.newheader.",
         "kafka.newkey.key",
-        "kafka.newts.timestamp"
+        "kafka.newts.timestamp",
+        "kafka.newtopic.topic"
     );
 
     Assert.assertEquals(format, kif);
@@ -209,7 +212,8 @@ public void testWithHeaderKeyAndValue() throws IOException
                     "foo",
                     "kafka.newheader.encoding",
                     "kafka.newheader.kafkapkc",
-                    "kafka.newts.timestamp"
+                    "kafka.newts.timestamp",
+                    "kafka.newtopic.topic"
                 )
             )
         ),
@@ -231,7 +235,8 @@ public void testWithHeaderKeyAndValue() throws IOException
             "foo",
             "kafka.newheader.encoding",
             "kafka.newheader.kafkapkc",
-            "kafka.newts.timestamp"
+            "kafka.newts.timestamp",
+            "kafka.newtopic.topic"
         ),
         row.getDimensions()
     );
@@ -254,6 +259,10 @@ public void testWithHeaderKeyAndValue() throws IOException
         String.valueOf(DateTimes.of("2021-06-24").getMillis()),
         Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
     );
+    Assert.assertEquals(
+        TOPIC,
+        Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+    );
     Assert.assertEquals(
         "2021-06-25",
         Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -302,7 +311,8 @@ public void testWithOutKey() throws IOException
                     "foo",
                     "kafka.newheader.encoding",
                     "kafka.newheader.kafkapkc",
-                    "kafka.newts.timestamp"
+                    "kafka.newts.timestamp",
+                    "kafka.newtopic.topic"
                 )
             )
         ),
@@ -478,7 +488,7 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
             null, null, false, //make sure JsonReader is used
             false, false
         ),
-        "kafka.newheader.", "kafka.newkey.", "kafka.newts."
+        "kafka.newheader.", "kafka.newkey.", "kafka.newts.", "kafka.newtopic."
     );
 
     final InputEntityReader reader = localFormat.createReader(
@@ -489,7 +499,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
             ImmutableList.of(
                 "bar",
                 "foo",
-                "kafka.newts.timestamp"
+                "kafka.newts.timestamp",
+                "kafka.newtopic.topic"
             )
         )
     ),
@@ -567,7 +578,8 @@ public void testWithMultipleMixedRecords() throws IOException
                     "foo",
                     "kafka.newheader.encoding",
                     "kafka.newheader.kafkapkc",
-                    "kafka.newts.timestamp"
+                    "kafka.newts.timestamp",
+                    "kafka.newtopic.topic"
                 )
             )
         ),
@@ -613,6 +625,10 @@ public void testWithMultipleMixedRecords() throws IOException
           String.valueOf(DateTimes.of("2021-06-24").getMillis()),
           Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
       );
+      Assert.assertEquals(
+          TOPIC,
+          Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+      );
 
       Assert.assertEquals(String.valueOf(i),
                           Iterables.getOnlyElement(row.getDimension("kafka.newheader.indexH")));
@@ -669,7 +685,8 @@ public void testMissingTimestampThrowsException() throws IOException
                     "foo",
                     "kafka.newheader.encoding",
                     "kafka.newheader.kafkapkc",
-                    "kafka.newts.timestamp"
+                    "kafka.newts.timestamp",
+                    "kafka.newtopic.topic"
                 )
             )
         ),
@@ -683,7 +700,8 @@ public void testMissingTimestampThrowsException() throws IOException
     while (iterator.hasNext()) {
       Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next());
       Assert.assertEquals(
-          "Timestamp[null] is unparseable! Event: {foo=x, kafka.newts.timestamp=1624492800000, kafka.newkey.key=sampleKey, root_baz=4, bar=null, kafka...",
+          "Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts"
+          + ".timestamp=1624492800000, kafka.newkey.key=sampleKey...",
           t.getMessage()
       );
     }
@@ -733,6 +751,7 @@ public void testWithSchemaDiscovery() throws IOException
     final InputRow row = iterator.next();
     Assert.assertEquals(
         Arrays.asList(
+            "kafka.newtopic.topic",
            "foo",
            "kafka.newts.timestamp",
            "kafka.newkey.key",
@@ -767,6 +786,10 @@ public void testWithSchemaDiscovery() throws IOException
         String.valueOf(DateTimes.of("2021-06-24").getMillis()),
         Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
     );
+    Assert.assertEquals(
+        TOPIC,
+        Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+    );
     Assert.assertEquals(
         "2021-06-25",
         Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -834,6 +857,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
         Arrays.asList(
             "bar",
             "kafka.newheader.kafkapkc",
+            "kafka.newtopic.topic",
             "foo",
             "kafka.newts.timestamp",
             "kafka.newkey.key",
@@ -866,6 +890,10 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException
         String.valueOf(DateTimes.of("2021-06-24").getMillis()),
         Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp"))
     );
+    Assert.assertEquals(
+        TOPIC,
+        Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic"))
+    );
     Assert.assertEquals(
         "2021-06-25",
         Iterables.getOnlyElement(row.getDimension("timestamp"))
@@ -889,7 +917,7 @@ private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers he
   {
     return new KafkaRecordEntity(
         new ConsumerRecord<>(
-            "sample",
+            TOPIC,
             0,
             0,
             timestamp,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 135c87c4e1f2..04393cb914b9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -171,7 +171,7 @@ public byte[] value()
       new KafkaStringHeaderFormat(null),
       INPUT_FORMAT,
       INPUT_FORMAT,
-      "kafka.testheader.", "kafka.key", "kafka.timestamp"
+      "kafka.testheader.", "kafka.key", "kafka.timestamp", "kafka.topic"
   );
 
   private static TestingCluster zkServer;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 9a6fd03726d0..0a0b64396a66 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -277,6 +277,7 @@ public void testSampleKafkaInputFormat()
         new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
         null,
         null,
+        null,
         null
     ),
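
Usage sketch (not part of the committed diff): the snippet below shows where the new
topicColumnName constructor argument fits, mirroring the argument order exercised by
KafkaIndexTaskTest above. The key/value formats, the import locations, and the
KafkaInputFormatExample class and withTopicColumn method are illustrative assumptions
rather than values taken from this patch; "kafka.topic" matches the
DEFAULT_TOPIC_COLUMN_NAME default introduced here.

    import org.apache.druid.data.input.impl.JsonInputFormat;
    import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
    import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
    import org.apache.druid.java.util.common.parsers.JSONPathSpec;

    public class KafkaInputFormatExample
    {
      // Builds a KafkaInputFormat whose rows carry the source topic in a "kafka.topic" column.
      // Formats and column names are placeholders chosen for illustration.
      public static KafkaInputFormat withTopicColumn()
      {
        return new KafkaInputFormat(
            new KafkaStringHeaderFormat(null),                                  // headerFormat
            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),  // keyFormat (assumed JSON)
            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),  // valueFormat (assumed JSON)
            "kafka.header.",    // headerColumnPrefix
            "kafka.key",        // keyColumnName
            "kafka.timestamp",  // timestampColumnName
            "kafka.topic"       // topicColumnName; passing null falls back to the same default
        );
      }
    }

Because KafkaInputReader adds the topic with putIfAbsent, the topic value never overwrites a
header that already maps to the same column name, and every resulting row carries the topic
it was read from, which is what makes rows from different topics distinguishable in
multi-topic ingestion.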