Restore Sink Metric Emission Behaviour: Emit them per-Sink instead of per-FireHydrant #17170
Conversation
In addition to checking out the comments, please check the effect on query performance after applying this patch in a scenario where there are many (like 100) FireHydrants per Sink.
The original patch that refactored this stuff (#15757) caused a noticeable performance regression in cases where there were many FireHydrants per Sink and individual queries were quite fast (tens of milliseconds) but issued at a high rate. This was due to the overhead of the additional metrics. So, hopefully, reducing the number of metrics improves performance as well.
@@ -197,7 +198,7 @@ public ServiceMetricEvent build(ImmutableMap<String, String> serviceDimensions)
     return new ServiceMetricEvent(
         createdTime,
         serviceDimensions,
-        userDims,
+        new HashMap<>(userDims),
why does this need to be copied?
I see that StreamAppenderatorTest#testQueryByIntervals fails if we directly pass userDims.
My best rationale so far is that this change is only needed because the test uses StubServiceEmitter: the previously stored metrics don't actually get emitted anywhere, and they end up getting mutated because they reference the same userDims.
@findingrish Can you confirm that this is the reason, that the copy is done only for test purposes, and that directly passing userDims should work for any real-world scenario?
Please let me know if I'm missing some other aspect of this change. Appreciate your inputs, thanks!
> Can you confirm if this is the reason, and this is done only for test purposes

Yes, this was only done for test purposes, and directly passing userDims should work for real-world scenarios.
@findingrish Thanks for the confirmation!
It doesn't seem ideal to copy userDims just for test purposes. One option that would let us keep passing userDims directly, without compromising test quality, is to change StubServiceEmitter.
The following can be changed to also store the userDims:
druid/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
Line 41 in 7705694
private final ConcurrentHashMap<String, List<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();
So it can look like ConcurrentHashMap<String, List<Map<ServiceMetricEvent, Map<String, Object>>>>, where the innermost Map<String, Object> represents the userDims. This would work since ServiceMetricEvent#getUserDims returns a copy of userDims:
druid/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java
Line 95 in 7705694
return ImmutableMap.copyOf(userDims);
(Instead of ConcurrentHashMap<String, List<Map<ServiceMetricEvent, Map<String, Object>>>>, we could have a separate class to encapsulate List<Map<ServiceMetricEvent, Map<String, Object>>> for readability, but the basic idea remains the same.)
@gianm @findingrish Thoughts on this approach?
Yes, let's change the test rather than do a needless copy in production code.
@gianm Thanks for the input!
Have made the change by adding a ServiceMetricEventSnapshot helper class in StubServiceEmitter.
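For reference, a minimal sketch of what such a snapshot helper could look like (illustrative only; the actual class in the PR may be shaped differently):

```java
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.Map;

// Illustrative sketch: pairs an emitted event with the userDims captured at emit time.
// ServiceMetricEvent#getUserDims returns a copy, so later mutations of the builder's map
// cannot affect what the stub emitter stored.
public class ServiceMetricEventSnapshot
{
  private final ServiceMetricEvent metricEvent;
  private final Map<String, Object> userDims;

  public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent)
  {
    this.metricEvent = metricEvent;
    this.userDims = metricEvent.getUserDims();
  }

  public ServiceMetricEvent getMetricEvent()
  {
    return metricEvent;
  }

  public Map<String, Object> getUserDims()
  {
    return userDims;
  }
}
```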
@@ -66,11 +66,11 @@ public void testDefaultQueryMetricsQuery()
        .context(ImmutableMap.of("testKey", "testValue"))
        .build();
    queryMetrics.query(query);
    queryMetrics.sqlQueryId("dummy");
    queryMetrics.queryId("dummy");
why did these need to move?
This is related to the userDims -> new HashMap<>(userDims) change.
This test fails if these lines aren't moved before queryMetrics.reportQueryTime(0).emit(serviceEmitter).
Reason: previously, calling queryMetrics.queryId("dummy") after emit() was updating userDims in serviceEmitter.getEvents(), because the same userDims map was being referenced. With the new HashMap<>(userDims) change, that is no longer the case.
Let's wait for Rishabh's inputs on the other comment.
    }
    catch (Exception e) {
      // Query should not fail, because of emitter failure. Swallowing the exception.
      log.error("Failure while trying to emit [%s] with stacktrace [%s]", emitter.toString(), e);
e should be the first argument, so the logger handles it as an exception: it will print the stack trace, message, etc.
There is no point in calling emitter.toString(), because the emitters don't stringify to anything useful. Better to include the segment ID here and say something like Failed to emit metrics for segment[%s].
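For instance, the revised catch block could look roughly like this (a sketch only; it assumes a Druid Logger named log and a segment identifier in scope at the call site, and that the logger accepts the Throwable as its first argument):

```java
catch (Exception e) {
  // Query should not fail because of an emitter failure; swallow the exception,
  // but pass the throwable first so the logger prints the stack trace and message.
  log.error(e, "Failed to emit metrics for segment[%s]", segmentId);
}
```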
Have made the change.
/**
 * Emit query/segment/time, query/wait/time and query/segmentAndCache/Time metrics for a Sink.
 * It accumulates query/segment/time and query/segmentAndCache/time metric for each FireHydrant at the level of Sink.
 * query/wait/time metric is the time taken to process the first FireHydrant for the Sink.
Please extend the javadoc here in two ways:
- Link to MetricsEmittingQueryRunner and point out that this class is derived from it.
- Explain that the class behaves differently based on whether segmentId is null or nonnull: when nonnull it's in "accumulate" mode, and when null it's in "emit" mode.
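A possible shape for the extended javadoc (a sketch; the wording that landed in the PR may differ):

```java
/**
 * Emits query/segment/time, query/wait/time and query/segmentAndCache/time metrics for a Sink.
 * Derived from {@link MetricsEmittingQueryRunner}, but accumulates per-FireHydrant values so that
 * a single set of metrics is emitted per Sink.
 *
 * Behaves differently depending on segmentId: when segmentId is nonnull, the runner is in
 * "accumulate" mode and adds this FireHydrant's timings to the shared accumulator; when segmentId
 * is null, it is in "emit" mode and reports the accumulated per-Sink values.
 */
```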
Have made the change.
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
  QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
  final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
It looks like queryMetrics isn't used unless segmentId is null. Don't create it if it won't be used.
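A rough sketch of that change, assuming queryWithMetrics is still needed up front to run the query and only the QueryMetrics lookup moves into the branch that uses it:

```java
final QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);

if (segmentId != null) {
  // accumulate mode: QueryMetrics is never read here, so don't fetch it
  // ... accumulate timings into segmentMetricsAccumulator ...
} else {
  // emit mode: fetch QueryMetrics only where it is actually used
  final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
  // ... report the accumulated per-Sink metrics via queryMetrics ...
}
```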
Have made the change.
    ).withWaitMeasuredFromNow();
    metricsToReport,
    segmentMetricsAccumulator,
    new HashSet<>(
No need to create this hash set for every hydrant, use an static immutable set.
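For example, a class-level constant shared across hydrants (a sketch; the constant name and exact membership are illustrative and depend on the call site):

```java
import com.google.common.collect.ImmutableSet;

import java.util.Set;

// Sketch: build the set once instead of allocating a new HashSet per hydrant.
private static final Set<String> HYDRANT_METRICS_TO_COMPUTE = ImmutableSet.of(
    DefaultQueryMetrics.QUERY_SEGMENT_TIME,
    DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME,
    DefaultQueryMetrics.QUERY_WAIT_TIME
);
```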
Have made the change.
    queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
    metricsToReport,
    segmentMetricsAccumulator,
    Collections.singleton(DefaultQueryMetrics.QUERY_SEGMENT_TIME),
Use a static for this.
Done.
@@ -193,6 +205,12 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final
    final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
    final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
    final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
    final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator = new ConcurrentHashMap<>();
When I look at this, I wonder about performance. It's probably fine the way it is, but IMO it would be better to use a class here (with three AtomicLong fields rather than a Map<String, AtomicLong>). That would save various per-hydrant hash-table lookups. I think the code would also be clearer to read.
In that world, segmentMetricsAccumulator would be ConcurrentHashMap<String, TheNewClass>.
If you keep the inner holder as a Map, then at least make metricsToReport a static.
> If you keep the inner holder as a Map, then at least make metricsToReport a static.

Have made the change.

> When I look at this, I wonder about performance. It's probably fine the way it is, but IMO it would be better to use a class here (with three AtomicLong rather than a Map<String, AtomicLong>)

Will do some performance analysis and report back.
@gianm Have reported benchmark stats for master code vs this PR's code at #17170 (comment).

> it would be better to use a class here (with three AtomicLong rather than a Map<String, AtomicLong>). That would save various per-hydrant hash-table lookups.

I also tried doing this (haven't pushed the changes to the PR). Reporting benchmark stats for a few benchmark runs with this change:
Benchmark (numFireHydrants) Mode Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt 5 0.194 ± 0.012 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt 5 0.749 ± 0.072 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt 5 1.358 ± 0.248 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt 5 2.781 ± 0.491 ms/op
Benchmark (numFireHydrants) Mode Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt 5 0.187 ± 0.006 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt 5 0.704 ± 0.090 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt 5 1.356 ± 0.211 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt 5 2.725 ± 0.510 ms/op
Benchmark (numFireHydrants) Mode Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt 5 0.190 ± 0.020 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt 5 0.710 ± 0.130 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt 5 1.428 ± 0.242 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt 5 2.791 ± 0.570 ms/op
Benchmark (numFireHydrants) Mode Cnt Score Error Units
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 10 avgt 5 0.188 ± 0.007 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 50 avgt 5 0.777 ± 0.051 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 100 avgt 5 1.523 ± 0.026 ms/op
SinkQuerySegmentWalkerBenchmark.emitSinkMetrics 200 avgt 5 3.114 ± 0.191 ms/op
This seems roughly equal to (if not slightly worse than) the current PR code changes, so I didn't push this change.
I'm dumping a diff of the change here though for your reference:
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 40fb6078fe..e4ceada727 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -218,7 +218,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
- final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<String, SinkMetricsEmittingQueryRunner.SegmentMetrics> segmentMetricsAccumulator = new ConcurrentHashMap<>();
try {
for (final SegmentDescriptor descriptor : specs) {
@@ -469,7 +469,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final ServiceEmitter emitter;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final QueryRunner<T> queryRunner;
- private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator;
+ private final ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator;
private final Set<String> metricsToCompute;
@Nullable
private final String segmentId;
@@ -479,7 +479,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
ServiceEmitter emitter,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
- ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicLong>> segmentMetricsAccumulator,
+ ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator,
Set<String> metricsToCompute,
@Nullable String segmentId
)
@@ -517,29 +517,31 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
{
if (segmentId != null) {
// accumulate metrics
- for (String metric : metricsToCompute) {
- if (DefaultQueryMetrics.QUERY_WAIT_TIME.equals(metric)) {
- long waitTimeNs = startTimeNs - creationTimeNs;
- // segment wait time is the time taken to start processing the first FireHydrant for the Sink
- segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>())
- .putIfAbsent(metric, new AtomicLong(waitTimeNs));
- } else {
- long timeTakenNs = System.nanoTime() - startTimeNs;
- segmentMetricsAccumulator.computeIfAbsent(segmentId, metrics -> new ConcurrentHashMap<>())
- .computeIfAbsent(metric, value -> new AtomicLong(0))
- .addAndGet(timeTakenNs);
- }
+ final SegmentMetrics metrics = segmentMetricsAccumulator.computeIfAbsent(segmentId, id -> new SegmentMetrics());
+ if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_WAIT_TIME)) {
+ metrics.setWaitTime(startTimeNs - creationTimeNs);
+ }
+ if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_TIME)) {
+ metrics.addSegmentTime(System.nanoTime() - startTimeNs);
+ }
+ if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME)) {
+ metrics.addSegmentAndCacheTime(System.nanoTime() - startTimeNs);
}
} else {
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
// report accumulated metrics
- for (Map.Entry<String, ConcurrentHashMap<String, AtomicLong>> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
+ for (Map.Entry<String, SegmentMetrics> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
queryMetrics.segment(segmentAndMetrics.getKey());
for (Map.Entry<String, ObjLongConsumer<? super QueryMetrics<?>>> reportMetric : METRICS_TO_REPORT.entrySet()) {
- String metricName = reportMetric.getKey();
- if (segmentAndMetrics.getValue().containsKey(metricName)) {
- reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().get(metricName).get());
+ final String metricName = reportMetric.getKey();
+ switch (metricName) {
+ case DefaultQueryMetrics.QUERY_SEGMENT_TIME:
+ reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime());
+ break;
+ case DefaultQueryMetrics.QUERY_WAIT_TIME:
+ reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime());
+ break;
+ case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME:
+ reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime());
+ break;
}
}
@@ -556,6 +558,36 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
}
);
}
+
+ private static class SegmentMetrics {
+ private final AtomicLong querySegmentTime = new AtomicLong(0);
+ private final AtomicLong queryWaitTime = new AtomicLong(0);
+ private final AtomicLong querySegmentAndCacheTime = new AtomicLong(0);
+
+ private void addSegmentTime(long time) {
+ querySegmentTime.addAndGet(time);
+ }
+
+ private void setWaitTime(long time) {
+ queryWaitTime.set(time);
+ }
+
+ private void addSegmentAndCacheTime(long time) {
+ querySegmentAndCacheTime.addAndGet(time);
+ }
+
+ private long getSegmentTime() {
+ return querySegmentTime.get();
+ }
+
+ private long getWaitTime() {
+ return queryWaitTime.get();
+ }
+
+ private long getSegmentAndCacheTime() {
+ return querySegmentAndCacheTime.get();
+ }
+ }
}
Thoughts?
The new diff is easier to read. I suggest we go for readability here.
The performance looks to be within the margin of error. IMO, it's ok to leave it as a Map. Thanks for checking.
That being said, I do agree with @cryptoe. The approach with a dedicated class is easier to read and I would prefer it for that reason, even if performance is the same between the two.
@gianm Have made the change, thanks for the review!
@gianm I've added a benchmark for this. Reporting the data below. For master branch code:

With this PR's code changes:

We can see the performance improvement with this PR's changes compared to master. Hope this works!
Approved either way, but do consider changing SinkQuerySegmentWalker to use the easier-to-read metrics collection class discussed in this thread https://github.com/apache/druid/pull/17170/files#r1889120810
Change #15757, which merged FireHydrants flatly for realtime queries to optimise memory usage, led to metrics like query/segment/time being emitted per-FireHydrant instead of per-Sink. This change restores the original metric emission behaviour while keeping the optimisation intact.
It introduces a new SinkMetricsEmittingQueryRunner which accumulates the FireHydrant metrics per-Sink and emits them at the end. The emitted metrics are query/segment/time, query/segmentAndCache/time and query/wait/time. query/wait/time is the time taken to start processing the first FireHydrant for the Sink.
Testing