Skip to content

Commit

Permalink
[Multi-stage] Fix SortedMailboxReceiveOperator to not pull 2 EOS blocks (apache#12406)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Feb 14, 2024
1 parent b38fcee commit a5ae4d7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
protected final MailboxService _mailboxService;
protected final RelDistribution.Type _exchangeType;
protected final List<String> _mailboxIds;
private final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;
protected final BlockingMultiStreamConsumer.OfTransferableBlock _multiConsumer;

public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
int senderStageId) {
Expand All @@ -73,14 +73,6 @@ public BaseMailboxReceiveOperator(OpChainExecutionContext context, RelDistributi
new BlockingMultiStreamConsumer.OfTransferableBlock(context.getId(), context.getDeadlineMs(), asyncStreams);
}

/**
 * Returns the multi-stream consumer stored in {@code _multiConsumer}.
 *
 * NOTE(review): in this diff listing the accessor appears to be on the removed side (the field is widened to
 * {@code protected} and callers read it directly) -- confirm against the actual commit.
 *
 * @return the {@code _multiConsumer} field
 */
protected BlockingMultiStreamConsumer.OfTransferableBlock getMultiConsumer() {
return _multiConsumer;
}

/**
 * Returns the mailbox ids stored in {@code _mailboxIds}.
 *
 * The list is returned as-is, without a defensive copy.
 *
 * @return the {@code _mailboxIds} field
 */
public List<String> getMailboxIds() {
return _mailboxIds;
}

@Override
protected void earlyTerminate() {
_isEarlyTerminated = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public String toExplainString() {

/**
 * Reads the next block from the multi-stream consumer with a blocking read. While {@code _isEarlyTerminated} is set,
 * any block that is not an end-of-stream block is discarded and another one is read.
 *
 * NOTE(review): this listing is a diff rendering without +/- markers; each {@code getMultiConsumer()} line looks like
 * the pre-change statement and the {@code _multiConsumer} line right after it its replacement -- confirm against the
 * actual commit before treating both as live code.
 *
 * @return the last block read; when the early-termination flag is set the loop only exits on an end-of-stream block
 */
@Override
protected TransferableBlock getNextBlock() {
TransferableBlock block = getMultiConsumer().readBlockBlocking();
TransferableBlock block = _multiConsumer.readBlockBlocking();
// When the early termination flag is set, the caller expects an EOS block to be returned. However, because the two
// stages on the sending and receiving sides of the mailbox set their early termination flags asynchronously, the
// next block pulled out of the ReceivingMailbox may be an already buffered normal data block. The
// MailboxReceiveOperator therefore keeps pulling and dropping data blocks until an EOS block is observed.
while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
block = getMultiConsumer().readBlockBlocking();
block = _multiConsumer.readBlockBlocking();
}
return block;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
private final boolean _isSortOnSender;
private final List<Object[]> _rows = new ArrayList<>();

private boolean _isSortedBlockConstructed;
private TransferableBlock _eosBlock;

public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections,
Expand All @@ -74,20 +74,25 @@ public String toExplainString() {

@Override
protected TransferableBlock getNextBlock() {
while (true) { // loop in order to keep asking if we receive data blocks
TransferableBlock block = getMultiConsumer().readBlockBlocking();
if (_eosBlock != null) {
return _eosBlock;
}
// Collect all the rows from the mailbox and sort them
while (true) {
TransferableBlock block = _multiConsumer.readBlockBlocking();
if (block.isDataBlock()) {
_rows.addAll(block.getContainer());
} else if (block.isErrorBlock()) {
return block;
} else {
assert block.isSuccessfulEndOfStreamBlock();

if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
if (!_rows.isEmpty()) {
_eosBlock = block;
// TODO: This might not be efficient because we are sorting all the received rows. We should use a k-way merge
// when sender side is sorted.
_rows.sort(
new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema,
false));
_isSortedBlockConstructed = true;
return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
} else {
return block;
Expand Down

0 comments on commit a5ae4d7

Please sign in to comment.