diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java index 808caba04ed9..a88e12273958 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java @@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator { protected final MailboxService _mailboxService; protected final RelDistribution.Type _exchangeType; protected final List _mailboxIds; - private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer; + protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer; public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId) { @@ -73,14 +73,6 @@ public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistributi new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(), context.getDeadlineMs(), asyncStreams); } - protected BlockingMultiStreamConsumer.OfTransferableBlock getMultiConsumer() { - return _multiConsumer; - } - - public List getMailboxIds() { - return _mailboxIds; - } - @Override protected void earlyTerminate() { _isEarlyTerminated = true; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index ad7913cdc167..584b49640f3d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -42,13 +42,13 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - TransferableBlock block = getMultiConsumer().readBlockBlocking(); + TransferableBlock block = _multiConsumer.readBlockBlocking(); // When early termination flag is set, caller is expecting an EOS block to be returned, however since the 2 stages // between sending/receiving mailbox are setting early termination flag asynchronously, there's chances that the // next block pulled out of the ReceivingMailbox to be an already buffered normal data block. This requires the // MailboxReceiveOperator to continue pulling and dropping data block until an EOS block is observed. while (_isEarlyTerminated && !block.isEndOfStreamBlock()) { - block = getMultiConsumer().readBlockBlocking(); + block = _multiConsumer.readBlockBlocking(); } return block; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java index 05804c12d33f..8949ad569a40 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java @@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator { private final boolean _isSortOnSender; private final List _rows = new ArrayList<>(); - private boolean _isSortedBlockConstructed; + private TransferableBlock _eosBlock; public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, DataSchema dataSchema, List collationKeys, List collationDirections, @@ -74,20 +74,25 @@ public String toExplainString() { @Override protected TransferableBlock getNextBlock() { - while (true) { // loop in order to keep asking if we receive data blocks - TransferableBlock block = getMultiConsumer().readBlockBlocking(); + if (_eosBlock != null) { + return _eosBlock; + } + // Collect all the rows from the mailbox and sort them + while (true) { + TransferableBlock block = _multiConsumer.readBlockBlocking(); if (block.isDataBlock()) { _rows.addAll(block.getContainer()); } else if (block.isErrorBlock()) { return block; } else { assert block.isSuccessfulEndOfStreamBlock(); - - if (!_isSortedBlockConstructed && !_rows.isEmpty()) { + if (!_rows.isEmpty()) { + _eosBlock = block; + // TODO: This might not be efficient because we are sorting all the received rows. We should use a k-way merge + // when sender side is sorted. _rows.sort( new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema, false)); - _isSortedBlockConstructed = true; return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW); } else { return block;