diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index b5c5f8b1ce4b..2e4aca39ab53 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -52,6 +52,7 @@ "ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count" }, "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" }, "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" }, + "ingest/segments/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "ingest/kafka/lag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" }, "ingest/kafka/maxLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" }, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index e09ac9ebd6ca..0a32158cf9bb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.server.DruidNode; import java.util.Map; @@ -36,6 +37,8 @@ */ public interface ControllerContext { + ServiceEmitter emitter(); + ObjectMapper jsonMapper(); /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index f81b1e8a8271..5cdd7b0ffa6e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1331,6 +1331,7 @@ private void publishAllSegments(final Set segments) throws IOExcept final DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination(); final Set segmentsWithTombstones = new HashSet<>(segments); + int numTombstones = 0; if (destination.isReplaceTimeChunks()) { final List intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments")); @@ -1345,6 +1346,7 @@ private void publishAllSegments(final Set segments) throws IOExcept destination.getSegmentGranularity() ); segmentsWithTombstones.addAll(tombstones); + numTombstones = tombstones.size(); } catch (IllegalStateException e) { throw new MSQException(e, InsertLockPreemptedFault.instance()); @@ -1392,6 +1394,10 @@ private void publishAllSegments(final Set segments) throws IOExcept SegmentTransactionalInsertAction.appendAction(segments, null, null) ); } + + task.emitMetric(context.emitter(), "ingest/tombstones/count", numTombstones); + // Include tombstones in the reported segments count + task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size()); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 5d17b005b941..401d6af7072c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.WorkerClient; @@ -67,6 +68,12 @@ public IndexerControllerContext( this.workerManager = new IndexerWorkerManagerClient(overlordClient); } + @Override + public ServiceEmitter emitter() + { + return toolbox.getEmitter(); + } + @Override public ObjectMapper jsonMapper() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 2ee2207fd83a..027d2a913b21 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.Worker; @@ -48,6 +49,7 @@ import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; @@ -82,6 +84,7 @@ public class MSQTestControllerContext implements ControllerContext ); private final Injector injector; private final ObjectMapper mapper; + private final ServiceEmitter emitter = new NoopServiceEmitter(); private Controller controller; private Map report = null; @@ -215,6 +218,12 @@ public void close() } }; + @Override + public ServiceEmitter emitter() + { + return emitter; + } + @Override public ObjectMapper jsonMapper() {