Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore Sink Metric Emission Behaviour: Emit them per-Sink instead of per-FireHydrant #17170

Merged
merged 15 commits into from
Dec 18, 2024

Conversation

findingrish
Copy link
Contributor

This change #15757 to merge FireHydrants flatly for realtime queries for optimising memory usage led to metrics like query/segment/time getting emitted per-FireHydrant instead of per-Sink.

This change restores the metric emission behaviour while keeping the optimisation intact.

It introduces a new SinkMetricsEmittingQueryRunner which accumulates the FireHydrants metrics per-Sink and emits them in the end.
The emitted metrics are query/segment/time, query/segmentAndCacheTime & query/wait/time.
query/wait/time is the time taken to start processing the first FireHydrant for the sink.

Testing

  • Added UT to verify that each of the above metrics are emitted once per Sink.
  • Locally verified the changes for realtime ingestion. Sample emitted metric,
{"type":"scan","version":"32.0.0-SNAPSHOT","duration":"PT86400S","feed":"metrics","metric":"query/segment/time","hasFilters":"false","service":"druid/middleManager","segment":"kttm_2019-08-25T04:00:00.000Z_2019-08-25T05:00:00.000Z_2024-09-26T15:28:40.091Z","host":"localhost:8100","context":{"defaultTimeout":300000,"finalize":false,"maxQueuedBytes":5242880,"maxScatterGatherBytes":9223372036854775807,"queryFailTime":1727364858115,"queryId":"8dab0179-ec39-4f5f-a9e2-9039da416a5a","queryResourceId":"14c72b21-2dbb-471b-91c0-d7fbe11414b5","scanOutermost":false,"sqlOuterLimit":1001,"sqlQueryId":"8dab0179-ec39-4f5f-a9e2-9039da416a5a","sqlStringifyArrays":false,"timeout":299997},"interval":["2019-08-25T00:00:00.000Z/2019-08-26T00:00:00.000Z"],"id":"8dab0179-ec39-4f5f-a9e2-9039da416a5a","value":0,"dataSource":"kttm","timestamp":"2024-09-26T15:29:18.215Z"}

@cryptoe cryptoe modified the milestone: 31.0.0 Sep 30, 2024
Copy link
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to checking out the comments, please check the effect on query performance after applying this patch in a scenario where there are many (like 100) FireHydrants per Sink.

