From 8edbfc00f6c8f7d8a756f4781b061360409a0221 Mon Sep 17 00:00:00 2001 From: kaisun2000 <52840222+kaisun2000@users.noreply.github.com> Date: Mon, 9 Oct 2023 00:26:23 -0700 Subject: [PATCH] Add metric -- count of queries waiting for merge buffers (#15025) Add 'mergeBuffer/pendingRequests' metric that exposes the count of waiting queries (threads) blocking in the merge buffers pools. --- docs/operations/metrics.md | 2 + .../druid/collections/BlockingPool.java | 9 ++- .../collections/DefaultBlockingPool.java | 18 ++++++ .../druid/collections/DummyBlockingPool.java | 6 ++ .../apache/druid/query/TestBufferPool.java | 6 ++ .../metrics/QueryCountStatsMonitor.java | 11 +++- .../metrics/QueryCountStatsMonitorTest.java | 57 ++++++++++++++++++- 7 files changed, 104 insertions(+), 5 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 77a79170bec8d..28e8a9fa96469 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -62,6 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. 
In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies| |`query/priority`|Assigned lane and priority, only if Laning strategy is enabled. Refer to [Laning strategies](../configuration/index.md#laning-strategies)|`lane`, `dataSource`, `type`|0| |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s| @@ -97,6 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| ### Real-time diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index c17329917cd2e..4fb3ff66d8bf1 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -31,7 +31,6 @@ public interface BlockingPool * * @param elementNum number of resources to take 
* @param timeoutMs maximum time to wait for resources, in milliseconds. - * * @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available. */ List> takeBatch(int elementNum, long timeoutMs); @@ -40,8 +39,14 @@ public interface BlockingPool * Take resources from the pool, waiting if necessary until the elements of the given number become available. * * @param elementNum number of resources to take - * * @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available. */ List> takeBatch(int elementNum); + + /** + * Returns the count of the requests waiting to acquire a batch of resources. + * + * @return count of pending requests + */ + long getPendingRequests(); } diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index 1021974b1b4ed..e41a9e5d75d4c 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -48,6 +49,8 @@ public class DefaultBlockingPool implements BlockingPool private final Condition notEnough; private final int maxSize; + private final AtomicLong pendingRequests; + public DefaultBlockingPool( Supplier generator, int limit @@ -62,6 +65,7 @@ public DefaultBlockingPool( this.lock = new ReentrantLock(); this.notEnough = lock.newCondition(); + this.pendingRequests = new AtomicLong(); } @Override @@ -91,12 +95,16 @@ public List> takeBatch(final int elementNum, Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative 
value, but was [%s]", timeoutMs); checkInitialized(); try { + pendingRequests.incrementAndGet(); final List objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum); return objects.stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { throw new RuntimeException(e); } + finally { + pendingRequests.decrementAndGet(); + } } @Override @@ -104,11 +112,21 @@ public List> takeBatch(final int elementNum) { checkInitialized(); try { + pendingRequests.incrementAndGet(); return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { throw new RuntimeException(e); } + finally { + pendingRequests.decrementAndGet(); + } + } + + @Override + public long getPendingRequests() + { + return pendingRequests.get(); } private List pollObjects(int elementNum) throws InterruptedException diff --git a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index dcd6cea07aa72..2553f9ab425f5 100644 --- a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -55,4 +55,10 @@ public List> takeBatch(int elementNum) { throw new UnsupportedOperationException(); } + + @Override + public long getPendingRequests() + { + return 0; + } } diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index 10690d31be132..a650437f83f03 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -132,6 +132,12 @@ public List> takeBatch(int elementNu } } + @Override + public long getPendingRequests() + { + return 0; + } + public long getOutstandingObjectCount() { return 
takenFromMap.size(); diff --git a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java index da2017dbc00a0..ce951d5933f71 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java @@ -21,24 +21,30 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.java.util.metrics.KeyedDiff; +import java.nio.ByteBuffer; import java.util.Map; public class QueryCountStatsMonitor extends AbstractMonitor { private final KeyedDiff keyedDiff = new KeyedDiff(); private final QueryCountStatsProvider statsProvider; + private final BlockingPool mergeBufferPool; @Inject public QueryCountStatsMonitor( - QueryCountStatsProvider statsProvider + QueryCountStatsProvider statsProvider, + @Merging BlockingPool mergeBufferPool ) { this.statsProvider = statsProvider; + this.mergeBufferPool = mergeBufferPool; } @Override @@ -65,6 +71,9 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue())); } } + + long pendingQueries = this.mergeBufferPool.getPendingRequests(); + emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", pendingQueries)); return true; } diff --git a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java index 95b9f27d1c269..717c95d62c5b1 100644 --- 
a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java @@ -19,17 +19,27 @@ package org.apache.druid.server.metrics; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class QueryCountStatsMonitorTest { private QueryCountStatsProvider queryCountStatsProvider; + private BlockingPool mergeBufferPool; + private ExecutorService executorService; @Before public void setUp() @@ -69,14 +79,24 @@ public long getTimedOutQueryCount() return timedOutEmitCount; } }; + + mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1); + executorService = Executors.newSingleThreadExecutor(); + } + + @After + public void tearDown() + { + executorService.shutdown(); } @Test public void testMonitor() { - final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider); + final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); + emitter.flush(); // Trigger metric emission monitor.doMonitor(emitter); Map resultMap = emitter.getEvents() @@ -85,12 +105,45 @@ public void testMonitor() event -> (String) event.toMap().get("metric"), event -> (Long) event.toMap().get("value") )); - Assert.assertEquals(5, resultMap.size()); + Assert.assertEquals(6, resultMap.size()); Assert.assertEquals(1L, (long) 
resultMap.get("query/success/count")); Assert.assertEquals(2L, (long) resultMap.get("query/failed/count")); Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count")); Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count")); Assert.assertEquals(10L, (long) resultMap.get("query/count")); + Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests")); + + } + + @Test(timeout = 2_000L) + public void testMonitoringMergeBuffer() + { + executorService.submit(() -> { + mergeBufferPool.takeBatch(10); + }); + + int count = 0; + try { + // wait at most 2 secs for the executor thread to block + while (mergeBufferPool.getPendingRequests() == 0) { + Thread.sleep(100); + count++; + if (count >= 20) { + break; + } + } + + final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool); + final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); + boolean ret = monitor.doMonitor(emitter); + Assert.assertTrue(ret); + List numbers = emitter.getMetricValues("mergeBuffer/pendingRequests", Collections.emptyMap()); + Assert.assertEquals(1, numbers.size()); + Assert.assertEquals(1, numbers.get(0).intValue()); + } + catch (InterruptedException e) { + // do nothing + } } }