diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java new file mode 100644 index 000000000000..8342dd565ca1 --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/SinkQuerySegmentWalkerBenchmark.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.core.LoggingEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.Result; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester; +import org.apache.druid.segment.realtime.sink.Committers; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) 
+@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class SinkQuerySegmentWalkerBenchmark +{ + static { + NullHandling.initializeForTests(); + } + + @Param({"10", "50", "100", "200"}) + private int numFireHydrants; + + private final LoggingEmitter loggingEmitter = new LoggingEmitter(new Logger(LoggingEmitter.class), LoggingEmitter.Level.INFO, new DefaultObjectMapper()); + private final ServiceEmitter serviceEmitter = new ServiceEmitter("test", "test", loggingEmitter); + private File cacheDir; + + private Appenderator appenderator; + + @Setup(Level.Trial) + public void setup() throws Exception + { + final String userConfiguredCacheDir = System.getProperty("druid.benchmark.cacheDir", System.getenv("DRUID_BENCHMARK_CACHE_DIR")); + cacheDir = new File(userConfiguredCacheDir); + final StreamAppenderatorTester tester = + new StreamAppenderatorTester.Builder().maxRowsInMemory(1) + .basePersistDirectory(cacheDir) + .withServiceEmitter(serviceEmitter) + .build(); + + appenderator = tester.getAppenderator(); + appenderator.startJob(); + + final SegmentIdWithShardSpec segmentIdWithShardSpec = new SegmentIdWithShardSpec( + StreamAppenderatorTester.DATASOURCE, + Intervals.of("2000/2001"), + "A", + new LinearShardSpec(0) + ); + + for (int i = 0; i < numFireHydrants; i++) { + final MapBasedInputRow inputRow = new MapBasedInputRow( + DateTimes.of("2000").getMillis(), + ImmutableList.of("dim"), + ImmutableMap.of( + "dim", + "bar_" + i, + "met", + 1 + ) + ); + appenderator.add(segmentIdWithShardSpec, inputRow, Suppliers.ofInstance(Committers.nil())); + } + } + + @TearDown(Level.Trial) + public void tearDown() throws Exception + { + appenderator.close(); + FileUtils.deleteDirectory(cacheDir); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void emitSinkMetrics(Blackhole blackhole) throws Exception + { + { + final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder() + .dataSource(StreamAppenderatorTester.DATASOURCE) + .intervals(ImmutableList.of(Intervals.of("2000/2001"))) + .aggregators( + Arrays.asList( + new LongSumAggregatorFactory("count", "count"), + new LongSumAggregatorFactory("met", "met") + ) + ) + .granularity(Granularities.DAY) + .build(); + + final List> results = + QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList(); + blackhole.consume(results); + + serviceEmitter.flush(); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index 1273aebc0198..9f349484153a 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -40,6 +40,10 @@ */ public class DefaultQueryMetrics> implements QueryMetrics { + public static final String QUERY_WAIT_TIME = "query/wait/time"; + public static final String QUERY_SEGMENT_TIME = "query/segment/time"; + public static final String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time"; + protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); protected final Map metrics = new HashMap<>(); @@ -235,19 +239,19 @@ public QueryMetrics reportQueryBytes(long byteCount) @Override public QueryMetrics reportWaitTime(long timeNs) { - return reportMillisTimeMetric("query/wait/time", timeNs); + return reportMillisTimeMetric(QUERY_WAIT_TIME, timeNs); } @Override public QueryMetrics reportSegmentTime(long timeNs) { - return 
reportMillisTimeMetric("query/segment/time", timeNs); + return reportMillisTimeMetric(QUERY_SEGMENT_TIME, timeNs); } @Override public QueryMetrics reportSegmentAndCacheTime(long timeNs) { - return reportMillisTimeMetric("query/segmentAndCache/time", timeNs); + return reportMillisTimeMetric(QUERY_SEGMENT_AND_CACHE_TIME, timeNs); } @Override diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index e4a8b9403dd0..323d8cd308c9 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie { private final List events = new ArrayList<>(); private final List alertEvents = new ArrayList<>(); - private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { @@ -56,7 +56,7 @@ public void emit(Event event) if (event instanceof ServiceMetricEvent) { ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>()) - .add(metricEvent); + .add(new ServiceMetricEventSnapshot(metricEvent)); } else if (event instanceof AlertEvent) { alertEvents.add((AlertEvent) event); } @@ -76,7 +76,7 @@ public List getEvents() * * @return Map from metric name to list of events emitted for that metric. */ - public Map> getMetricEvents() + public Map> getMetricEvents() { return metricEvents; } @@ -96,18 +96,18 @@ public List getMetricValues( ) { final List values = new ArrayList<>(); - final List events = + final List events = metricEvents.getOrDefault(metricName, Collections.emptyList()); final Map filters = dimensionFilters == null ? Collections.emptyMap() : dimensionFilters; - for (ServiceMetricEvent event : events) { + for (ServiceMetricEventSnapshot event : events) { final Map userDims = event.getUserDims(); boolean match = filters.keySet().stream() .map(d -> filters.get(d).equals(userDims.get(d))) .reduce((a, b) -> a && b) .orElse(true); if (match) { - values.add(event.getValue()); + values.add(event.getMetricEvent().getValue()); } } @@ -131,4 +131,32 @@ public void flush() public void close() { } + + /** + * Helper class to encapsulate a ServiceMetricEvent and its user dimensions. + * Since {@link StubServiceEmitter} doesn't actually emit metrics and saves the emitted metrics in-memory, + * this helper class saves a copy of {@link ServiceMetricEvent#userDims} of emitted metrics + * via {@link ServiceMetricEvent#getUserDims()} as it can get mutated. 
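 + * Tests should therefore read dimensions via {@link #getUserDims()} on this snapshot rather than re-reading them from the wrapped event.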
+ */ + public static class ServiceMetricEventSnapshot + { + private final ServiceMetricEvent metricEvent; + private final Map userDims; + + public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent) + { + this.metricEvent = metricEvent; + this.userDims = metricEvent.getUserDims(); + } + + public ServiceMetricEvent getMetricEvent() + { + return metricEvent; + } + + public Map getUserDims() + { + return userDims; + } + } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 10c775aa0a30..c256e82c6d2d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; @@ -30,18 +32,23 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.FunctionalIterable; +import org.apache.druid.java.util.common.guava.LazySequence; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.SequenceWrapper; +import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.CPUTimeMetricQueryRunner; import org.apache.druid.query.DataSource; +import org.apache.druid.query.DefaultQueryMetrics; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.FinalizeResultsQueryRunner; -import org.apache.druid.query.MetricsEmittingQueryRunner; import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; @@ -52,6 +59,7 @@ import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SinkQueryRunners; +import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -69,6 +77,7 @@ import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -76,8 +85,11 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.ObjLongConsumer; import java.util.stream.Collectors; /** @@ -88,6 
+100,19 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class); private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment"; + private static final Set SEGMENT_QUERY_METRIC = ImmutableSet.of(DefaultQueryMetrics.QUERY_SEGMENT_TIME); + private static final Set SEGMENT_CACHE_AND_WAIT_METRICS = ImmutableSet.of( + DefaultQueryMetrics.QUERY_WAIT_TIME, + DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME + ); + + private static final Map>> METRICS_TO_REPORT = + ImmutableMap.of( + DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime, + DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime, + DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime + ); + private final String dataSource; // Maintain a timeline of ids and Sinks for all the segments including the base and upgraded versions @@ -193,6 +218,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final final List allSegmentReferences = new ArrayList<>(); final Map segmentIdMap = new HashMap<>(); final LinkedHashMap>> allRunners = new LinkedHashMap<>(); + final ConcurrentHashMap segmentMetricsAccumulator = new ConcurrentHashMap<>(); try { for (final SegmentDescriptor descriptor : specs) { @@ -231,12 +257,13 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final descriptor, sinkSegmentReferences.stream().map( segmentReference -> { - QueryRunner runner = new MetricsEmittingQueryRunner<>( + QueryRunner runner = new SinkMetricsEmittingQueryRunner<>( emitter, factory.getToolchest(), factory.createRunner(segmentReference.getSegment()), - QueryMetrics::reportSegmentTime, - queryMetrics -> queryMetrics.segment(sinkSegmentId.toString()) + segmentMetricsAccumulator, + SEGMENT_QUERY_METRIC, + sinkSegmentId.toString() ); // 1) Only use caching if data is immutable @@ -273,13 +300,14 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the // *possible* caching. - runner = new MetricsEmittingQueryRunner<>( + runner = new SinkMetricsEmittingQueryRunner<>( emitter, factory.getToolchest(), runner, - QueryMetrics::reportSegmentAndCacheTime, - queryMetrics -> queryMetrics.segment(sinkSegmentId.toString()) - ).withWaitMeasuredFromNow(); + segmentMetricsAccumulator, + SEGMENT_CACHE_AND_WAIT_METRICS, + sinkSegmentId.toString() + ); // Emit CPU time metrics. runner = CPUTimeMetricQueryRunner.safeBuild( @@ -344,7 +372,17 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final return new ResourceIdPopulatingQueryRunner<>( QueryRunnerHelper.makeClosingQueryRunner( CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner, true), toolChest), + new SinkMetricsEmittingQueryRunner<>( + emitter, + toolChest, + new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(mergedRunner, true), + toolChest + ), + segmentMetricsAccumulator, + Collections.emptySet(), + null + ), toolChest, emitter, cpuTimeAccumulator, @@ -415,7 +453,153 @@ public static String makeHydrantCacheIdentifier(final SegmentId segmentId, final // with subsegments (hydrants). return segmentId + "_H" + hydrantNumber; } + + /** + * This class is responsible for emitting query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink. 
 + * It accumulates the query/segment/time and query/segmentAndCache/time metrics for each FireHydrant at the level of the Sink. + * The query/wait/time metric is the time taken to process the first FireHydrant for the Sink. + * <p>
 + * This class operates in two distinct modes based on whether {@link SinkMetricsEmittingQueryRunner#segmentId} is null or non-null. + * When segmentId is non-null, it accumulates the metrics. When segmentId is null, it emits the accumulated metrics. + * <p>
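 + * Per-FireHydrant runners are constructed with a non-null segmentId and accumulate their timings into a shared map, while a single runner wrapping the merged results is constructed with a null segmentId and an empty metric set and reports the accumulated values once per Sink after the merged sequence completes. + * <p>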
+ * This class is derived from {@link org.apache.druid.query.MetricsEmittingQueryRunner}. + */ + private static class SinkMetricsEmittingQueryRunner implements QueryRunner + { + private final ServiceEmitter emitter; + private final QueryToolChest> queryToolChest; + private final QueryRunner queryRunner; + private final ConcurrentHashMap segmentMetricsAccumulator; + private final Set metricsToCompute; + @Nullable + private final String segmentId; + private final long creationTimeNs; + + private SinkMetricsEmittingQueryRunner( + ServiceEmitter emitter, + QueryToolChest> queryToolChest, + QueryRunner queryRunner, + ConcurrentHashMap segmentMetricsAccumulator, + Set metricsToCompute, + @Nullable String segmentId + ) + { + this.emitter = emitter; + this.queryToolChest = queryToolChest; + this.queryRunner = queryRunner; + this.segmentMetricsAccumulator = segmentMetricsAccumulator; + this.metricsToCompute = metricsToCompute; + this.segmentId = segmentId; + this.creationTimeNs = System.nanoTime(); + } + @Override + public Sequence run(final QueryPlus queryPlus, final ResponseContext responseContext) + { + QueryPlus queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest); + return Sequences.wrap( + // Use LazySequence because we want to account execution time of queryRunner.run() (it prepares the underlying + // Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after + // `startTimeNs = System.nanoTime();` + new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)), + new SequenceWrapper() + { + private long startTimeNs; + + @Override + public void before() + { + startTimeNs = System.nanoTime(); + } + + @Override + public void after(boolean isDone, Throwable thrown) + { + if (segmentId != null) { + // accumulate metrics + final SegmentMetrics metrics = segmentMetricsAccumulator.computeIfAbsent(segmentId, id -> new SegmentMetrics()); + if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_WAIT_TIME)) { + metrics.setWaitTime(startTimeNs - creationTimeNs); + } + if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_TIME)) { + metrics.addSegmentTime(System.nanoTime() - startTimeNs); + } + if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME)) { + metrics.addSegmentAndCacheTime(System.nanoTime() - startTimeNs); + } + } else { + final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); + // report accumulated metrics + for (Map.Entry segmentAndMetrics : segmentMetricsAccumulator.entrySet()) { + queryMetrics.segment(segmentAndMetrics.getKey()); + + for (Map.Entry>> reportMetric : METRICS_TO_REPORT.entrySet()) { + final String metricName = reportMetric.getKey(); + switch (metricName) { + case DefaultQueryMetrics.QUERY_SEGMENT_TIME: + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime()); + case DefaultQueryMetrics.QUERY_WAIT_TIME: + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime()); + case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME: + reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime()); + } + } + + try { + queryMetrics.emit(emitter); + } + catch (Exception e) { + // Query should not fail, because of emitter failure. Swallowing the exception. + log.error(e, "Failed to emit metrics for segment[%s]", segmentAndMetrics.getKey()); + } + } + } + } + } + ); + } + + /** + * Class to track segment related metrics during query execution. 
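 + * Times are kept in {@link AtomicLong}s since per-FireHydrant runners may update the same Sink's entry concurrently.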
+ */ + private static class SegmentMetrics + { + private final AtomicLong querySegmentTime = new AtomicLong(0); + private final AtomicLong queryWaitTime = new AtomicLong(0); + private final AtomicLong querySegmentAndCacheTime = new AtomicLong(0); + + private void addSegmentTime(long time) + { + querySegmentTime.addAndGet(time); + } + + private void setWaitTime(long time) + { + queryWaitTime.set(time); + } + + private void addSegmentAndCacheTime(long time) + { + querySegmentAndCacheTime.addAndGet(time); + } + + private long getSegmentTime() + { + return querySegmentTime.get(); + } + + private long getWaitTime() + { + return queryWaitTime.get(); + } + + private long getSegmentAndCacheTime() + { + return querySegmentAndCacheTime.get(); + } + } + } + private static class SinkHolder implements Overshadowable { private final Sink sink; diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index eaa24af3d6c5..b8190e0c1f9d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.Druids; import org.apache.druid.query.Order; @@ -70,8 +71,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -1861,9 +1864,11 @@ public void testQueryByIntervals_withSegmentVersionUpgrades() throws Exception public void testQueryByIntervals() throws Exception { try ( + final StubServiceEmitter serviceEmitter = new StubServiceEmitter(); final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) .basePersistDirectory(temporaryFolder.newFolder()) + .withServiceEmitter(serviceEmitter) .build()) { final Appenderator appenderator = tester.getAppenderator(); @@ -1902,36 +1907,18 @@ public void testQueryByIntervals() throws Exception results1 ); - // Query2: 2000/2002 - final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() - .dataSource(StreamAppenderatorTester.DATASOURCE) - .intervals(ImmutableList.of(Intervals.of("2000/2002"))) - .aggregators( - Arrays.asList( - new LongSumAggregatorFactory("count", "count"), - new LongSumAggregatorFactory("met", "met") - ) - ) - .granularity(Granularities.DAY) - .build(); - - final List> results2 = - QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList(); - Assert.assertEquals( - "query2", - ImmutableList.of( - new Result<>( - DateTimes.of("2000"), - new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L)) - ), - new Result<>( - DateTimes.of("2001"), - new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L)) + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Arrays.asList( + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(1).asSegmentId().toString() ) - ), - results2 + ) ); + 
serviceEmitter.flush(); + // Query3: 2000/2001T01 final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -1961,6 +1948,19 @@ public void testQueryByIntervals() throws Exception results3 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Arrays.asList( + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(1).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + // Query4: 2000/2001T01, 2001T03/2001T04 final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -1994,6 +1994,16 @@ public void testQueryByIntervals() throws Exception ), results4 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Arrays.asList( + IDENTIFIERS.get(0).asSegmentId().toString(), + IDENTIFIERS.get(1).asSegmentId().toString(), + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); } } @@ -2001,9 +2011,11 @@ public void testQueryByIntervals() throws Exception public void testQueryBySegments() throws Exception { try ( + StubServiceEmitter serviceEmitter = new StubServiceEmitter(); final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder().maxRowsInMemory(2) .basePersistDirectory(temporaryFolder.newFolder()) + .withServiceEmitter(serviceEmitter) .build()) { final Appenderator appenderator = tester.getAppenderator(); @@ -2052,6 +2064,17 @@ public void testQueryBySegments() throws Exception results1 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + // Query2: segment #2, partial final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -2088,6 +2111,17 @@ public void testQueryBySegments() throws Exception results2 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + // Query3: segment #2, two disjoint intervals final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) @@ -2129,6 +2163,17 @@ public void testQueryBySegments() throws Exception results3 ); + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + final ScanQuery query4 = Druids.newScanQueryBuilder() .dataSource(StreamAppenderatorTester.DATASOURCE) .intervals( @@ -2164,6 +2209,33 @@ public void testQueryBySegments() throws Exception new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L}, ((List) ((List) results4.get(1).getEvents()).get(0)).toArray() ); + + verifySinkMetrics( + serviceEmitter, + new HashSet<>( + Collections.singletonList( + IDENTIFIERS.get(2).asSegmentId().toString() + ) + ) + ); + + serviceEmitter.flush(); + } + } + + private void verifySinkMetrics(StubServiceEmitter emitter, Set segmentIds) + { + Map> events = emitter.getMetricEvents(); + int segments = segmentIds.size(); + Assert.assertEquals(4, events.size()); + Assert.assertTrue(events.containsKey("query/cpu/time")); + Assert.assertEquals(segments, events.get("query/segment/time").size()); + Assert.assertEquals(segments, events.get("query/segmentAndCache/time").size()); + Assert.assertEquals(segments, events.get("query/wait/time").size()); + for (String id : segmentIds) { 
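 + // every queried segment id must appear as a dimension value on each of the three per-segment metrics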
+ Assert.assertTrue(events.get("query/segment/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(events.get("query/segmentAndCache/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(events.get("query/wait/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index c305ada342ae..88cd05376aef 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -109,7 +109,8 @@ public StreamAppenderatorTester( final RowIngestionMeters rowIngestionMeters, final boolean skipBytesInMemoryOverheadCheck, final DataSegmentAnnouncer announcer, - final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig + final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig, + final ServiceEmitter serviceEmitter ) { objectMapper = new DefaultObjectMapper(); @@ -145,18 +146,18 @@ public StreamAppenderatorTester( .withObjectMapper(objectMapper) .build(); tuningConfig = new TestAppenderatorConfig( - TuningConfig.DEFAULT_APPENDABLE_INDEX, - maxRowsInMemory, - maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, - skipBytesInMemoryOverheadCheck, - IndexSpec.DEFAULT, - 0, - false, - 0L, - OffHeapMemorySegmentWriteOutMediumFactory.instance(), - IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory - ); + TuningConfig.DEFAULT_APPENDABLE_INDEX, + maxRowsInMemory, + maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, + skipBytesInMemoryOverheadCheck, + IndexSpec.DEFAULT, + 0, + false, + 0L, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, + basePersistDirectory + ); metrics = new SegmentGenerationMetrics(); queryExecutor = Execs.singleThreaded("queryExecutor(%d)"); @@ -174,11 +175,12 @@ public StreamAppenderatorTester( OffHeapMemorySegmentWriteOutMediumFactory.instance() ); - emitter = new ServiceEmitter( + emitter = serviceEmitter == null ? new ServiceEmitter( "test", "test", new NoopEmitter() - ); + ) : serviceEmitter; + emitter.start(); EmittingLogger.registerEmitter(emitter); dataSegmentPusher = new DataSegmentPusher() @@ -350,6 +352,7 @@ public static class Builder private RowIngestionMeters rowIngestionMeters; private boolean skipBytesInMemoryOverheadCheck; private int delayInMilli = 0; + private ServiceEmitter serviceEmitter; public Builder maxRowsInMemory(final int maxRowsInMemory) { @@ -393,6 +396,12 @@ public Builder withSegmentDropDelayInMilli(int delayInMilli) return this; } + public Builder withServiceEmitter(ServiceEmitter serviceEmitter) + { + this.serviceEmitter = serviceEmitter; + return this; + } + public StreamAppenderatorTester build() { return new StreamAppenderatorTester( @@ -404,7 +413,8 @@ public StreamAppenderatorTester build() rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters, skipBytesInMemoryOverheadCheck, new NoopDataSegmentAnnouncer(), - CentralizedDatasourceSchemaConfig.create() + CentralizedDatasourceSchemaConfig.create(), + serviceEmitter ); } @@ -422,7 +432,8 @@ public StreamAppenderatorTester build( rowIngestionMeters == null ? 
new SimpleRowIngestionMeters() : rowIngestionMeters, skipBytesInMemoryOverheadCheck, dataSegmentAnnouncer, - config + config, + serviceEmitter ); } } diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index a12722088159..f10248bf1e92 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -91,14 +91,14 @@ public void testAuditMetricEventWithPayload() throws IOException final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc()); auditManager.doAudit(entry); - Map> metricEvents = serviceEmitter.getMetricEvents(); + Map> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List auditMetricEvents = metricEvents.get("config/audit"); + List auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0); + ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); final AuditEntry dbEntry = lookupAuditEntryForKey("testKey"); Assert.assertNotNull(dbEntry); @@ -120,14 +120,14 @@ public void testCreateAuditEntry() throws IOException Assert.assertEquals(entry, dbEntry); // Verify emitted metrics - Map> metricEvents = serviceEmitter.getMetricEvents(); + Map> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List auditMetricEvents = metricEvents.get("config/audit"); + List auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0); + ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key")); Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type")); Assert.assertNull(metric.getUserDims().get("payload"));
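For reviewers, a minimal standalone sketch of the accumulate-then-emit pattern that SinkMetricsEmittingQueryRunner introduces. Everything below (SinkMetricsSketch, SinkTimings, recordHydrant) is illustrative and not part of this patch; it only shows that per-FireHydrant timings are folded into a single entry per Sink and that each metric is then reported once per Sink once all hydrants have been processed.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SinkMetricsSketch
{
  // One entry per Sink: segment time is summed across FireHydrants, wait time is a single value.
  private static class SinkTimings
  {
    final AtomicLong segmentTimeNs = new AtomicLong();
    final AtomicLong waitTimeNs = new AtomicLong();
  }

  public static void main(String[] args)
  {
    final ConcurrentHashMap<String, SinkTimings> accumulator = new ConcurrentHashMap<>();

    // "Accumulate" mode: each per-FireHydrant runner records into its Sink's entry.
    recordHydrant(accumulator, "sink_2000", 3_000_000_000L, 500_000_000L);
    recordHydrant(accumulator, "sink_2000", 2_000_000_000L, 700_000_000L);
    recordHydrant(accumulator, "sink_2001", 4_000_000_000L, 900_000_000L);

    // "Emit" mode: after the merged sequence completes, report each metric exactly once per Sink.
    for (Map.Entry<String, SinkTimings> entry : accumulator.entrySet()) {
      System.out.printf(
          "segment=%s query/segment/time=%dms query/wait/time=%dms%n",
          entry.getKey(),
          TimeUnit.NANOSECONDS.toMillis(entry.getValue().segmentTimeNs.get()),
          TimeUnit.NANOSECONDS.toMillis(entry.getValue().waitTimeNs.get())
      );
    }
  }

  private static void recordHydrant(
      ConcurrentHashMap<String, SinkTimings> accumulator,
      String sinkSegmentId,
      long segmentTimeNs,
      long waitTimeNs
  )
  {
    final SinkTimings timings = accumulator.computeIfAbsent(sinkSegmentId, id -> new SinkTimings());
    timings.segmentTimeNs.addAndGet(segmentTimeNs);
    timings.waitTimeNs.set(waitTimeNs);
  }
}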