diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 596d914b9fdda..8e572f3061839 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -207,16 +207,16 @@ protected void possiblyResetDataSourceMetadata( } @Override - public void scheduleIOTimeReporting(RecordSupplier recordSupplier, ServiceEmitter emitter) + public void scheduleNonIOTimeReporting(RecordSupplier recordSupplier, ServiceEmitter emitter) { - Execs.scheduledSingleThreaded("KafkaIOTimeReporter-%d") - .scheduleAtFixedRate(() -> reportIOTime(recordSupplier, emitter), 1, 1, TimeUnit.MINUTES); + Execs.scheduledSingleThreaded("KafkaNonIOTimeReporter-%d") + .scheduleAtFixedRate(() -> reportNonIOTimePct(recordSupplier, emitter), 1, 1, TimeUnit.MINUTES); } - private void reportIOTime(RecordSupplier recordSupplier, ServiceEmitter emitter) { + private void reportNonIOTimePct(RecordSupplier recordSupplier, ServiceEmitter emitter) { double result = recordSupplier.pctIOTimeSpent(); if (result > 0) { - task.emitMetric(emitter, "kafka/io/time/pct", result); + task.emitMetric(emitter, "time/non-io/pct", 100.0 * (1 - result)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 414708bc30693..0094b0816e28a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -426,7 +426,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); } - scheduleIOTimeReporting(recordSupplier, toolbox.getEmitter()); + scheduleNonIOTimeReporting(recordSupplier, toolbox.getEmitter()); appenderator = task.newAppenderator(toolbox, segmentGenerationMetrics, rowIngestionMeters, parseExceptionHandler); driver = task.newDriver(appenderator, toolbox, segmentGenerationMetrics); @@ -2020,7 +2020,7 @@ protected abstract void possiblyResetDataSourceMetadata( throw new UnsupportedOperationException("Method not supported"); } - public void scheduleIOTimeReporting(RecordSupplier recordSupplier, ServiceEmitter emitter) + public void scheduleNonIOTimeReporting(RecordSupplier recordSupplier, ServiceEmitter emitter) { log.debug("No IO time reporting configured"); }