The original patch that refactored this stuff (#15757) caused a noticeable performance regression in cases where there were many FireHydrants per Sink and where individual queries were quite fast (10s of milliseconds), but made at a high rate. This was due to the additional overhead from the additional metrics. So, hopefully, reducing the number of metrics improves performance as well.

@@ -197,7 +198,7 @@ public ServiceMetricEvent build(ImmutableMap<String, String> serviceDimensions)
return new ServiceMetricEvent(
createdTime,
serviceDimensions,
userDims,
new HashMap<>(userDims),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to be copied?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that StreamAppenderatorTest#testQueryByIntervals fails if we directly pass userDims.

My best rationale so far is that this change is only needed because the test uses StubServiceEmitter, hence the previously stored metrics don't actually get emitted anywhere, and the previously stored metrics end up getting mutated because they use the same userDims.

@findingrish Can you confirm if this is the reason, and this is done only for test purposes, and directly passing userDims should work for any real world scenarios?

Please let me know if I'm missing some other aspect of this change. Appreciate your inputs, thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you confirm if this is the reason, and this is done only for test purposes

Yes, this was only done for test purpose and directly passing userDims should work for real world scenarios.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findingrish Thanks for the confirmation!

It doesn't seem ideal to be copying the userDims just for test purposes. One possible option that would allow us to keep passing userDims directly while not making any compromises with the test quality is to change StubServiceEmitter.

The following can be changed to also store the userDims:

private final ConcurrentHashMap<String, List<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();

So it can look like ConcurrentHashMap<String, List<Map<ServiceMetricEvent, Map<String, Object>>>> where the innermost Map<String, Object> represents the userDims. This would work since ServiceMetricEvent#getUserDims returns a copy of userDims:

(Instead of ConcurrentHashMap<String, List<Map<ServiceMetricEvent, Map<String, Object>>>>, we can have a separate class to encapsulate List<Map<ServiceMetricEvent, Map<String, Object>>> for readability, but the basic idea remains the same)

@gianm @findingrish Thoughts on this approach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's change the test rather than do a needless copy in production code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm Thanks for the input!
Have made the change by adding a ServiceMetricEventSnapshot helper class in StubServiceEmitter.

@@ -66,11 +66,11 @@ public void testDefaultQueryMetricsQuery()
.context(ImmutableMap.of("testKey", "testValue"))
.build();
queryMetrics.query(query);
queryMetrics.sqlQueryId("dummy");
queryMetrics.queryId("dummy");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did these need to move?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to the userDims -> new HashMap<>(userDims) change.

This test fails if these lines aren't moved before queryMetrics.reportQueryTime(0).emit(serviceEmitter).

Reason: Previously, doing queryMetrics.queryId("dummy") after emit() was updating userDims in serviceEmitter.getEvents(), as we had the same userDims being referenced. But with new HashMap<>(userDims) change, that isn't the case anymore.

Let's wait for Rishabh's inputs on the other comment.

}
catch (Exception e) {
// Query should not fail, because of emitter failure. Swallowing the exception.
log.error("Failure while trying to emit [%s] with stacktrace [%s]", emitter.toString(), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e should be the first argument, so the logger handles it as an exception. It will print the stack trace, message, etc.

There is no point in calling emitter.toString() because the emitters don't stringify to anything useful. Better to include the segment ID here and say something like Failed to emit metrics for segment[%s].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made the change.

/**
* Emit query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink.
* It accumulates query/segment/time and query/segmentAndCache/time metric for each FireHydrant at the level of Sink.
* query/wait/time metric is the time taken to process the first FireHydrant for the Sink.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extend the javadoc here in two ways:

  • Link to MetricsEmittingQueryRunner and point out that this is derived from that class.
  • Explain that the class behaves differently based on whether segmentId is null or nonnull. When nonnull it's in "accumulate" mode and when null it's in "emit" mode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made the change.

public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like queryMetrics isn't used unless segmentId is null. Don't create it if it won't be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made the change.

).withWaitMeasuredFromNow();
metricsToReport,
segmentMetricsAccumulator,
new HashSet<>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to create this hash set for every hydrant, use an static immutable set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made the change.

queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
metricsToReport,
segmentMetricsAccumulator,
Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a static for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -193,6 +205,12 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I look at this, I wonder about performance. It's probably fine the way it is, but IMO it would be better to use a class here (with three AtomicLong rather than a Map<String, AtomicLong>). That would save various per-hydrant hash-table lookups. I think the code would also be clearer to read.

In that world, segmentMetricsAccumulator would be ConcurrentHashMap<String, TheNewClass>.

If you keep the inner holder as a Map, then at least make metricsToReport a static.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you keep the inner holder as a Map, then at least make metricsToReport a static.

Have made the change.

When I look at this, I wonder about performance. It's probably fine the way it is, but IMO it would be better to use a class here (with three AtomicLong rather than a Map<String, AtomicLong>)

Will do some performance analysis and report back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm Have reported benchmark stats for master code vs this PR's code at #17170 (comment).

it would be better to use a class here (with three AtomicLong rather than a Map<String, AtomicLong>). That would save various per-hydrant hash-table lookups.

I also tried doing this (haven't pushed the changes to the PR). Reporting benchmark stats for a few benchmark runs with this change:

Benchmark                                        (numFireHydrants)  Mode  Cnt  Score   Error  Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 10  avgt    5  0.194 ± 0.012  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 50  avgt    5  0.749 ± 0.072  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                100  avgt    5  1.358 ± 0.248  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                200  avgt    5  2.781 ± 0.491  ms/op

Benchmark                                         (numFireHydrants)  Mode  Cnt  Score   Error  Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 10  avgt    5  0.187 ± 0.006  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 50  avgt    5  0.704 ± 0.090  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                100  avgt    5  1.356 ± 0.211  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                200  avgt    5  2.725 ± 0.510  ms/op

Benchmark                                         (numFireHydrants)  Mode  Cnt  Score   Error  Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 10  avgt    5  0.190 ± 0.020  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 50  avgt    5  0.710 ± 0.130  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                100  avgt    5  1.428 ± 0.242  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                200  avgt    5  2.791 ± 0.570  ms/op

Benchmark                                         (numFireHydrants)  Mode  Cnt  Score   Error  Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 10  avgt    5  0.188 ± 0.007  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 50  avgt    5  0.777 ± 0.051  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                100  avgt    5  1.523 ± 0.026  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                200  avgt    5  3.114 ± 0.191  ms/op

This seems almost equal (if not slightly worse) than the current PR code changes, so I didn't push this change.

I'm dumping a diff of the change here though for your reference:

diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 40fb6078fe..e4ceada727 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -218,7 +218,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
     final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
     final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
-    final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<String, SinkMetricsEmittingQueryRunner.SegmentMetrics> segmentMetricsAccumulator = new ConcurrentHashMap<>();
 
     try {
       for (final SegmentDescriptor descriptor : specs) {
@@ -469,7 +469,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     private final ServiceEmitter emitter;
     private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
     private final QueryRunner<T> queryRunner;
-    private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator;
+    private final ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator;
     private final Set<String> metricsToCompute;
     @Nullable
     private final String segmentId;
@@ -479,7 +479,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
         ServiceEmitter emitter,
         QueryToolChest<T, ? extends Query<T>> queryToolChest,
         QueryRunner<T> queryRunner,
-        ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator,
+        ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator,
         Set<String> metricsToCompute,
         @Nullable String segmentId
     )
@@ -517,29 +517,31 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
             {
               if (segmentId != null) {
                 // accumulate metrics
-                for (String metric : metricsToCompute) {
-                  if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) {
-                    long waitTimeNs = startTimeNs - creationTimeNs;
-                    // segment wait time is the time taken to start processing the first FireHydrant for the Sink
-                    segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>())
-                                             .putIfAbsent(metric, new AtomicLong(waitTimeNs));
-                  } else {
-                    long timeTakenNs = System.nanoTime() - startTimeNs;
-                    segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>())
-                                             .computeIfAbsent(metric, value -> new AtomicLong(0))
-                                             .addAndGet(timeTakenNs);
-                  }
+                final SegmentMetrics metrics = segmentMetricsAccumulator.computeIfAbsent(segmentId, id -> new SegmentMetrics());
+                if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_WAIT_TIME)) {
+                  metrics.setWaitTime(startTimeNs - creationTimeNs);
+                }
+                if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_TIME)) {
+                  metrics.addSegmentTime(System.nanoTime() - startTimeNs);
+                }
+                if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME)) {
+                  metrics.addSegmentAndCacheTime(System.nanoTime() - startTimeNs);
                 }
               } else {
                 final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
                 // report accumulated metrics
-                for (Map.Entry<String, ConcurrentHashMap<String, AtomicLong>> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
+                for (Map.Entry<String, SegmentMetrics> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
                   queryMetrics.segment(segmentAndMetrics.getKey());
 
                   for (Map.Entry<String, ObjLongConsumer<? super QueryMetrics<?>>> reportMetric : METRICS_TO_REPORT.entrySet()) {
-                    String metricName = reportMetric.getKey();
-                    if (segmentAndMetrics.getValue().containsKey(metricName)) {
-                      reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().get(metricName).get());
+                    final String metricName = reportMetric.getKey();
+                    switch (metricName) {
+                      case DefaultQueryMetrics.QUERY_SEGMENT_TIME:
+                        reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime());
+                      case DefaultQueryMetrics.QUERY_WAIT_TIME:
+                        reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime());
+                      case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME:
+                        reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime());
                     }
                   }
 
@@ -556,6 +558,36 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
           }
       );
     }
+
+    private static class SegmentMetrics {
+      private final AtomicLong querySegmentTime = new AtomicLong(0);
+      private final AtomicLong queryWaitTime = new AtomicLong(0);
+      private final AtomicLong querySegmentAndCacheTime = new AtomicLong(0);
+
+      private void addSegmentTime(long time) {
+        querySegmentTime.addAndGet(time);
+      }
+
+      private void setWaitTime(long time) {
+        queryWaitTime.set(time);
+      }
+
+      private void addSegmentAndCacheTime(long time) {
+        querySegmentAndCacheTime.addAndGet(time);
+      }
+
+      private long getSegmentTime() {
+        return querySegmentTime.get();
+      }
+
+      private long getWaitTime() {
+        return queryWaitTime.get();
+      }
+
+      private long getSegmentAndCacheTime() {
+        return querySegmentAndCacheTime.get();
+      }
+    }
   }

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new diff is easier to ready. I suggest we go for readability here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The performance looks to be within the margin of error. IMO, it's ok to leave it as a Map. Thanks for checking.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That being said, I do agree with @cryptoe. The approach with a dedicated class is easier to read and I would prefer it for that reason, even if performance is the same between the two.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm Have made the change, thanks for the review!

@Akshat-Jain
Copy link
Contributor

In addition to checking out the comments, please check the effect on query performance after applying this patch in a scenario where there are many (like 100) FireHydrants per Sink.

@gianm I've added a benchmark for this. Reporting the data for the same below.

For master branch code:

Benchmark                                        (numFireHydrants)  Mode  Cnt  Score   Error  Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 10  avgt    5  0.336 ± 0.014  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 50  avgt    5  1.937 ± 0.773  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                100  avgt    5  3.294 ± 1.997  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                200  avgt    5  5.931 ± 0.153  ms/op

With this PR's code changes:

Benchmark                                        (numFireHydrants)  Mode  Cnt  Score   Error  Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 10  avgt    5  0.329 ± 0.007  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                 50  avgt    5  0.738 ± 0.164  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                100  avgt    5  1.324 ± 0.124  ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics                200  avgt    5  2.650 ± 0.454  ms/op

We can see the improvement in performance with this PR's code change compared to master code. Hope this works!

Copy link
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved either way, but do consider changing SinkQuerySegmentWalker to use the easier-to-read metrics collection class discussed in this thread https://github.com/apache/druid/pull/17170/files#r1889120810

@cryptoe cryptoe merged commit d5eb94d into apache:master Dec 18, 2024
72 of 77 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants