From 876e58dcc5e4a4c8639bdeff22fda55d9f12f3ef Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Wed, 11 Oct 2023 22:10:18 -0700 Subject: [PATCH] [multistage] framework to back-propagate metadata across opChains (#11746) * initial commit to alter early termination and signaling backwards behavior * add more tests --------- Co-authored-by: Rong Rong --- .../query/mailbox/GrpcSendingMailbox.java | 8 ++- .../query/mailbox/InMemorySendingMailbox.java | 14 ++++- .../pinot/query/mailbox/ReceivingMailbox.java | 17 ++++-- .../pinot/query/mailbox/SendingMailbox.java | 6 ++ .../query/mailbox/channel/ChannelUtils.java | 2 +- .../channel/MailboxContentObserver.java | 7 ++- .../channel/MailboxStatusObserver.java | 16 +++++- .../runtime/operator/AggregateOperator.java | 1 - .../operator/BaseMailboxReceiveOperator.java | 11 ++++ .../runtime/operator/HashJoinOperator.java | 5 +- .../LeafStageTransferableBlockOperator.java | 18 +++--- .../operator/LiteralValueOperator.java | 2 +- .../operator/MailboxReceiveOperator.java | 10 +++- .../runtime/operator/MailboxSendOperator.java | 17 +++--- .../runtime/operator/MultiStageOperator.java | 9 +++ .../query/runtime/operator/SortOperator.java | 13 +++-- .../operator/WindowAggregateOperator.java | 1 - .../operator/exchange/BlockExchange.java | 32 ++++++----- .../runtime/operator/utils/AsyncStream.java | 5 ++ .../utils/BlockingMultiStreamConsumer.java | 6 ++ .../query/mailbox/MailboxServiceTest.java | 55 +++++++++++++++++++ .../operator/HashJoinOperatorTest.java | 1 + .../operator/MailboxReceiveOperatorTest.java | 29 ++++++++++ .../operator/MailboxSendOperatorTest.java | 21 +++++-- .../query/runtime/operator/OpChainTest.java | 5 ++ .../runtime/operator/SortOperatorTest.java | 27 +++++++++ .../operator/exchange/BlockExchangeTest.java | 33 +++++++++++ 27 files changed, 315 insertions(+), 56 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index ccb9afc27005..87a8bda603ea 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -60,7 +60,7 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn @Override public void send(TransferableBlock block) throws IOException { - if (isTerminated()) { + if (isTerminated() || (isEarlyTerminated() && !block.isEndOfStreamBlock())) { return; } if (LOGGER.isDebugEnabled()) { @@ -101,9 +101,13 @@ public void cancel(Throwable t) { } } + @Override + public boolean isEarlyTerminated() { + return _statusObserver.isEarlyTerminated(); + } + @Override public boolean isTerminated() { - // TODO: We cannot differentiate early termination vs stream error return _statusObserver.isFinished(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java index 444e7e1ddee9..306cd6992a38 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeoutException; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.spi.exception.QueryCancelledException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ public class InMemorySendingMailbox implements SendingMailbox { private ReceivingMailbox _receivingMailbox; private volatile boolean _isTerminated; + private volatile boolean _isEarlyTerminated; public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) { _id = id; @@ -44,7 +46,7 @@ public InMemorySendingMailbox(String id, MailboxService mailboxService, long dea @Override public void send(TransferableBlock block) throws TimeoutException { - if (_isTerminated) { + if (isTerminated() || (isEarlyTerminated() && !block.isEndOfStreamBlock())) { return; } if (_receivingMailbox == null) { @@ -55,13 +57,15 @@ public void send(TransferableBlock block) switch (status) { case SUCCESS: break; + case CANCELLED: + throw new QueryCancelledException(String.format("Mailbox: %s already cancelled from upstream", _id)); case ERROR: throw new RuntimeException(String.format("Mailbox: %s already errored out (received error block before)", _id)); case TIMEOUT: throw new TimeoutException( String.format("Timed out adding block into mailbox: %s with timeout: %dms", _id, timeoutMs)); case EARLY_TERMINATED: - _isTerminated = true; + _isEarlyTerminated = true; break; default: throw new IllegalStateException("Unsupported mailbox status: " + status); @@ -70,6 +74,7 @@ public void send(TransferableBlock block) @Override public void complete() { + _isTerminated = true; } @Override @@ -85,6 +90,11 @@ public void cancel(Throwable t) { new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t))); } + @Override + public boolean isEarlyTerminated() { + return _isEarlyTerminated; + } + @Override public boolean isTerminated() { return _isTerminated; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java index 8bbffee36023..97c8731f95a8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java @@ -49,6 +49,8 @@ public class ReceivingMailbox { // TODO: Revisit if this is the correct way to apply back pressure private final BlockingQueue _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS); private final AtomicReference _errorBlock = new AtomicReference<>(); + private volatile boolean _isEarlyTerminated = false; + @Nullable private volatile Reader _reader; @@ -78,7 +80,7 @@ public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) { TransferableBlock errorBlock = _errorBlock.get(); if (errorBlock != null) { LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id); - return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED + return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR; } if (timeoutMs <= 0) { @@ -95,11 +97,11 @@ public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) { LOGGER.debug("==[MAILBOX]== Block " + block + " ready to read from mailbox: " + _id); } notifyReader(); - return ReceivingMailboxStatus.SUCCESS; + return _isEarlyTerminated ? ReceivingMailboxStatus.EARLY_TERMINATED : ReceivingMailboxStatus.SUCCESS; } else { LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id); _blocks.clear(); - return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED + return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED : ReceivingMailboxStatus.ERROR; } } else { @@ -136,6 +138,13 @@ public TransferableBlock poll() { return errorBlock != null ? errorBlock : _blocks.poll(); } + /** + * Early terminate the mailbox, called when upstream doesn't expect any more data block. + */ + public void earlyTerminate() { + _isEarlyTerminated = true; + } + /** * Cancels the mailbox. No more blocks are accepted after calling this method. Should only be called by the receive * operator to clean up the remaining blocks. @@ -166,6 +175,6 @@ public interface Reader { } public enum ReceivingMailboxStatus { - SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED + SUCCESS, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java index 7cdfc8c97dc1..576a4703dacc 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java @@ -62,4 +62,10 @@ void send(TransferableBlock block) * mailbox is terminated. */ boolean isTerminated(); + + /** + * Returns whether the {@link ReceivingMailbox} is considered itself finished, and is expected a EOS block with + * statistics to be sent next. + */ + boolean isEarlyTerminated(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java index ef43fe79a3eb..e297ac5f9c83 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelUtils.java @@ -23,5 +23,5 @@ private ChannelUtils() { } public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size"; - public static final String MAILBOX_METADATA_BEGIN_OF_STREAM_KEY = "begin.of.stream"; + public static final String MAILBOX_METADATA_REQUEST_EARLY_TERMINATE = "request.early.terminate"; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java index 9074d811517b..ce91701c3ade 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java @@ -84,6 +84,10 @@ public void onNext(MailboxContent mailboxContent) { .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, Integer.toString(_mailbox.getNumPendingBlocks())).build()); break; + case CANCELLED: + LOGGER.warn("Mailbox: {} already cancelled from upstream", mailboxId); + cancelStream(); + break; case ERROR: LOGGER.warn("Mailbox: {} already errored out (received error block before)", mailboxId); cancelStream(); @@ -94,7 +98,8 @@ public void onNext(MailboxContent mailboxContent) { break; case EARLY_TERMINATED: LOGGER.debug("Mailbox: {} has been early terminated", mailboxId); - onCompleted(); + _responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId) + .putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE, "true").build()); break; default: throw new IllegalStateException("Unsupported mailbox status: " + status); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java index 9f1c488d1802..4288f4a087ad 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java @@ -35,11 +35,19 @@ public class MailboxStatusObserver implements StreamObserver { private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY); private final AtomicBoolean _finished = new AtomicBoolean(); + private volatile boolean _isEarlyTerminated; @Override public void onNext(MailboxStatus mailboxStatus) { - // when received a mailbox status from the receiving end, sending end update the known buffer size available - // so we can make better throughput send judgement. here is a simple example. + // when receiving mailbox receives a data block it will return an updated info of the receiving end status including + // 1. the buffer size available, for back-pressure handling + // 2. status whether there's no need to send any additional data block b/c it considered itself finished. + // -- handle early-terminate EOS request. + if (Boolean.parseBoolean( + mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) { + _isEarlyTerminated = true; + } + // -- handling buffer size back-pressure // TODO: this feedback info is not used to throttle the send speed. it is currently being discarded. if (mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)) { _bufferSize.set( @@ -49,6 +57,10 @@ public void onNext(MailboxStatus mailboxStatus) { } } + public boolean isEarlyTerminated() { + return _isEarlyTerminated; + } + public int getBufferSize() { return _bufferSize.get(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java index e3fc9cc315cd..27b6cbdf080b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java @@ -142,7 +142,6 @@ protected TransferableBlock getNextBlock() { if (!_hasReturnedAggregateBlock) { return produceAggregatedBlock(); } else { - // TODO: Move to close call. return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } } catch (Exception e) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java index c633783d48a1..438cec849473 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java @@ -81,6 +81,12 @@ public List getMailboxIds() { return _mailboxIds; } + @Override + protected void earlyTerminate() { + _isEarlyTerminated = true; + _multiConsumer.earlyTerminate(); + } + @Override public List getChildOperators() { return Collections.emptyList(); @@ -128,6 +134,11 @@ public void addOnNewDataListener(OnNewData onNewData) { _mailbox.registeredReader(onNewData::newDataAvailable); } + @Override + public void earlyTerminate() { + _mailbox.earlyTerminate(); + } + @Override public void cancel() { _mailbox.cancel(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 32ca46290a3b..295e0c49af21 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -239,9 +239,8 @@ private void buildBroadcastHashTable() } _currentRowsInHashTable += container.size(); if (_currentRowsInHashTable == _maxRowsInHashTable) { - // Early terminate right table operator. - _rightTableOperator.close(); - break; + // setting only the rightTableOperator to be early terminated and awaits EOS block next. + _rightTableOperator.earlyTerminate(); } rightBlock = _rightTableOperator.nextBlock(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index 4e690e404128..35361e48e9bc 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -126,21 +126,25 @@ protected TransferableBlock getNextBlock() { if (exceptions != null) { return TransferableBlockUtils.getErrorTransferableBlock(exceptions); } - if (resultsBlock != LAST_RESULTS_BLOCK) { + if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) { + return constructMetadataBlock(); + } else { // Regular data block return composeTransferableBlock(resultsBlock, _dataSchema); - } else { - // All data blocks have been returned. Record the stats and return EOS. - Map executionStats = _executionStats; - OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId()); - operatorStats.recordExecutionStats(executionStats); - return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } } catch (Exception e) { return TransferableBlockUtils.getErrorTransferableBlock(e); } } + private TransferableBlock constructMetadataBlock() { + // All data blocks have been returned. Record the stats and return EOS. + Map executionStats = _executionStats; + OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId()); + operatorStats.recordExecutionStats(executionStats); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + } + private Future startExecution() { ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer(); return _executorService.submit(() -> { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java index 029e58e9a2a5..59aeec3e7574 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java @@ -61,7 +61,7 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - if (!_isLiteralBlockReturned) { + if (!_isLiteralBlockReturned && !_isEarlyTerminated) { _isLiteralBlockReturned = true; return _rexLiteralBlock; } else { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 60c751405ca9..ad7913cdc167 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -42,6 +42,14 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - return getMultiConsumer().readBlockBlocking(); + TransferableBlock block = getMultiConsumer().readBlockBlocking(); + // When early termination flag is set, caller is expecting an EOS block to be returned, however since the 2 stages + // between sending/receiving mailbox are setting early termination flag asynchronously, there's chances that the + // next block pulled out of the ReceivingMailbox to be an already buffered normal data block. This requires the + // MailboxReceiveOperator to continue pulling and dropping data block until an EOS block is observed. + while (_isEarlyTerminated && !block.isEndOfStreamBlock()) { + block = getMultiConsumer().readBlockBlocking(); + } + return block; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 74fe297a8f69..f6f25510df5a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -38,7 +38,7 @@ import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.spi.exception.EarlyTerminationException; +import org.apache.pinot.spi.exception.QueryCancelledException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,14 +123,16 @@ protected TransferableBlock getNextBlock() { // and the receiving opChain will not be able to access the stats from the previous opChain TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock( OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap())); + // no need to check early terminate signal b/c the current block is already EOS sendTransferableBlock(eosBlockWithStats); } else { - sendTransferableBlock(block); + if (sendTransferableBlock(block)) { + earlyTerminate(); + } } return block; - } catch (EarlyTerminationException e) { - // TODO: Query stats are not sent when opChain is early terminated - LOGGER.debug("Early terminating opChain: {}", _context.getId()); + } catch (QueryCancelledException e) { + LOGGER.debug("Query was cancelled! for opChain: {}", _context.getId()); return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } catch (TimeoutException e) { LOGGER.warn("Timed out transferring data on opChain: {}", _context.getId(), e); @@ -147,12 +149,13 @@ protected TransferableBlock getNextBlock() { } } - private void sendTransferableBlock(TransferableBlock block) + private boolean sendTransferableBlock(TransferableBlock block) throws Exception { - _exchange.send(block); + boolean isEarlyTerminated = _exchange.send(block); if (LOGGER.isDebugEnabled()) { LOGGER.debug("==[SEND]== Block " + block + " sent from: " + _context.getId()); } + return isEarlyTerminated; } /** diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 84228071c953..ade326ea2049 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -36,11 +36,13 @@ public abstract class MultiStageOperator implements Operator, protected final OpChainExecutionContext _context; protected final String _operatorId; protected final OpChainStats _opChainStats; + protected boolean _isEarlyTerminated; public MultiStageOperator(OpChainExecutionContext context) { _context = context; _operatorId = Joiner.on("_").join(getClass().getSimpleName(), _context.getStageId(), _context.getServer()); _opChainStats = _context.getStats(); + _isEarlyTerminated = false; } @Override @@ -70,6 +72,13 @@ public String getOperatorId() { // Make it protected because we should always call nextBlock() protected abstract TransferableBlock getNextBlock(); + protected void earlyTerminate() { + _isEarlyTerminated = true; + for (MultiStageOperator child : getChildOperators()) { + child.earlyTerminate(); + } + } + protected boolean shouldCollectStats() { return _context.isTraceEnabled(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java index d803848a7fa4..64f0926a6386 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java @@ -51,7 +51,7 @@ public class SortOperator extends MultiStageOperator { private final ArrayList _rows; private final int _numRowsToKeep; - private boolean _isSortedBlockConstructed; + private boolean _hasReturnedSortedBlock; private TransferableBlock _upstreamErrorBlock; public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, @@ -74,7 +74,7 @@ public SortOperator(OpChainExecutionContext context, MultiStageOperator upstream _offset = Math.max(offset, 0); _dataSchema = dataSchema; _upstreamErrorBlock = null; - _isSortedBlockConstructed = false; + _hasReturnedSortedBlock = false; // Setting numRowsToKeep as default maximum on Broker if limit not set. // TODO: make this default behavior configurable. _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit; @@ -123,8 +123,8 @@ private TransferableBlock produceSortedBlock() { return _upstreamErrorBlock; } - if (!_isSortedBlockConstructed) { - _isSortedBlockConstructed = true; + if (!_hasReturnedSortedBlock) { + _hasReturnedSortedBlock = true; if (_priorityQueue == null) { if (_rows.size() > _offset) { List row = _rows.subList(_offset, _rows.size()); @@ -150,7 +150,7 @@ private TransferableBlock produceSortedBlock() { } private void consumeInputBlocks() { - if (!_isSortedBlockConstructed) { + if (!_hasReturnedSortedBlock) { TransferableBlock block = _upstreamOperator.nextBlock(); while (block.isDataBlock()) { List container = block.getContainer(); @@ -164,7 +164,8 @@ private void consumeInputBlocks() { _rows.addAll(container.subList(0, _numRowsToKeep - numRows)); LOGGER.debug("Early terminate at SortOperator - operatorId={}, opChainId={}", _operatorId, _context.getId()); - break; + // setting operator to be early terminated and awaits EOS block next. + earlyTerminate(); } } } else { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java index a4dfde626386..ea9e065416cb 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java @@ -172,7 +172,6 @@ protected TransferableBlock getNextBlock() { if (!_hasReturnedWindowAggregateBlock) { return produceWindowAggregatedBlock(); } else { - // TODO: Move to close call. return TransferableBlockUtils.getEndOfStreamTransferableBlock(); } } catch (Exception e) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java index ae8d0c75d452..453288ecb31f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java @@ -28,7 +28,6 @@ import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; import org.apache.pinot.query.runtime.blocks.BlockSplitter; import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.spi.exception.EarlyTerminationException; /** @@ -68,24 +67,31 @@ protected BlockExchange(List sendingMailboxes, BlockSplitter spl _splitter = splitter; } - public void send(TransferableBlock block) + /** + * API to send a block to the destination mailboxes. + * @param block the block to be transferred + * @return true if all the mailboxes has been early terminated. + * @throws Exception when sending stream unexpectedly closed. + */ + public boolean send(TransferableBlock block) throws Exception { - boolean isEarlyTerminated = true; - for (SendingMailbox sendingMailbox : _sendingMailboxes) { - if (!sendingMailbox.isTerminated()) { - isEarlyTerminated = false; - break; - } - } - if (isEarlyTerminated) { - throw new EarlyTerminationException(); - } if (block.isEndOfStreamBlock()) { for (SendingMailbox sendingMailbox : _sendingMailboxes) { sendBlock(sendingMailbox, block); } + return false; } else { - route(_sendingMailboxes, block); + boolean isEarlyTerminated = true; + for (SendingMailbox sendingMailbox : _sendingMailboxes) { + if (!sendingMailbox.isEarlyTerminated()) { + isEarlyTerminated = false; + break; + } + } + if (!isEarlyTerminated) { + route(_sendingMailboxes, block); + } + return isEarlyTerminated; } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java index 7df33ebefc38..287ae7359eb2 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/AsyncStream.java @@ -67,6 +67,11 @@ public interface AsyncStream { */ void cancel(); + /** + * Set this stream to early terminate state, asking for metadata block. + */ + void earlyTerminate(); + interface OnNewData { void newDataAvailable(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java index 4605dc8d4de4..387a44e75462 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java @@ -71,6 +71,12 @@ public void cancel(Throwable t) { cancelRemainingMailboxes(); } + public void earlyTerminate() { + for (AsyncStream mailbox : _mailboxes) { + mailbox.earlyTerminate(); + } + } + protected void cancelRemainingMailboxes() { for (AsyncStream mailbox : _mailboxes) { mailbox.cancel(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java index 11616a68f053..8e6b563ac4f2 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java @@ -292,6 +292,31 @@ public void testLocalBufferFull() assertEquals(receivingMailbox.getNumPendingBlocks(), 0); } + @Test + public void testLocalEarlyTerminated() + throws Exception { + String mailboxId = MailboxIdUtils.toMailboxId(_requestId++, SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0); + SendingMailbox sendingMailbox = + _mailboxService1.getSendingMailbox("localhost", _mailboxService1.getPort(), mailboxId, Long.MAX_VALUE); + ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId); + receivingMailbox.registeredReader(() -> { }); + + // send a block + sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0})); + // receiving-side early terminates after pulling the first block + TransferableBlock block = receivingMailbox.poll(); + receivingMailbox.earlyTerminate(); + assertNotNull(block); + assertEquals(block.getNumRows(), 1); + // send another block b/c it doesn't guarantee the next block must be EOS + sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0})); + // send a metadata block + sendingMailbox.send(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + // sending side should early terminate + assertTrue(sendingMailbox.isEarlyTerminated()); + } + @Test public void testRemoteHappyPathSendFirst() throws Exception { @@ -556,4 +581,34 @@ public void testRemoteBufferFull() assertEquals(numCallbacks.get(), ReceivingMailbox.DEFAULT_MAX_PENDING_BLOCKS + 1); assertEquals(receivingMailbox.getNumPendingBlocks(), 0); } + + @Test + public void testRemoteEarlyTerminated() + throws Exception { + String mailboxId = MailboxIdUtils.toMailboxId(_requestId++, SENDER_STAGE_ID, 0, RECEIVER_STAGE_ID, 0); + + // Sends are non-blocking as long as channel capacity is not breached + SendingMailbox sendingMailbox = + _mailboxService2.getSendingMailbox("localhost", _mailboxService1.getPort(), mailboxId, Long.MAX_VALUE); + ReceivingMailbox receivingMailbox = _mailboxService1.getReceivingMailbox(mailboxId); + receivingMailbox.registeredReader(() -> { }); + + // send a block + sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0})); + // receiving-side early terminates after pulling the first block + TestUtils.waitForCondition(aVoid -> { + TransferableBlock block = receivingMailbox.poll(); + return block != null && block.getNumRows() == 1; + }, 1000L, "Failed to deliver mails"); + receivingMailbox.earlyTerminate(); + + // send another block b/c it doesn't guarantee the next block must be EOS + sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0})); + // send a metadata block + sendingMailbox.send(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + sendingMailbox.complete(); + + // sending side should early terminate + TestUtils.waitForCondition(aVoid -> sendingMailbox.isEarlyTerminated(), 1000L, "Failed to early-terminate sender"); + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index 9abd2e78b503..88d9cf4fe0ec 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -646,6 +646,7 @@ public void shouldHandleJoinWithPartialResultsWhenHitDataRowsLimit() { new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); + Mockito.verify(_rightOperator).earlyTerminate(); Assert.assertFalse(result.isErrorBlock()); Assert.assertEquals(result.getNumRows(), 1); Assert.assertTrue(result.getExceptions().get(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index 6e3fa310a939..9f5b8dfffe66 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -44,6 +44,7 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -236,4 +237,32 @@ public void shouldGetReceptionReceiveErrorMailbox() { assertTrue(block.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage)); } } + + @Test + public void shouldEarlyTerminateMailboxesWhenIndicated() { + Object[] row1 = new Object[]{1, 1}; + Object[] row2 = new Object[]{2, 2}; + Object[] row3 = new Object[]{3, 3}; + when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1); + when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1), + OperatorTestUtil.block(DATA_SCHEMA, row3), TransferableBlockUtils.getEndOfStreamTransferableBlock()); + when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_2))).thenReturn(_mailbox2); + when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2), + TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + OpChainExecutionContext context = + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth); + try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED, + 1)) { + // Receive first block from server1 + assertEquals(receiveOp.nextBlock().getContainer().get(0), row1); + // at this point operator received a signal to early terminate + receiveOp.earlyTerminate(); + // Receive next block should be EOS even if upstream keep sending normal block. + assertTrue(receiveOp.nextBlock().isEndOfStreamBlock()); + // Assure that early terminate signal goes into each mailbox + verify(_mailbox1).earlyTerminate(); + verify(_mailbox2).earlyTerminate(); + } + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index c01447d7cdb3..7a49dcf16a1a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -39,10 +39,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.openMocks; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertSame; @@ -185,6 +182,22 @@ public void shouldSendDataBlock() assertTrue(resultMetadata.containsKey(mailboxSendOperator.getOperatorId())); } + @Test + public void shouldEarlyTerminateWhenUpstreamWhenIndicated() + throws Exception { + // Given: + TransferableBlock dataBlock = + OperatorTestUtil.block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); + when(_sourceOperator.nextBlock()).thenReturn(dataBlock); + doReturn(true).when(_exchange).send(any()); + + // When: + TransferableBlock block = getMailboxSendOperator().nextBlock(); + + // Then: + verify(_sourceOperator).earlyTerminate(); + } + private MailboxSendOperator getMailboxSendOperator() { StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList( Collections.singletonList(new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index db6d18ee3c1f..fb937e623879 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -375,6 +375,11 @@ public DummyMultiStageCallableOperator(OpChainExecutionContext context, MultiSta _sleepTimeInMillis = sleepTimeInMillis; } + @Override + public List getChildOperators() { + return Collections.singletonList(_upstream); + } + @Override protected TransferableBlock getNextBlock() { try { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java index c58b394d19a4..03fc1755ba99 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.calcite.rel.RelFieldCollation.Direction; @@ -618,6 +619,32 @@ public void shouldHandleMultipleCollationKeysWithNulls() { Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate"); } + @Test + public void shouldEarlyTerminateCorrectlyWithSignalingPropagateUpstream() { + // Given: + List collation = Collections.emptyList(); + List directions = ImmutableList.of(Direction.ASCENDING); + List nullDirections = ImmutableList.of(NullDirection.LAST); + DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0, + schema, false); + + Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}, + new Object[]{4}, new Object[]{5}, new Object[]{6}, new Object[]{7}, new Object[]{8}, new Object[]{9}, + new Object[]{10}, new Object[]{11}, new Object[]{12})) + .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); + + // When: + TransferableBlock block = op.nextBlock(); // construct + TransferableBlock block2 = op.nextBlock(); // eos + + // Then: + Mockito.verify(_input).earlyTerminate(); + Assert.assertEquals(block.getNumRows(), 10); + Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate"); + } + private static List collation(int... indexes) { return Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList()); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java index db7e8cfc55eb..752d8ea4b39a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java @@ -37,6 +37,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.Mockito.when; + public class BlockExchangeTest { private AutoCloseable _mocks; @@ -99,6 +101,37 @@ public void shouldSendDataBlocksOnlyToTargetDestination() Mockito.verify(_mailbox2, Mockito.never()).send(Mockito.any()); } + @Test + public void shouldSignalEarlyTerminationProperly() + throws Exception { + // Given: + List destinations = ImmutableList.of(_mailbox1, _mailbox2); + BlockExchange exchange = new TestBlockExchange(destinations); + TransferableBlock block = new TransferableBlock(ImmutableList.of(new Object[]{"val"}), + new DataSchema(new String[]{"foo"}, new ColumnDataType[]{ColumnDataType.STRING}), DataBlock.Type.ROW); + + // When send normal block and some mailbox has terminated + when(_mailbox1.isEarlyTerminated()).thenReturn(true); + boolean isEarlyTerminated = exchange.send(block); + + // Then: + Assert.assertFalse(isEarlyTerminated); + + // When send normal block and both terminated + when(_mailbox2.isTerminated()).thenReturn(true); + isEarlyTerminated = exchange.send(block); + + // Then: + Assert.assertFalse(isEarlyTerminated); + + // When send metadata block + when(_mailbox2.isEarlyTerminated()).thenReturn(true); + isEarlyTerminated = exchange.send(block); + + // Then: + Assert.assertTrue(isEarlyTerminated); + } + @Test public void shouldSplitBlocks() throws Exception {