
Commit

[timeseries] Response Size Limit, Metrics and Series Limit (apache#14501)
ankitsultana authored Nov 22, 2024
1 parent 4973abf commit f0c6bba
Showing 11 changed files with 221 additions and 27 deletions.
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.utils.HumanReadableDuration;
@@ -90,19 +93,32 @@ public void shutDown() {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString,
RequestContext requestContext) {
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(_requestIdGenerator.get());
RangeTimeSeriesRequest timeSeriesRequest = null;
PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
long queryStartTime = System.currentTimeMillis();
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(_requestIdGenerator.get());
RangeTimeSeriesRequest timeSeriesRequest = null;
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString);
} catch (URISyntaxException e) {
return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", "Error building RangeTimeSeriesRequest");
}
TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
TimeSeriesDispatchablePlan dispatchablePlan =
_queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext, logicalPlanResult);
timeSeriesResponse = _queryDispatcher.submitAndGet(requestContext, dispatchablePlan,
timeSeriesRequest.getTimeout().toMillis(), new HashMap<>());
return timeSeriesResponse;
} finally {
_brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, System.currentTimeMillis() - queryStartTime,
TimeUnit.MILLISECONDS);
if (timeSeriesResponse == null
|| timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS)) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1);
}
}
TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest);
TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext,
logicalPlanResult);
return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(),
new HashMap<>());
}
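For readers skimming the handler diff above, the following is a minimal, self-contained sketch of the metering pattern this commit adopts: count every time-series query up front, time the whole request in a finally block, and count a failure when the response is missing or carries an error status. The Meter, Timer, and Response types are hypothetical stand-ins, not Pinot classes; only the try/finally shape mirrors the change.

import java.util.concurrent.TimeUnit;

public class MeteredHandlerSketch {
  // Hypothetical stand-ins for the broker's metrics facade.
  interface Meter { void add(String name, long count); }
  interface Timer { void record(String name, long duration, TimeUnit unit); }

  static final class Response {
    final boolean _error;
    Response(boolean error) { _error = error; }
  }

  Response handle(Meter meter, Timer timer) {
    long startMs = System.currentTimeMillis();
    Response response = null;
    try {
      // Every request is counted, even those that later fail to parse or plan.
      meter.add("TIME_SERIES_GLOBAL_QUERIES", 1);
      response = new Response(false); // placeholder for plan + dispatch + collect
      return response;
    } finally {
      // Total wall-clock time is recorded on success and failure alike.
      timer.record("QUERY_TOTAL_TIME_MS", System.currentTimeMillis() - startMs, TimeUnit.MILLISECONDS);
      // A null response (unexpected exception) or an error status counts as a failed query.
      if (response == null || response._error) {
        meter.add("TIME_SERIES_GLOBAL_QUERIES_FAILED", 1);
      }
    }
  }
}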

@Override
@@ -60,6 +60,14 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* Number of single-stage queries executed that would not have successfully run on the multi-stage query engine as is.
*/
SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true),
/**
* Number of time-series queries. This metric is not grouped on the table name.
*/
TIME_SERIES_GLOBAL_QUERIES("queries", true),
/**
* Number of time-series queries that failed. This metric is not grouped on the table name.
*/
TIME_SERIES_GLOBAL_QUERIES_FAILED("queries", true),
// These metrics track the exceptions caught during query execution in broker side.
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
@@ -27,6 +27,7 @@
import org.apache.pinot.core.query.request.context.QueryContext;


// TODO(timeseries): Implement unsupported functions when merging with MSE.
public class TimeSeriesResultsBlock extends BaseResultsBlock {
private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock;

@@ -36,34 +37,31 @@ public TimeSeriesResultsBlock(TimeSeriesBuilderBlock timeSeriesBuilderBlock) {

@Override
public int getNumRows() {
// TODO: Unused right now.
return 0;
return _timeSeriesBuilderBlock.getSeriesBuilderMap().size();
}

@Nullable
@Override
public QueryContext getQueryContext() {
// TODO: Unused right now.
return null;
throw new UnsupportedOperationException("Time series results block does not support getting QueryContext yet");
}

@Nullable
@Override
public DataSchema getDataSchema() {
// TODO: Unused right now.
return null;
throw new UnsupportedOperationException("Time series results block does not support getting DataSchema yet");
}

@Nullable
@Override
public List<Object[]> getRows() {
return null;
throw new UnsupportedOperationException("Time series results block does not support getRows yet");
}

@Override
public DataTable getDataTable()
throws IOException {
return null;
throw new UnsupportedOperationException("Time series results block does not support returning DataTable");
}

public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() {
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.operator.combine.merger;

import com.google.common.base.Preconditions;
import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.tsdb.spi.AggInfo;
@@ -28,10 +29,14 @@
public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeSeriesResultsBlock> {
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
private final AggInfo _aggInfo;
private final int _maxSeriesLimit;
private final long _maxDataPointsLimit;

public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) {
_seriesBuilderFactory = seriesBuilderFactory;
_aggInfo = aggInfo;
_maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
_maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
}

@Override
@@ -44,6 +49,14 @@ public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesRes
BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue();
if (currentTimeSeriesBuilder == null) {
currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge);
final long currentUniqueSeries = currentTimeSeriesBlock.getSeriesBuilderMap().size();
final long numBuckets = currentTimeSeriesBlock.getTimeBuckets().getNumBuckets();
Preconditions.checkState(currentUniqueSeries * numBuckets <= _maxDataPointsLimit,
"Max data points limit reached in combine operator. Limit: %s. Current count: %s",
_maxDataPointsLimit, currentUniqueSeries * numBuckets);
Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit,
"Max series limit reached in combine operator. Limit: %s. Current Size: %s",
_maxSeriesLimit, currentTimeSeriesBlock.getSeriesBuilderMap().size());
} else {
currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge);
}
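The merge-phase guard above caps both the number of unique series and the derived data-point count, computed as unique series times the number of time buckets, using limits exposed by the series builder factory. A self-contained sketch of that arithmetic with illustrative limits (not Pinot defaults):

import com.google.common.base.Preconditions;

public class SeriesLimitGuardSketch {
  // Mirrors the combine-phase checks: data points are counted as series * buckets.
  static void checkLimits(long uniqueSeries, long numBuckets, long maxSeries, long maxDataPoints) {
    Preconditions.checkState(uniqueSeries * numBuckets <= maxDataPoints,
        "Max data points limit reached in combine operator. Limit: %s. Current count: %s",
        maxDataPoints, uniqueSeries * numBuckets);
    Preconditions.checkState(uniqueSeries <= maxSeries,
        "Max series limit reached in combine operator. Limit: %s. Current size: %s",
        maxSeries, uniqueSeries);
  }

  public static void main(String[] args) {
    checkLimits(2, 10, 1_000, 100_000_000L); // passes: 2 series, 20 data points
    checkLimits(2, 10, 1_000, 11);           // throws: 20 data points exceed a limit of 11
  }
}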
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.timeseries;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.HashMap;
@@ -37,6 +38,7 @@
import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder;
@@ -59,6 +61,10 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
private final BaseProjectOperator<? extends ValueBlock> _projectOperator;
private final TimeBuckets _timeBuckets;
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
private final int _maxSeriesLimit;
private final long _maxDataPointsLimit;
private final long _numTotalDocs;
private long _numDocsScanned = 0;

public TimeSeriesAggregationOperator(
String timeColumn,
@@ -69,7 +75,8 @@ public TimeSeriesAggregationOperator(
List<String> groupByExpressions,
TimeBuckets timeBuckets,
BaseProjectOperator<? extends ValueBlock> projectOperator,
TimeSeriesBuilderFactory seriesBuilderFactory) {
TimeSeriesBuilderFactory seriesBuilderFactory,
SegmentMetadata segmentMetadata) {
_timeColumn = timeColumn;
_storedTimeUnit = timeUnit;
_timeOffset = timeOffsetSeconds == null ? 0L : timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds));
@@ -79,6 +86,9 @@
_projectOperator = projectOperator;
_timeBuckets = timeBuckets;
_seriesBuilderFactory = seriesBuilderFactory;
_maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
_maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
_numTotalDocs = segmentMetadata.getTotalDocs();
}

@Override
Expand All @@ -87,6 +97,7 @@ protected TimeSeriesResultsBlock getNextBlock() {
Map<Long, BaseTimeSeriesBuilder> seriesBuilderMap = new HashMap<>(1024);
while ((valueBlock = _projectOperator.nextBlock()) != null) {
int numDocs = valueBlock.getNumDocs();
_numDocsScanned += numDocs;
// TODO: This is quite unoptimized and allocates liberally
BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn);
long[] timeValues = blockValSet.getLongValuesSV();
@@ -129,6 +140,12 @@ protected TimeSeriesResultsBlock getNextBlock() {
throw new IllegalStateException(
"Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType());
}
Preconditions.checkState(seriesBuilderMap.size() * (long) _timeBuckets.getNumBuckets() <= _maxDataPointsLimit,
"Exceeded max data point limit per server. Limit: %s. Data points in current segment so far: %s",
_maxDataPointsLimit, seriesBuilderMap.size() * _timeBuckets.getNumBuckets());
Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit,
"Exceeded max unique series limit per server. Limit: %s. Series in current segment so far: %s",
_maxSeriesLimit, seriesBuilderMap.size());
}
return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap));
}
@@ -147,8 +164,10 @@ public String toExplainString() {

@Override
public ExecutionStatistics getExecutionStatistics() {
// TODO: Implement this.
return new ExecutionStatistics(0, 0, 0, 0);
long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
long numEntriesScannedPostFilter = _numDocsScanned * _projectOperator.getNumColumnsProjected();
return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
_numTotalDocs);
}
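The operator now returns real execution statistics instead of zeros: scanned docs are accumulated per value block, in-filter entries come from the project operator, post-filter entries are approximated as scanned docs times projected columns, and the total doc count comes from SegmentMetadata. A small sketch of that arithmetic with illustrative numbers (the local names here are not Pinot APIs):

public class ExecutionStatsSketch {
  public static void main(String[] args) {
    long numDocsScanned = 1_000;          // accumulated across value blocks in getNextBlock()
    long numEntriesScannedInFilter = 0;   // reported by the project operator's own statistics
    int numColumnsProjected = 3;          // e.g. time column + value column + one group-by column
    long numTotalDocs = 50_000;           // SegmentMetadata.getTotalDocs() for the segment

    long numEntriesScannedPostFilter = numDocsScanned * numColumnsProjected;
    System.out.printf("docsScanned=%d, inFilter=%d, postFilter=%d, totalDocs=%d%n",
        numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, numTotalDocs);
  }
}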

@VisibleForTesting
@@ -66,7 +66,8 @@ _segmentContext, _queryContext, getProjectPlanNodeExpressions(), DocIdSetPlanNod
getGroupByColumns(),
_timeSeriesContext.getTimeBuckets(),
projectionOperator,
_seriesBuilderFactory);
_seriesBuilderFactory,
_segmentContext.getIndexSegment().getSegmentMetadata());
}

private List<ExpressionContext> getProjectPlanNodeExpressions() {
@@ -20,11 +20,19 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.BaseProjectOperator;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
import org.testng.annotations.Test;

@@ -33,9 +41,49 @@


public class TimeSeriesAggregationOperatorTest {
private static final Random RANDOM = new Random();
private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
private static final AggInfo AGG_INFO = new AggInfo("", Collections.emptyMap());
private static final String GROUP_BY_COLUMN = "city";
private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap());
private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn");
private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10);
private static final int NUM_DOCS_IN_DUMMY_DATA = 1000;

@Test
public void testTimeSeriesAggregationOperator() {
TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData(
new SimpleTimeSeriesBuilderFactory());
TimeSeriesResultsBlock resultsBlock = timeSeriesAggregationOperator.getNextBlock();
// Expect 2 series: Chicago and San Francisco
assertNotNull(resultsBlock);
assertEquals(2, resultsBlock.getNumRows());
}

@Test
public void testTimeSeriesAggregationOperatorWhenSeriesLimit() {
// The test data produces 2 series, so use 1 as the per-server series limit.
TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData(
new SimpleTimeSeriesBuilderFactory(1, 100_000_000L));
try {
timeSeriesAggregationOperator.getNextBlock();
fail();
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("Limit: 1. Series in current"));
}
}

@Test
public void testTimeSeriesAggregationOperatorWhenDataPoints() {
// With 2 series and 10 time buckets (20 data points), use 11 as the data-point limit so the check fires.
TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData(
new SimpleTimeSeriesBuilderFactory(1000, 11));
try {
timeSeriesAggregationOperator.getNextBlock();
fail();
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("Limit: 11. Data points in current"));
}
}

@Test
public void testGetTimeValueIndexForSeconds() {
@@ -88,9 +136,60 @@ private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int numTimeB
assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time index to spill beyond valid range");
}

private TimeSeriesAggregationOperator buildOperatorWithSampleData(TimeSeriesBuilderFactory seriesBuilderFactory) {
BaseProjectOperator<ValueBlock> mockProjectOperator = mock(BaseProjectOperator.class);
ValueBlock valueBlock = buildValueBlockForProjectOperator();
when(mockProjectOperator.nextBlock()).thenReturn(valueBlock, (ValueBlock) null);
return new TimeSeriesAggregationOperator(DUMMY_TIME_COLUMN,
TimeUnit.SECONDS, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.singletonList(GROUP_BY_COLUMN),
TIME_BUCKETS, mockProjectOperator, seriesBuilderFactory, mock(SegmentMetadata.class));
}

private static ValueBlock buildValueBlockForProjectOperator() {
ValueBlock valueBlock = mock(ValueBlock.class);
doReturn(NUM_DOCS_IN_DUMMY_DATA).when(valueBlock).getNumDocs();
doReturn(buildBlockValSetForTime()).when(valueBlock).getBlockValueSet(DUMMY_TIME_COLUMN);
doReturn(buildBlockValSetForValues()).when(valueBlock).getBlockValueSet(VALUE_EXPRESSION);
doReturn(buildBlockValSetForGroupByColumns()).when(valueBlock).getBlockValueSet(GROUP_BY_COLUMN);
return valueBlock;
}

private static BlockValSet buildBlockValSetForGroupByColumns() {
BlockValSet blockValSet = mock(BlockValSet.class);
String[] stringArray = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
stringArray[index] = RANDOM.nextBoolean() ? "Chicago" : "San Francisco";
}
doReturn(stringArray).when(blockValSet).getStringValuesSV();
doReturn(FieldSpec.DataType.STRING).when(blockValSet).getValueType();
return blockValSet;
}

private static BlockValSet buildBlockValSetForValues() {
BlockValSet blockValSet = mock(BlockValSet.class);
long[] valuesArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
valuesArray[index] = index;
}
doReturn(valuesArray).when(blockValSet).getLongValuesSV();
doReturn(FieldSpec.DataType.LONG).when(blockValSet).getValueType();
return blockValSet;
}

private static BlockValSet buildBlockValSetForTime() {
BlockValSet blockValSet = mock(BlockValSet.class);
long[] timeValueArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) {
timeValueArray[index] = 901 + RANDOM.nextInt(1000);
}
doReturn(timeValueArray).when(blockValSet).getLongValuesSV();
return blockValSet;
}

private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, TimeBuckets timeBuckets) {
return new TimeSeriesAggregationOperator(
DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.emptyList(),
timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class));
timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class),
mock(SegmentMetadata.class));
}
}
