From 64c95d4711cb753ea3e164383a2f5f282aad5893 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Thu, 21 Sep 2023 21:43:23 -0700 Subject: [PATCH 1/5] Add metric for the number of pending queries waiting for shared merge buffers. --- .../druid/collections/BlockingPool.java | 9 ++- .../collections/DefaultBlockingPool.java | 18 +++++ .../druid/collections/DummyBlockingPool.java | 6 ++ .../util/metrics/MergeBufferPoolMonitor.java | 54 ++++++++++++++ .../metrics/MergeBufferPoolMonitorTest.java | 70 +++++++++++++++++++ .../apache/druid/query/TestBufferPool.java | 6 ++ .../druid/guice/BrokerProcessingModule.java | 2 + .../druid/guice/DruidProcessingModule.java | 2 + .../druid/guice/RouterProcessingModule.java | 2 + 9 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java create mode 100644 processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index c17329917cd2..b2a2ecf4cb81 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -31,7 +31,6 @@ public interface BlockingPool * * @param elementNum number of resources to take * @param timeoutMs maximum time to wait for resources, in milliseconds. - * * @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available. */ List> takeBatch(int elementNum, long timeoutMs); @@ -40,8 +39,14 @@ public interface BlockingPool * Take resources from the pool, waiting if necessary until the elements of the given number become available. * * @param elementNum number of resources to take - * * @return a list of resource holders. 
An empty list is returned if {@code elementNum} resources aren't available. */ List> takeBatch(int elementNum); + + /** + * Return the count of pending queries blocking in this queue for merge buffers + * + * @return count of pending queries + */ + long getPendingQueries(); } diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index 1021974b1b4e..940be7f824cd 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -48,6 +49,9 @@ public class DefaultBlockingPool implements BlockingPool private final Condition notEnough; private final int maxSize; + private final AtomicLong pendingQueries; + + public DefaultBlockingPool( Supplier generator, int limit @@ -62,6 +66,8 @@ public DefaultBlockingPool( this.lock = new ReentrantLock(); this.notEnough = lock.newCondition(); + + this.pendingQueries = new AtomicLong(); } @Override @@ -91,12 +97,16 @@ public List> takeBatch(final int elementNum, Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); try { + pendingQueries.incrementAndGet(); final List objects = timeoutMs > 0 ? 
pollObjects(elementNum, timeoutMs) : pollObjects(elementNum); return objects.stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { throw new RuntimeException(e); } + finally { + pendingQueries.decrementAndGet(); + } } @Override @@ -104,13 +114,21 @@ public List> takeBatch(final int elementNum) { checkInitialized(); try { + pendingQueries.incrementAndGet(); return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { + pendingQueries.incrementAndGet(); throw new RuntimeException(e); } } + @Override + public long getPendingQueries() + { + return pendingQueries.get(); + } + private List pollObjects(int elementNum) throws InterruptedException { final List list = new ArrayList<>(elementNum); diff --git a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index dcd6cea07aa7..2f895e7e0d20 100644 --- a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -55,4 +55,10 @@ public List> takeBatch(int elementNum) { throw new UnsupportedOperationException(); } + + @Override + public long getPendingQueries() + { + return 0; + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java new file mode 100644 index 000000000000..c9a66f4b71f6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import com.google.inject.Inject; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.guice.annotations.Merging; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +import java.nio.ByteBuffer; + +public class MergeBufferPoolMonitor extends AbstractMonitor +{ + + private final BlockingPool mergeBufferPool; + + @Inject + public MergeBufferPoolMonitor( + @Merging BlockingPool mergeBufferPoolIn + ) + { + this.mergeBufferPool = mergeBufferPoolIn; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + long pendingQueries = this.mergeBufferPool.getPendingQueries(); + + ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder().setFeed("metrics"); + emitter.emit(builder.setMetric("mergebuffer/pendingQueries", pendingQueries)); + + return true; + } + +} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java new file mode 100644 index 000000000000..98de1a0ac389 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class MergeBufferPoolMonitorTest +{ + private ExecutorService executorService; + + @Before + public void setUp() throws IOException + { + executorService = Executors.newSingleThreadExecutor(); + } + + @After + public void tearDown() throws IOException + { + executorService.shutdown(); + } + + @Test + public void testBlockingQueriesCount() + { + BlockingPool pool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1); + MergeBufferPoolMonitor monitor = new MergeBufferPoolMonitor(pool); + + executorService.submit(() -> { + pool.takeBatch(10); + }); + + StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); + boolean ret = monitor.doMonitor(emitter); + Assert.assertTrue(ret); + + List numbers = 
emitter.getMetricValues("mergebuffer/pendingQueries", Collections.emptyMap()); + Assert.assertEquals(numbers.size(), 1); + Assert.assertEquals(numbers.get(0).intValue(), 1); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index 10690d31be13..0f7eba436be2 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -132,6 +132,12 @@ public List> takeBatch(int elementNu } } + @Override + public long getPendingQueries() + { + return 0; + } + public long getOutstandingObjectCount() { return takenFromMap.size(); diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java index 0fcb4785191e..e11c79c4fc27 100644 --- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java @@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.MergeBufferPoolMonitor; import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; @@ -70,6 +71,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class); JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); + MetricsModule.register(binder, MergeBufferPoolMonitor.class); } @Provides diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index 76cc855cd224..64be6d8dea09 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.MergeBufferPoolMonitor; import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; @@ -64,6 +65,7 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); + MetricsModule.register(binder, MergeBufferPoolMonitor.class); } @Provides diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index dc68cbb4bffa..5685479cef8f 100644 --- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -30,6 +30,7 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.MergeBufferPoolMonitor; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; import org.apache.druid.query.ForwardingQueryProcessingPool; @@ -54,6 +55,7 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); + MetricsModule.register(binder, 
MergeBufferPoolMonitor.class); } @Provides From 06bb04289b993e39974e0989f8f8056f891c27f8 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Fri, 22 Sep 2023 01:08:24 -0700 Subject: [PATCH 2/5] fix static check failure; remove the registering of MergeBufferPoolMonitor by default --- .../druid/java/util/metrics/MergeBufferPoolMonitorTest.java | 5 ++--- .../java/org/apache/druid/guice/BrokerProcessingModule.java | 2 -- .../java/org/apache/druid/guice/DruidProcessingModule.java | 2 -- .../java/org/apache/druid/guice/RouterProcessingModule.java | 2 -- 4 files changed, 2 insertions(+), 9 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java index 98de1a0ac389..90cfc460e2f3 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java @@ -26,7 +26,6 @@ import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -38,13 +37,13 @@ public class MergeBufferPoolMonitorTest private ExecutorService executorService; @Before - public void setUp() throws IOException + public void setUp() { executorService = Executors.newSingleThreadExecutor(); } @After - public void tearDown() throws IOException + public void tearDown() { executorService.shutdown(); } diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java index e11c79c4fc27..0fcb4785191e 100644 --- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java @@ -40,7 +40,6 @@ import org.apache.druid.java.util.common.StringUtils; import 
org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.metrics.MergeBufferPoolMonitor; import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; @@ -71,7 +70,6 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class); JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); - MetricsModule.register(binder, MergeBufferPoolMonitor.class); } @Provides diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index 64be6d8dea09..76cc855cd224 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -40,7 +40,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.metrics.MergeBufferPoolMonitor; import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; @@ -65,7 +64,6 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); - MetricsModule.register(binder, MergeBufferPoolMonitor.class); } @Provides diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index 5685479cef8f..dc68cbb4bffa 100644 --- 
a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -30,7 +30,6 @@ import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.metrics.MergeBufferPoolMonitor; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; import org.apache.druid.query.ForwardingQueryProcessingPool; @@ -55,7 +54,6 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); MetricsModule.register(binder, ExecutorServiceMonitor.class); - MetricsModule.register(binder, MergeBufferPoolMonitor.class); } @Provides From a15cba34497821915ab718d263294abb812a0266 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Fri, 22 Sep 2023 12:08:03 -0700 Subject: [PATCH 3/5] enhance the unit test to make it not flaky --- .../metrics/MergeBufferPoolMonitorTest.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java index 90cfc460e2f3..e81bb8c11467 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,16 +55,31 @@ public void testBlockingQueriesCount() BlockingPool pool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1); MergeBufferPoolMonitor monitor = new 
MergeBufferPoolMonitor(pool); + CountDownLatch latch = new CountDownLatch(1); executorService.submit(() -> { + latch.countDown(); pool.takeBatch(10); }); - StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); - boolean ret = monitor.doMonitor(emitter); - Assert.assertTrue(ret); + try { + // the latch returns from await() guarantees the above lamda to take buffer from the pool starting to run in the + // executorService thread + latch.await(); + + // give 1 sec for pool.takeBatch to run and blocking at the pool + Thread.sleep(1000); + + StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); + boolean ret = monitor.doMonitor(emitter); + Assert.assertTrue(ret); + + List numbers = emitter.getMetricValues("mergebuffer/pendingQueries", Collections.emptyMap()); + Assert.assertEquals(numbers.size(), 1); + Assert.assertEquals(numbers.get(0).intValue(), 1); + } + catch (InterruptedException e) { + // do nothing + } - List numbers = emitter.getMetricValues("mergebuffer/pendingQueries", Collections.emptyMap()); - Assert.assertEquals(numbers.size(), 1); - Assert.assertEquals(numbers.get(0).intValue(), 1); } } From 1352bd40361ebf4bddf29f3aebb50f19d3b50a97 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Mon, 2 Oct 2023 23:33:59 -0700 Subject: [PATCH 4/5] revised based on feedback. 
--- docs/operations/metrics.md | 2 + .../druid/collections/BlockingPool.java | 2 +- .../collections/DefaultBlockingPool.java | 6 +- .../util/metrics/MergeBufferPoolMonitor.java | 54 ------------ .../metrics/MergeBufferPoolMonitorTest.java | 85 ------------------- .../metrics/QueryCountStatsMonitor.java | 11 ++- .../metrics/QueryCountStatsMonitorTest.java | 57 ++++++++++++- 7 files changed, 71 insertions(+), 146 deletions(-) delete mode 100644 processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java delete mode 100644 processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 77a79170bec8..6ae21cdc76dc 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -62,6 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`mergeBuffer/pendingQueries`|Number of queries waiting for the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. 
If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies| |`query/priority`|Assigned lane and priority, only if Laning strategy is enabled. Refer to [Laning strategies](../configuration/index.md#laning-strategies)|`lane`, `dataSource`, `type`|0| |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s| @@ -97,6 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`mergeBuffer/pendingQueries`|Number of queries waiting for the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| ### Real-time diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index b2a2ecf4cb81..20dc5626da97 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -44,7 +44,7 @@ public interface BlockingPool List> takeBatch(int elementNum); /** - * Return the count of pending queries blocking in this queue for merge buffers + * Return the count of pending queries waiting for merge buffers * * @return count of pending queries */ 
diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index 940be7f824cd..affed13698b8 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -51,7 +51,6 @@ public class DefaultBlockingPool implements BlockingPool private final AtomicLong pendingQueries; - public DefaultBlockingPool( Supplier generator, int limit @@ -66,7 +65,6 @@ public DefaultBlockingPool( this.lock = new ReentrantLock(); this.notEnough = lock.newCondition(); - this.pendingQueries = new AtomicLong(); } @@ -118,9 +116,11 @@ public List> takeBatch(final int elementNum) return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { - pendingQueries.incrementAndGet(); throw new RuntimeException(e); } + finally { + pendingQueries.incrementAndGet(); + } } @Override diff --git a/processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java b/processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java deleted file mode 100644 index c9a66f4b71f6..000000000000 --- a/processing/src/main/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitor.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.metrics; - -import com.google.inject.Inject; -import org.apache.druid.collections.BlockingPool; -import org.apache.druid.guice.annotations.Merging; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; - -import java.nio.ByteBuffer; - -public class MergeBufferPoolMonitor extends AbstractMonitor -{ - - private final BlockingPool mergeBufferPool; - - @Inject - public MergeBufferPoolMonitor( - @Merging BlockingPool mergeBufferPoolIn - ) - { - this.mergeBufferPool = mergeBufferPoolIn; - } - - @Override - public boolean doMonitor(ServiceEmitter emitter) - { - long pendingQueries = this.mergeBufferPool.getPendingQueries(); - - ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder().setFeed("metrics"); - emitter.emit(builder.setMetric("mergebuffer/pendingQueries", pendingQueries)); - - return true; - } - -} diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java deleted file mode 100644 index e81bb8c11467..000000000000 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/MergeBufferPoolMonitorTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.java.util.metrics; - -import org.apache.druid.collections.BlockingPool; -import org.apache.druid.collections.DefaultBlockingPool; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class MergeBufferPoolMonitorTest -{ - private ExecutorService executorService; - - @Before - public void setUp() - { - executorService = Executors.newSingleThreadExecutor(); - } - - @After - public void tearDown() - { - executorService.shutdown(); - } - - @Test - public void testBlockingQueriesCount() - { - BlockingPool pool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1); - MergeBufferPoolMonitor monitor = new MergeBufferPoolMonitor(pool); - - CountDownLatch latch = new CountDownLatch(1); - executorService.submit(() -> { - latch.countDown(); - pool.takeBatch(10); - }); - - try { - // the latch returns from await() guarantees the above lamda to take buffer from the pool starting to run in the - // executorService thread - latch.await(); - - // give 1 sec for pool.takeBatch to run and blocking at the pool - Thread.sleep(1000); - - StubServiceEmitter emitter = new 
StubServiceEmitter("DummyService", "DummyHost"); - boolean ret = monitor.doMonitor(emitter); - Assert.assertTrue(ret); - - List numbers = emitter.getMetricValues("mergebuffer/pendingQueries", Collections.emptyMap()); - Assert.assertEquals(numbers.size(), 1); - Assert.assertEquals(numbers.get(0).intValue(), 1); - } - catch (InterruptedException e) { - // do nothing - } - - } -} diff --git a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java index da2017dbc00a..6b120e5fdd60 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java @@ -21,24 +21,30 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.guice.annotations.Merging; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.java.util.metrics.KeyedDiff; +import java.nio.ByteBuffer; import java.util.Map; public class QueryCountStatsMonitor extends AbstractMonitor { private final KeyedDiff keyedDiff = new KeyedDiff(); private final QueryCountStatsProvider statsProvider; + private final BlockingPool mergeBufferPool; @Inject public QueryCountStatsMonitor( - QueryCountStatsProvider statsProvider + QueryCountStatsProvider statsProvider, + @Merging BlockingPool mergeBufferPoolIn ) { this.statsProvider = statsProvider; + mergeBufferPool = mergeBufferPoolIn; } @Override @@ -65,6 +71,9 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue())); } } + + long pendingQueries = this.mergeBufferPool.getPendingQueries(); + 
emitter.emit(builder.setMetric("mergeBuffer/pendingQueries", pendingQueries)); return true; } diff --git a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java index 95b9f27d1c26..7cf844ab7cbc 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java @@ -19,17 +19,27 @@ package org.apache.druid.server.metrics; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class QueryCountStatsMonitorTest { private QueryCountStatsProvider queryCountStatsProvider; + private BlockingPool mergeBufferPool; + private ExecutorService executorService; @Before public void setUp() @@ -69,14 +79,24 @@ public long getTimedOutQueryCount() return timedOutEmitCount; } }; + + mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1); + executorService = Executors.newSingleThreadExecutor(); + } + + @After + public void tearDown() + { + executorService.shutdown(); } @Test public void testMonitor() { - final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider); + final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); + emitter.flush(); // Trigger metric emission monitor.doMonitor(emitter); Map 
resultMap = emitter.getEvents() @@ -85,12 +105,45 @@ public void testMonitor() event -> (String) event.toMap().get("metric"), event -> (Long) event.toMap().get("value") )); - Assert.assertEquals(5, resultMap.size()); + Assert.assertEquals(6, resultMap.size()); Assert.assertEquals(1L, (long) resultMap.get("query/success/count")); Assert.assertEquals(2L, (long) resultMap.get("query/failed/count")); Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count")); Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count")); Assert.assertEquals(10L, (long) resultMap.get("query/count")); + Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingQueries")); + + } + + @Test + public void testMonitoringMergeBuffer() + { + executorService.submit(() -> { + mergeBufferPool.takeBatch(10); + }); + + int count = 0; + try { + // wait at most 10 secs for the executor thread to block + while (mergeBufferPool.getPendingQueries() == 0) { + Thread.sleep(100); + count++; + if (count >= 100) { + break; + } + } + + final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool); + final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost"); + boolean ret = monitor.doMonitor(emitter); + Assert.assertTrue(ret); + List numbers = emitter.getMetricValues("mergeBuffer/pendingQueries", Collections.emptyMap()); + Assert.assertEquals(1, numbers.size()); + Assert.assertEquals(1, numbers.get(0).intValue()); + } + catch (InterruptedException e) { + // do nothing + } } } From 32f36970957bb001e07e03316c7b6391ad33690e Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Tue, 3 Oct 2023 16:14:38 -0700 Subject: [PATCH 5/5] address Laksh's comments --- docs/operations/metrics.md | 4 ++-- .../apache/druid/collections/BlockingPool.java | 6 +++--- .../druid/collections/DefaultBlockingPool.java | 16 ++++++++-------- .../druid/collections/DummyBlockingPool.java | 2 +- .../org/apache/druid/query/TestBufferPool.java | 2 +- 
.../server/metrics/QueryCountStatsMonitor.java | 8 ++++---- .../metrics/QueryCountStatsMonitorTest.java | 10 +++++----- 7 files changed, 24 insertions(+), 24 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 6ae21cdc76dc..28e8a9fa9646 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -62,7 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | -|`mergeBuffer/pendingQueries`|Number of queries waiting for the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies| |`query/priority`|Assigned lane and priority, only if Laning strategy is enabled. 
Refer to [Laning strategies](../configuration/index.md#laning-strategies)|`lane`, `dataSource`, `type`|0| |`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s| @@ -98,7 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| -|`mergeBuffer/pendingQueries`|Number of queries waiting for the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| ### Real-time diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index 20dc5626da97..4fb3ff66d8bf 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -44,9 +44,9 @@ public interface BlockingPool List> takeBatch(int elementNum); /** - * Return the count of pending queries waiting for merge buffers + * Returns the count of the requests waiting to acquire a batch of resources. 
* - * @return count of pending queries + * @return count of pending requests */ - long getPendingQueries(); + long getPendingRequests(); } diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index affed13698b8..e41a9e5d75d4 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -49,7 +49,7 @@ public class DefaultBlockingPool implements BlockingPool private final Condition notEnough; private final int maxSize; - private final AtomicLong pendingQueries; + private final AtomicLong pendingRequests; public DefaultBlockingPool( Supplier generator, @@ -65,7 +65,7 @@ public DefaultBlockingPool( this.lock = new ReentrantLock(); this.notEnough = lock.newCondition(); - this.pendingQueries = new AtomicLong(); + this.pendingRequests = new AtomicLong(); } @Override @@ -95,7 +95,7 @@ public List> takeBatch(final int elementNum, Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs); checkInitialized(); try { - pendingQueries.incrementAndGet(); + pendingRequests.incrementAndGet(); final List objects = timeoutMs > 0 ? 
pollObjects(elementNum, timeoutMs) : pollObjects(elementNum); return objects.stream().map(this::wrapObject).collect(Collectors.toList()); } @@ -103,7 +103,7 @@ public List> takeBatch(final int elementNum, throw new RuntimeException(e); } finally { - pendingQueries.decrementAndGet(); + pendingRequests.decrementAndGet(); } } @@ -112,21 +112,21 @@ public List> takeBatch(final int elementNum) { checkInitialized(); try { - pendingQueries.incrementAndGet(); + pendingRequests.incrementAndGet(); return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList()); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { - pendingQueries.incrementAndGet(); + pendingRequests.incrementAndGet(); } } @Override - public long getPendingQueries() + public long getPendingRequests() { - return pendingQueries.get(); + return pendingRequests.get(); } private List pollObjects(int elementNum) throws InterruptedException diff --git a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index 2f895e7e0d20..2553f9ab425f 100644 --- a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -57,7 +57,7 @@ public List> takeBatch(int elementNum) } @Override - public long getPendingQueries() + public long getPendingRequests() { return 0; } diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index 0f7eba436be2..a650437f83f0 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -133,7 +133,7 @@ public List> takeBatch(int elementNu } @Override - public long getPendingQueries() + public long getPendingRequests() { return 0; } diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java index 6b120e5fdd60..ce951d5933f7 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/QueryCountStatsMonitor.java @@ -40,11 +40,11 @@ public class QueryCountStatsMonitor extends AbstractMonitor @Inject public QueryCountStatsMonitor( QueryCountStatsProvider statsProvider, - @Merging BlockingPool mergeBufferPoolIn + @Merging BlockingPool mergeBufferPool ) { this.statsProvider = statsProvider; - mergeBufferPool = mergeBufferPoolIn; + this.mergeBufferPool = mergeBufferPool; } @Override @@ -72,8 +72,8 @@ public boolean doMonitor(ServiceEmitter emitter) } } - long pendingQueries = this.mergeBufferPool.getPendingQueries(); - emitter.emit(builder.setMetric("mergeBuffer/pendingQueries", pendingQueries)); + long pendingQueries = this.mergeBufferPool.getPendingRequests(); + emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", pendingQueries)); return true; } diff --git a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java index 7cf844ab7cbc..717c95d62c5b 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java @@ -111,11 +111,11 @@ public void testMonitor() Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count")); Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count")); Assert.assertEquals(10L, (long) resultMap.get("query/count")); - Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingQueries")); + Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests")); } - @Test + @Test(timeout = 2_000L) public 
void testMonitoringMergeBuffer() { executorService.submit(() -> { @@ -125,10 +125,10 @@ public void testMonitoringMergeBuffer() int count = 0; try { // wait at most 10 secs for the executor thread to block - while (mergeBufferPool.getPendingQueries() == 0) { + while (mergeBufferPool.getPendingRequests() == 0) { Thread.sleep(100); count++; - if (count >= 100) { + if (count >= 20) { break; } } @@ -138,7 +138,7 @@ public void testMonitoringMergeBuffer() boolean ret = monitor.doMonitor(emitter); Assert.assertTrue(ret); - List numbers = emitter.getMetricValues("mergeBuffer/pendingQueries", Collections.emptyMap()); + List numbers = emitter.getMetricValues("mergeBuffer/pendingRequests", Collections.emptyMap()); Assert.assertEquals(1, numbers.size()); Assert.assertEquals(1, numbers.get(0).intValue()); }