From f0c6bba73c0e4de43115d69ffc4ffdc010848fee Mon Sep 17 00:00:00 2001 From: Ankit Sultana Date: Fri, 22 Nov 2024 13:45:41 -0600 Subject: [PATCH] [timeseries] Response Size Limit, Metrics and Series Limit (#14501) --- .../TimeSeriesRequestHandler.java | 38 +++++-- .../pinot/common/metrics/BrokerMeter.java | 8 ++ .../results/TimeSeriesResultsBlock.java | 14 +-- .../TimeSeriesAggResultsBlockMerger.java | 13 +++ .../TimeSeriesAggregationOperator.java | 25 ++++- .../pinot/core/plan/TimeSeriesPlanNode.java | 3 +- .../TimeSeriesAggregationOperatorTest.java | 103 +++++++++++++++++- .../timeseries/LeafTimeSeriesOperator.java | 4 + .../timeseries/TimeSeriesDispatchClient.java | 7 +- .../SimpleTimeSeriesBuilderFactory.java | 19 ++++ .../spi/series/TimeSeriesBuilderFactory.java | 14 +++ 11 files changed, 221 insertions(+), 27 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java index 1773fca957ad..52cf63f562e0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.ws.rs.core.HttpHeaders; import org.apache.commons.lang3.StringUtils; @@ -39,6 +40,8 @@ import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerTimer; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; import org.apache.pinot.common.utils.HumanReadableDuration; @@ -90,19 +93,32 @@ public void shutDown() { @Override public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String rawQueryParamString, RequestContext requestContext) { - requestContext.setBrokerId(_brokerId); - requestContext.setRequestId(_requestIdGenerator.get()); - RangeTimeSeriesRequest timeSeriesRequest = null; + PinotBrokerTimeSeriesResponse timeSeriesResponse = null; + long queryStartTime = System.currentTimeMillis(); try { - timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString); - } catch (URISyntaxException e) { - throw new RuntimeException(e); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1); + requestContext.setBrokerId(_brokerId); + requestContext.setRequestId(_requestIdGenerator.get()); + RangeTimeSeriesRequest timeSeriesRequest = null; + try { + timeSeriesRequest = buildRangeTimeSeriesRequest(lang, rawQueryParamString); + } catch (URISyntaxException e) { + return PinotBrokerTimeSeriesResponse.newErrorResponse("BAD_REQUEST", "Error building RangeTimeSeriesRequest"); + } + TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest); + TimeSeriesDispatchablePlan dispatchablePlan = + _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext, logicalPlanResult); + timeSeriesResponse = _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, + timeSeriesRequest.getTimeout().toMillis(), new HashMap<>()); + return timeSeriesResponse; + } finally { + _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, System.currentTimeMillis() - queryStartTime, + TimeUnit.MILLISECONDS); + if (timeSeriesResponse == null + || timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS)) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1); + } } - TimeSeriesLogicalPlanResult logicalPlanResult = _queryEnvironment.buildLogicalPlan(timeSeriesRequest); - TimeSeriesDispatchablePlan dispatchablePlan = _queryEnvironment.buildPhysicalPlan(timeSeriesRequest, requestContext, - logicalPlanResult); - return _queryDispatcher.submitAndGet(requestContext, dispatchablePlan, timeSeriesRequest.getTimeout().toMillis(), - new HashMap<>()); } @Override diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 086d5ffd9e48..c36b4ab504e6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -60,6 +60,14 @@ public enum BrokerMeter implements AbstractMetrics.Meter { * Number of single-stage queries executed that would not have successfully run on the multi-stage query engine as is. */ SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true), + /** + * Number of time-series queries. This metric is not grouped on the table name. + */ + TIME_SERIES_GLOBAL_QUERIES("queries", true), + /** + * Number of time-series queries that failed. This metric is not grouped on the table name. + */ + TIME_SERIES_GLOBAL_QUERIES_FAILED("queries", true), // These metrics track the exceptions caught during query execution in broker side. // Query rejected by Jersey thread pool executor QUERY_REJECTED_EXCEPTIONS("exceptions", true), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java index 30a66bd6245d..f8e7fac944c5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/TimeSeriesResultsBlock.java @@ -27,6 +27,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; +// TODO(timeseries): Implement unsupported functions when merging with MSE. public class TimeSeriesResultsBlock extends BaseResultsBlock { private final TimeSeriesBuilderBlock _timeSeriesBuilderBlock; @@ -36,34 +37,31 @@ public TimeSeriesResultsBlock(TimeSeriesBuilderBlock timeSeriesBuilderBlock) { @Override public int getNumRows() { - // TODO: Unused right now. - return 0; + return _timeSeriesBuilderBlock.getSeriesBuilderMap().size(); } @Nullable @Override public QueryContext getQueryContext() { - // TODO: Unused right now. - return null; + throw new UnsupportedOperationException("Time series results block does not support getting QueryContext yet"); } @Nullable @Override public DataSchema getDataSchema() { - // TODO: Unused right now. - return null; + throw new UnsupportedOperationException("Time series results block does not support getting DataSchema yet"); } @Nullable @Override public List getRows() { - return null; + throw new UnsupportedOperationException("Time series results block does not support getRows yet"); } @Override public DataTable getDataTable() throws IOException { - return null; + throw new UnsupportedOperationException("Time series results block does not support returning DataTable"); } public TimeSeriesBuilderBlock getTimeSeriesBuilderBlock() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java index 17f22a173761..428cfde55511 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/TimeSeriesAggResultsBlockMerger.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.combine.merger; +import com.google.common.base.Preconditions; import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; import org.apache.pinot.tsdb.spi.AggInfo; @@ -28,10 +29,14 @@ public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger { private final TimeSeriesBuilderFactory _seriesBuilderFactory; private final AggInfo _aggInfo; + private final int _maxSeriesLimit; + private final long _maxDataPointsLimit; public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) { _seriesBuilderFactory = seriesBuilderFactory; _aggInfo = aggInfo; + _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit(); + _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit(); } @Override @@ -44,6 +49,14 @@ public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesRes BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue(); if (currentTimeSeriesBuilder == null) { currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge); + final long currentUniqueSeries = currentTimeSeriesBlock.getSeriesBuilderMap().size(); + final long numBuckets = currentTimeSeriesBlock.getTimeBuckets().getNumBuckets(); + Preconditions.checkState(currentUniqueSeries * numBuckets <= _maxDataPointsLimit, + "Max data points limit reached in combine operator. Limit: %s. Current count: %s", + _maxDataPointsLimit, currentUniqueSeries * numBuckets); + Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit, + "Max series limit reached in combine operator. Limit: %s. Current Size: %s", + _maxSeriesLimit, currentTimeSeriesBlock.getSeriesBuilderMap().size()); } else { currentTimeSeriesBuilder.mergeAlignedSeriesBuilder(newTimeSeriesToMerge); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java index fd895a460701..a39c996f4fb7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator.timeseries; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.time.Duration; import java.util.HashMap; @@ -37,6 +38,7 @@ import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock; import org.apache.pinot.core.operator.blocks.ValueBlock; import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.TimeBuckets; import org.apache.pinot.tsdb.spi.series.BaseTimeSeriesBuilder; @@ -59,6 +61,10 @@ public class TimeSeriesAggregationOperator extends BaseOperator _projectOperator; private final TimeBuckets _timeBuckets; private final TimeSeriesBuilderFactory _seriesBuilderFactory; + private final int _maxSeriesLimit; + private final long _maxDataPointsLimit; + private final long _numTotalDocs; + private long _numDocsScanned = 0; public TimeSeriesAggregationOperator( String timeColumn, @@ -69,7 +75,8 @@ public TimeSeriesAggregationOperator( List groupByExpressions, TimeBuckets timeBuckets, BaseProjectOperator projectOperator, - TimeSeriesBuilderFactory seriesBuilderFactory) { + TimeSeriesBuilderFactory seriesBuilderFactory, + SegmentMetadata segmentMetadata) { _timeColumn = timeColumn; _storedTimeUnit = timeUnit; _timeOffset = timeOffsetSeconds == null ? 0L : timeUnit.convert(Duration.ofSeconds(timeOffsetSeconds)); @@ -79,6 +86,9 @@ public TimeSeriesAggregationOperator( _projectOperator = projectOperator; _timeBuckets = timeBuckets; _seriesBuilderFactory = seriesBuilderFactory; + _maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit(); + _maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit(); + _numTotalDocs = segmentMetadata.getTotalDocs(); } @Override @@ -87,6 +97,7 @@ protected TimeSeriesResultsBlock getNextBlock() { Map seriesBuilderMap = new HashMap<>(1024); while ((valueBlock = _projectOperator.nextBlock()) != null) { int numDocs = valueBlock.getNumDocs(); + _numDocsScanned += numDocs; // TODO: This is quite unoptimized and allocates liberally BlockValSet blockValSet = valueBlock.getBlockValueSet(_timeColumn); long[] timeValues = blockValSet.getLongValuesSV(); @@ -129,6 +140,12 @@ protected TimeSeriesResultsBlock getNextBlock() { throw new IllegalStateException( "Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType()); } + Preconditions.checkState(seriesBuilderMap.size() * (long) _timeBuckets.getNumBuckets() <= _maxDataPointsLimit, + "Exceeded max data point limit per server. Limit: %s. Data points in current segment so far: %s", + _maxDataPointsLimit, seriesBuilderMap.size() * _timeBuckets.getNumBuckets()); + Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit, + "Exceeded max unique series limit per server. Limit: %s. Series in current segment so far: %s", + _maxSeriesLimit, seriesBuilderMap.size()); } return new TimeSeriesResultsBlock(new TimeSeriesBuilderBlock(_timeBuckets, seriesBuilderMap)); } @@ -147,8 +164,10 @@ public String toExplainString() { @Override public ExecutionStatistics getExecutionStatistics() { - // TODO: Implement this. - return new ExecutionStatistics(0, 0, 0, 0); + long numEntriesScannedInFilter = _projectOperator.getExecutionStatistics().getNumEntriesScannedInFilter(); + long numEntriesScannedPostFilter = _numDocsScanned * _projectOperator.getNumColumnsProjected(); + return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, + _numTotalDocs); } @VisibleForTesting diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java index 22e3d7b91226..dae0479ebbab 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TimeSeriesPlanNode.java @@ -66,7 +66,8 @@ _segmentContext, _queryContext, getProjectPlanNodeExpressions(), DocIdSetPlanNod getGroupByColumns(), _timeSeriesContext.getTimeBuckets(), projectionOperator, - _seriesBuilderFactory); + _seriesBuilderFactory, + _segmentContext.getIndexSegment().getSegmentMetadata()); } private List getProjectPlanNodeExpressions() { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java index 888d0658e958..eea81a4ba164 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java @@ -20,11 +20,19 @@ import java.time.Duration; import java.util.Collections; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.operator.BaseProjectOperator; +import org.apache.pinot.core.operator.blocks.ValueBlock; +import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.tsdb.spi.AggInfo; import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.apache.pinot.tsdb.spi.series.SimpleTimeSeriesBuilderFactory; import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory; import org.testng.annotations.Test; @@ -33,9 +41,49 @@ public class TimeSeriesAggregationOperatorTest { + private static final Random RANDOM = new Random(); private static final String DUMMY_TIME_COLUMN = "someTimeColumn"; - private static final AggInfo AGG_INFO = new AggInfo("", Collections.emptyMap()); + private static final String GROUP_BY_COLUMN = "city"; + private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap()); private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn"); + private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10); + private static final int NUM_DOCS_IN_DUMMY_DATA = 1000; + + @Test + public void testTimeSeriesAggregationOperator() { + TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( + new SimpleTimeSeriesBuilderFactory()); + TimeSeriesResultsBlock resultsBlock = timeSeriesAggregationOperator.getNextBlock(); + // Expect 2 series: Chicago and San Francisco + assertNotNull(resultsBlock); + assertEquals(2, resultsBlock.getNumRows()); + } + + @Test + public void testTimeSeriesAggregationOperatorWhenSeriesLimit() { + // Since we test with 2 series, use 1 as the limit. + TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( + new SimpleTimeSeriesBuilderFactory(1, 100_000_000L)); + try { + timeSeriesAggregationOperator.getNextBlock(); + fail(); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("Limit: 1. Series in current")); + } + } + + @Test + public void testTimeSeriesAggregationOperatorWhenDataPoints() { + // Since we test with 2 series, use 1 as the limit. + TimeSeriesAggregationOperator timeSeriesAggregationOperator = buildOperatorWithSampleData( + new SimpleTimeSeriesBuilderFactory(1000, 11)); + try { + timeSeriesAggregationOperator.getNextBlock(); + fail(); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("Limit: 11. Data points in current")); + } + } @Test public void testGetTimeValueIndexForSeconds() { @@ -88,9 +136,60 @@ private void testOutOfBoundsTimeValueIndex(long[] storedTimeValues, int numTimeB assertTrue(indexes[0] < 0 || indexes[0] >= numTimeBuckets, "Expected time index to spill beyond valid range"); } + private TimeSeriesAggregationOperator buildOperatorWithSampleData(TimeSeriesBuilderFactory seriesBuilderFactory) { + BaseProjectOperator mockProjectOperator = mock(BaseProjectOperator.class); + ValueBlock valueBlock = buildValueBlockForProjectOperator(); + when(mockProjectOperator.nextBlock()).thenReturn(valueBlock, (ValueBlock) null); + return new TimeSeriesAggregationOperator(DUMMY_TIME_COLUMN, + TimeUnit.SECONDS, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.singletonList(GROUP_BY_COLUMN), + TIME_BUCKETS, mockProjectOperator, seriesBuilderFactory, mock(SegmentMetadata.class)); + } + + private static ValueBlock buildValueBlockForProjectOperator() { + ValueBlock valueBlock = mock(ValueBlock.class); + doReturn(NUM_DOCS_IN_DUMMY_DATA).when(valueBlock).getNumDocs(); + doReturn(buildBlockValSetForTime()).when(valueBlock).getBlockValueSet(DUMMY_TIME_COLUMN); + doReturn(buildBlockValSetForValues()).when(valueBlock).getBlockValueSet(VALUE_EXPRESSION); + doReturn(buildBlockValSetForGroupByColumns()).when(valueBlock).getBlockValueSet(GROUP_BY_COLUMN); + return valueBlock; + } + + private static BlockValSet buildBlockValSetForGroupByColumns() { + BlockValSet blockValSet = mock(BlockValSet.class); + String[] stringArray = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL]; + for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { + stringArray[index] = RANDOM.nextBoolean() ? "Chicago" : "San Francisco"; + } + doReturn(stringArray).when(blockValSet).getStringValuesSV(); + doReturn(FieldSpec.DataType.STRING).when(blockValSet).getValueType(); + return blockValSet; + } + + private static BlockValSet buildBlockValSetForValues() { + BlockValSet blockValSet = mock(BlockValSet.class); + long[] valuesArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; + for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { + valuesArray[index] = index; + } + doReturn(valuesArray).when(blockValSet).getLongValuesSV(); + doReturn(FieldSpec.DataType.LONG).when(blockValSet).getValueType(); + return blockValSet; + } + + private static BlockValSet buildBlockValSetForTime() { + BlockValSet blockValSet = mock(BlockValSet.class); + long[] timeValueArray = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL]; + for (int index = 0; index < NUM_DOCS_IN_DUMMY_DATA; index++) { + timeValueArray[index] = 901 + RANDOM.nextInt(1000); + } + doReturn(timeValueArray).when(blockValSet).getLongValuesSV(); + return blockValSet; + } + private TimeSeriesAggregationOperator buildOperator(TimeUnit storedTimeUnit, TimeBuckets timeBuckets) { return new TimeSeriesAggregationOperator( DUMMY_TIME_COLUMN, storedTimeUnit, 0L, AGG_INFO, VALUE_EXPRESSION, Collections.emptyList(), - timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class)); + timeBuckets, mock(BaseProjectOperator.class), mock(TimeSeriesBuilderFactory.class), + mock(SegmentMetadata.class)); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java index ca119ebf1d47..8b577105d3fd 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/LeafTimeSeriesOperator.java @@ -25,6 +25,7 @@ import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.logger.ServerQueryLogger; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock; @@ -34,6 +35,7 @@ public class LeafTimeSeriesOperator extends BaseTimeSeriesOperator { private final ServerQueryRequest _request; private final QueryExecutor _queryExecutor; private final ExecutorService _executorService; + private final ServerQueryLogger _queryLogger; public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecutor queryExecutor, ExecutorService executorService) { @@ -41,6 +43,7 @@ public LeafTimeSeriesOperator(ServerQueryRequest serverQueryRequest, QueryExecut _request = serverQueryRequest; _queryExecutor = queryExecutor; _executorService = executorService; + _queryLogger = ServerQueryLogger.getInstance(); } @Override @@ -48,6 +51,7 @@ public TimeSeriesBlock getNextBlock() { Preconditions.checkNotNull(_queryExecutor, "Leaf time series operator has not been initialized"); InstanceResponseBlock instanceResponseBlock = _queryExecutor.execute(_request, _executorService); assert instanceResponseBlock.getResultsBlock() instanceof TimeSeriesResultsBlock; + _queryLogger.logQuery(_request, instanceResponseBlock, "TimeSeries"); if (MapUtils.isNotEmpty(instanceResponseBlock.getExceptions())) { // TODO: Return error in the TimeSeriesBlock instead? String oneException = instanceResponseBlock.getExceptions().values().iterator().next(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java index a68e636b96a0..df7734466530 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchClient.java @@ -29,16 +29,19 @@ /** * Dispatch client used to dispatch a runnable plan to the server. - * TODO: This shouldn't exist and we should re-use DispatchClient. TBD as part of multi-stage + * TODO(timeseries): This shouldn't exist and we should re-use DispatchClient. TBD as part of multi-stage * engine integration. */ public class TimeSeriesDispatchClient { + // TODO(timeseries): Note that time-series engine at present uses QueryServer for data transfer from server to broker. + // This will be fixed as we integrate with MSE. + private static final int INBOUND_SIZE_LIMIT = 256 * 1024 * 1024; private final ManagedChannel _channel; private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub; public TimeSeriesDispatchClient(String host, int port) { _channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); - _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel); + _dispatchStub = PinotQueryWorkerGrpc.newStub(_channel).withMaxInboundMessageSize(INBOUND_SIZE_LIMIT); } public ManagedChannel getChannel() { diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java index 9ed40954e5f4..98882d19a700 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/SimpleTimeSeriesBuilderFactory.java @@ -28,8 +28,17 @@ public class SimpleTimeSeriesBuilderFactory extends TimeSeriesBuilderFactory { + private final int _maxSeriesLimit; + private final long _maxDataPointsLimit; + public SimpleTimeSeriesBuilderFactory() { + this(DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT, DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT); + } + + public SimpleTimeSeriesBuilderFactory(int maxSeriesLimit, long maxDataPointsLimit) { super(); + _maxSeriesLimit = maxSeriesLimit; + _maxDataPointsLimit = maxDataPointsLimit; } @Override @@ -50,4 +59,14 @@ public BaseTimeSeriesBuilder newTimeSeriesBuilder(AggInfo aggInfo, String id, Ti @Override public void init(PinotConfiguration pinotConfiguration) { } + + @Override + public int getMaxUniqueSeriesPerServerLimit() { + return _maxSeriesLimit; + } + + @Override + public long getMaxDataPointsPerServerLimit() { + return _maxDataPointsLimit; + } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java index 088f9b3c8572..c48307efea84 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeriesBuilderFactory.java @@ -25,6 +25,12 @@ public abstract class TimeSeriesBuilderFactory { + protected static final int DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT = 100_000; + /** + * Default limit for the total number of values across all series. + */ + protected static final long DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT = 100_000_000; + public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder( AggInfo aggInfo, String id, @@ -32,5 +38,13 @@ public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder( List tagNames, Object[] tagValues); + public int getMaxUniqueSeriesPerServerLimit() { + return DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT; + } + + public long getMaxDataPointsPerServerLimit() { + return DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT; + } + public abstract void init(PinotConfiguration pinotConfiguration); }