Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric -- count of queries waiting for merge buffers #15025

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`mergeBuffer/pendingQueries`|Number of queries waiting for the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies|
|`query/priority`|Assigned lane and priority, only if Laning strategy is enabled. Refer to [Laning strategies](../configuration/index.md#laning-strategies)|`lane`, `dataSource`, `type`|0|
|`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s|
Expand Down Expand Up @@ -97,6 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingQueries`|Number of queries waiting for the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||

### Real-time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public interface BlockingPool<T>
*
* @param elementNum number of resources to take
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs);
Expand All @@ -40,8 +39,14 @@ public interface BlockingPool<T>
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
*
* @param elementNum number of resources to take
*
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum);

/**
* Return the count of pending queries waiting for merge buffers
*
* @return count of pending queries
*/
long getPendingQueries();
kaisun2000 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand All @@ -48,6 +49,8 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
private final Condition notEnough;
private final int maxSize;

private final AtomicLong pendingQueries;
kaisun2000 marked this conversation as resolved.
Show resolved Hide resolved

public DefaultBlockingPool(
Supplier<T> generator,
int limit
Expand All @@ -62,6 +65,7 @@ public DefaultBlockingPool(

this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
this.pendingQueries = new AtomicLong();
}

@Override
Expand Down Expand Up @@ -91,24 +95,38 @@ public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum,
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
pendingQueries.incrementAndGet();
final List<T> objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum);
return objects.stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
pendingQueries.decrementAndGet();
}
}

@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum)
{
checkInitialized();
try {
pendingQueries.incrementAndGet();
return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
pendingQueries.incrementAndGet();
}
}

@Override
public long getPendingQueries()
{
return pendingQueries.get();
}

private List<T> pollObjects(int elementNum) throws InterruptedException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@ public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum)
{
throw new UnsupportedOperationException();
}

@Override
public long getPendingQueries()
{
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public List<ReferenceCountingResourceHolder<ByteBuffer>> takeBatch(int elementNu
}
}

@Override
public long getPendingQueries()
{
return 0;
}

public long getOutstandingObjectCount()
{
return takenFromMap.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,30 @@

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.KeyedDiff;

import java.nio.ByteBuffer;
import java.util.Map;

public class QueryCountStatsMonitor extends AbstractMonitor
{
private final KeyedDiff keyedDiff = new KeyedDiff();
private final QueryCountStatsProvider statsProvider;
private final BlockingPool<ByteBuffer> mergeBufferPool;

@Inject
public QueryCountStatsMonitor(
QueryCountStatsProvider statsProvider
QueryCountStatsProvider statsProvider,
@Merging BlockingPool<ByteBuffer> mergeBufferPoolIn
kaisun2000 marked this conversation as resolved.
Show resolved Hide resolved
)
{
this.statsProvider = statsProvider;
mergeBufferPool = mergeBufferPoolIn;
}

@Override
Expand All @@ -65,6 +71,9 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue()));
}
}

long pendingQueries = this.mergeBufferPool.getPendingQueries();
emitter.emit(builder.setMetric("mergeBuffer/pendingQueries", pendingQueries));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@

package org.apache.druid.server.metrics;

import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class QueryCountStatsMonitorTest
{
private QueryCountStatsProvider queryCountStatsProvider;
private BlockingPool<ByteBuffer> mergeBufferPool;
private ExecutorService executorService;

@Before
public void setUp()
Expand Down Expand Up @@ -69,14 +79,24 @@ public long getTimedOutQueryCount()
return timedOutEmitCount;
}
};

mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1);
executorService = Executors.newSingleThreadExecutor();
}

@After
public void tearDown()
{
executorService.shutdown();
}

@Test
public void testMonitor()
{
final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider);
final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
emitter.flush();
// Trigger metric emission
monitor.doMonitor(emitter);
Map<String, Long> resultMap = emitter.getEvents()
Expand All @@ -85,12 +105,45 @@ public void testMonitor()
event -> (String) event.toMap().get("metric"),
event -> (Long) event.toMap().get("value")
));
Assert.assertEquals(5, resultMap.size());
Assert.assertEquals(6, resultMap.size());
Assert.assertEquals(1L, (long) resultMap.get("query/success/count"));
Assert.assertEquals(2L, (long) resultMap.get("query/failed/count"));
Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count"));
Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count"));
Assert.assertEquals(10L, (long) resultMap.get("query/count"));
Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingQueries"));

}

@Test
public void testMonitoringMergeBuffer()
{
executorService.submit(() -> {
mergeBufferPool.takeBatch(10);
});

int count = 0;
try {
// wait at most 10 secs for the executor thread to block
kaisun2000 marked this conversation as resolved.
Show resolved Hide resolved
while (mergeBufferPool.getPendingQueries() == 0) {
Thread.sleep(100);
count++;
if (count >= 100) {
break;
}
}

final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);

List<Number> numbers = emitter.getMetricValues("mergeBuffer/pendingQueries", Collections.emptyMap());
Assert.assertEquals(1, numbers.size());
Assert.assertEquals(1, numbers.get(0).intValue());
}
catch (InterruptedException e) {
// do nothing
}
}
}
Loading