From 43145a448442efa3e34db9e80fa8a3a5c3ae675f Mon Sep 17 00:00:00 2001 From: Arun Ramani Date: Tue, 19 Nov 2024 17:16:56 +0100 Subject: [PATCH 1/2] Emit production rate --- .../kafka/supervisor/KafkaSupervisor.java | 44 +++++++++++++++++++ .../supervisor/SeekableStreamSupervisor.java | 39 ++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index aebacecff662..9477c644c62b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -60,6 +60,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.joda.time.DateTime; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; @@ -96,6 +97,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor previousLatestSequenceFromStream; private volatile Map latestSequenceFromStream; @@ -277,6 +279,29 @@ protected Map getPartitionRecordLag() return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets); }
+  /**
+   * Computes, per partition, how far the latest stream offset has advanced since the previous
+   * call, and stores the current offsets as the baseline for the next call. This is not a pure
+   * getter: every invocation moves the baseline forward.
+   *
+   * @return per-partition offset delta, or {@code null} when the latest offsets have not been
+   *         fetched yet or when there is no earlier sample to compare against
+   */
+  @Nullable
+  @Override
+  @SuppressWarnings("SSBasedInspection")
+  protected Map<KafkaTopicPartition, Long> getPartitionProductionRate()
+  {
+    // Read the volatile field once so the diff and the stored baseline come from the same map.
+    final Map<KafkaTopicPartition, Long> latest = latestSequenceFromStream;
+    if (latest == null) {
+      return null;
+    }
+
+    final Map<KafkaTopicPartition, Long> previous = previousLatestSequenceFromStream;
+
+    // Collectors.toMap rejects null values, so partitions whose offset is unknown are left out
+    // of the baseline instead of failing the whole snapshot.
+    previousLatestSequenceFromStream = latest
+        .entrySet()
+        .stream()
+        .filter(e -> e.getValue() != null)
+        .collect(
+            Collectors.toMap(
+                Entry::getKey,
+                Entry::getValue
+            )
+        );
+
+    // First sample: without a baseline there is nothing to diff against, so report nothing
+    // rather than dereferencing a null map.
+    if (previous == null) {
+      return null;
+    }
+
+    return calculateDiff(latest, previous);
+  }
+
@Nullable @Override protected Map getPartitionTimeLag() @@ -524,4 +549,23 @@ private KafkaTopicPartition getMatchingKafkaTopicPartition( return match ? 
new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null; }
+
+  /**
+   * Returns, for every partition in {@code left}, the {@code left} offset minus the
+   * {@code right} offset.
+   *
+   * A partition contributes 0 when either side has no offset for it: {@code right} is
+   * {@code null}, the partition is absent from {@code right}, or either value is {@code null}.
+   * Diffing against an implicit 0 would report the absolute offset instead of a delta.
+   *
+   * @param left  current offsets per partition
+   * @param right baseline offsets per partition, or {@code null} if no baseline exists yet
+   * @return map with the same keys as {@code left} holding the per-partition delta
+   */
+  @SuppressWarnings("SSBasedInspection")
+  private Map<KafkaTopicPartition, Long> calculateDiff(
+      @Nonnull Map<KafkaTopicPartition, Long> left,
+      @Nullable Map<KafkaTopicPartition, Long> right
+  )
+  {
+    return left
+        .entrySet()
+        .stream()
+        .collect(
+            Collectors.toMap(
+                Entry::getKey,
+                e -> {
+                  final Long current = e.getValue();
+                  final Long baseline = right == null ? null : right.get(e.getKey());
+                  return current == null || baseline == null ? 0L : current - baseline;
+                }
+            )
+        );
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 8b4845b08d07..aeb0eacb864b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -4126,6 +4126,12 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept @Nullable protected abstract Map getPartitionTimeLag(); + @Nullable + protected Map getPartitionProductionRate() + { + return null; + } + /** * Gets highest current offsets of all the tasks (actively reading and publishing) for all partitions of the stream. * In case if no task is reading for a partition, returns offset stored in metadata storage for that partition. 
@@ -4509,6 +4515,7 @@ protected void emitLag() try { Map partitionRecordLags = getPartitionRecordLag(); Map partitionTimeLags = getPartitionTimeLag(); + Map partitionProductionRate = getPartitionProductionRate(); if (partitionRecordLags == null && partitionTimeLags == null) { throw new ISE("Latest offsets have not been fetched"); @@ -4573,9 +4580,41 @@ protected void emitLag() ); }; + if (productionRates == null) { + return; + } + + Map metricTags = spec.getContextValue(DruidMetrics.TAGS); + for (Map.Entry entry : productionRates.entrySet()) { + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) + .setDimension(DruidMetrics.PARTITION, entry.getKey()) + .setDimensionIfNotNull(DruidMetrics.TAGS, metricTags) + .setMetric( + StringUtils.format("ingest/%s/partitionProduction%s", type, suffix), + entry.getValue() + ) + ); + } + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) + .setDimensionIfNotNull(DruidMetrics.TAGS, metricTags) + .setMetric( + StringUtils.format("ingest/%s/production%s", type, suffix), + productionRates.values().stream().mapToLong(e -> e).sum() + ) + ); + }; + // this should probably really be /count or /records or something.. 
but keeping like this for backwards compat emitFn.accept(partitionRecordLags, ""); emitFn.accept(partitionTimeLags, "/time"); + + productionEmitFn.accept(partitionProductionRate, ""); } catch (Exception e) { log.warn(e, "Unable to compute lag"); From fe72dc2ba4d1ab61479d6797fa186b8fc0960c7e Mon Sep 17 00:00:00 2001 From: Arun Ramani Date: Fri, 22 Nov 2024 17:24:00 +0100 Subject: [PATCH 2/2] Add some docs --- docs/operations/metrics.md | 13 +++++++------ .../supervisor/SeekableStreamSupervisor.java | 1 + 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 4df9e7987ccc..b943ff436794 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -206,12 +206,13 @@ field in the `context` field of the ingestion spec. `tags` is expected to be a m These metrics apply to the [Kafka indexing service](../ingestion/kafka-ingestion.md). -|Metric|Description|Dimensions|Normal value| -|------|-----------|----------|------------| -|`ingest/kafka/lag`|Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. | -|`ingest/kafka/maxLag`|Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. | -|`ingest/kafka/avgLag`|Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. 
| -|`ingest/kafka/partitionLag`|Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, should not be a very high number. | +|Metric| Description |Dimensions|Normal value| +|------|----------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------| +|`ingest/kafka/lag`| Total lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute. |`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. | +|`ingest/kafka/maxLag`| Max lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute. |`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. | +|`ingest/kafka/avgLag`| Average lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers across all partitions. Minimum emission period for this metric is a minute. |`dataSource`, `stream`, `tags`|Greater than 0, should not be a very high number. | +|`ingest/kafka/partitionLag`| Partition-wise lag between the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka brokers. Minimum emission period for this metric is a minute. |`dataSource`, `stream`, `partition`, `tags`|Greater than 0, should not be a very high number. | +|`ingest/kafka/partitionProduction`| Partition-wise increase in the latest offsets in Kafka brokers since the previous emission of this metric. Minimum emission period for this metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than or equal to 0. 
| ### Ingestion metrics for Kinesis diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index aeb0eacb864b..900b2b179a84 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -4580,6 +4580,7 @@ protected void emitLag() ); }; + BiConsumer, String> productionEmitFn = (productionRates, suffix) -> { if (productionRates == null) { return; }