Skip to content

Commit

Permalink
[multistage] framework to back-propagate metadata across opChains (ap…
Browse files Browse the repository at this point in the history
…ache#11746)

* initial commit to alter early termination and signaling backwards
behavior
* add more tests


---------

Co-authored-by: Rong Rong <[email protected]>
  • Loading branch information
walterddr and Rong Rong authored Oct 12, 2023
1 parent ad0d2b1 commit 876e58d
Show file tree
Hide file tree
Showing 27 changed files with 315 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public GrpcSendingMailbox(String id, ChannelManager channelManager, String hostn
@Override
public void send(TransferableBlock block)
throws IOException {
if (isTerminated()) {
if (isTerminated() || (isEarlyTerminated() && !block.isEndOfStreamBlock())) {
return;
}
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -101,9 +101,13 @@ public void cancel(Throwable t) {
}
}

@Override
public boolean isEarlyTerminated() {
return _statusObserver.isEarlyTerminated();
}

@Override
public boolean isTerminated() {
// TODO: We cannot differentiate early termination vs stream error
return _statusObserver.isFinished();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeoutException;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,6 +35,7 @@ public class InMemorySendingMailbox implements SendingMailbox {

private ReceivingMailbox _receivingMailbox;
private volatile boolean _isTerminated;
private volatile boolean _isEarlyTerminated;

public InMemorySendingMailbox(String id, MailboxService mailboxService, long deadlineMs) {
_id = id;
Expand All @@ -44,7 +46,7 @@ public InMemorySendingMailbox(String id, MailboxService mailboxService, long dea
@Override
public void send(TransferableBlock block)
throws TimeoutException {
if (_isTerminated) {
if (isTerminated() || (isEarlyTerminated() && !block.isEndOfStreamBlock())) {
return;
}
if (_receivingMailbox == null) {
Expand All @@ -55,13 +57,15 @@ public void send(TransferableBlock block)
switch (status) {
case SUCCESS:
break;
case CANCELLED:
throw new QueryCancelledException(String.format("Mailbox: %s already cancelled from upstream", _id));
case ERROR:
throw new RuntimeException(String.format("Mailbox: %s already errored out (received error block before)", _id));
case TIMEOUT:
throw new TimeoutException(
String.format("Timed out adding block into mailbox: %s with timeout: %dms", _id, timeoutMs));
case EARLY_TERMINATED:
_isTerminated = true;
_isEarlyTerminated = true;
break;
default:
throw new IllegalStateException("Unsupported mailbox status: " + status);
Expand All @@ -70,6 +74,7 @@ public void send(TransferableBlock block)

@Override
public void complete() {
_isTerminated = true;
}

@Override
Expand All @@ -85,6 +90,11 @@ public void cancel(Throwable t) {
new RuntimeException("Cancelled by sender with exception: " + t.getMessage(), t)));
}

@Override
public boolean isEarlyTerminated() {
return _isEarlyTerminated;
}

@Override
public boolean isTerminated() {
return _isTerminated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class ReceivingMailbox {
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
private volatile boolean _isEarlyTerminated = false;

@Nullable
private volatile Reader _reader;

Expand Down Expand Up @@ -78,7 +80,7 @@ public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) {
TransferableBlock errorBlock = _errorBlock.get();
if (errorBlock != null) {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED
: ReceivingMailboxStatus.ERROR;
}
if (timeoutMs <= 0) {
Expand All @@ -95,11 +97,11 @@ public ReceivingMailboxStatus offer(TransferableBlock block, long timeoutMs) {
LOGGER.debug("==[MAILBOX]== Block " + block + " ready to read from mailbox: " + _id);
}
notifyReader();
return ReceivingMailboxStatus.SUCCESS;
return _isEarlyTerminated ? ReceivingMailboxStatus.EARLY_TERMINATED : ReceivingMailboxStatus.SUCCESS;
} else {
LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring the late block", _id);
_blocks.clear();
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.EARLY_TERMINATED
return errorBlock == CANCELLED_ERROR_BLOCK ? ReceivingMailboxStatus.CANCELLED
: ReceivingMailboxStatus.ERROR;
}
} else {
Expand Down Expand Up @@ -136,6 +138,13 @@ public TransferableBlock poll() {
return errorBlock != null ? errorBlock : _blocks.poll();
}

/**
* Early terminate the mailbox, called when upstream doesn't expect any more data block.
*/
public void earlyTerminate() {
_isEarlyTerminated = true;
}

/**
* Cancels the mailbox. No more blocks are accepted after calling this method. Should only be called by the receive
* operator to clean up the remaining blocks.
Expand Down Expand Up @@ -166,6 +175,6 @@ public interface Reader {
}

public enum ReceivingMailboxStatus {
SUCCESS, ERROR, TIMEOUT, EARLY_TERMINATED
SUCCESS, ERROR, TIMEOUT, CANCELLED, EARLY_TERMINATED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ void send(TransferableBlock block)
* mailbox is terminated.
*/
boolean isTerminated();

/**
* Returns whether the {@link ReceivingMailbox} is considered itself finished, and is expected a EOS block with
* statistics to be sent next.
*/
boolean isEarlyTerminated();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ private ChannelUtils() {
}

public static final String MAILBOX_METADATA_BUFFER_SIZE_KEY = "buffer.size";
public static final String MAILBOX_METADATA_BEGIN_OF_STREAM_KEY = "begin.of.stream";
public static final String MAILBOX_METADATA_REQUEST_EARLY_TERMINATE = "request.early.terminate";
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public void onNext(MailboxContent mailboxContent) {
.putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY,
Integer.toString(_mailbox.getNumPendingBlocks())).build());
break;
case CANCELLED:
LOGGER.warn("Mailbox: {} already cancelled from upstream", mailboxId);
cancelStream();
break;
case ERROR:
LOGGER.warn("Mailbox: {} already errored out (received error block before)", mailboxId);
cancelStream();
Expand All @@ -94,7 +98,8 @@ public void onNext(MailboxContent mailboxContent) {
break;
case EARLY_TERMINATED:
LOGGER.debug("Mailbox: {} has been early terminated", mailboxId);
onCompleted();
_responseObserver.onNext(MailboxStatus.newBuilder().setMailboxId(mailboxId)
.putMetadata(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE, "true").build());
break;
default:
throw new IllegalStateException("Unsupported mailbox status: " + status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,19 @@ public class MailboxStatusObserver implements StreamObserver<MailboxStatus> {

private final AtomicInteger _bufferSize = new AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
private final AtomicBoolean _finished = new AtomicBoolean();
private volatile boolean _isEarlyTerminated;

@Override
public void onNext(MailboxStatus mailboxStatus) {
// when received a mailbox status from the receiving end, sending end update the known buffer size available
// so we can make better throughput send judgement. here is a simple example.
// when receiving mailbox receives a data block it will return an updated info of the receiving end status including
// 1. the buffer size available, for back-pressure handling
// 2. status whether there's no need to send any additional data block b/c it considered itself finished.
// -- handle early-terminate EOS request.
if (Boolean.parseBoolean(
mailboxStatus.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE))) {
_isEarlyTerminated = true;
}
// -- handling buffer size back-pressure
// TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.
if (mailboxStatus.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY)) {
_bufferSize.set(
Expand All @@ -49,6 +57,10 @@ public void onNext(MailboxStatus mailboxStatus) {
}
}

public boolean isEarlyTerminated() {
return _isEarlyTerminated;
}

public int getBufferSize() {
return _bufferSize.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ protected TransferableBlock getNextBlock() {
if (!_hasReturnedAggregateBlock) {
return produceAggregatedBlock();
} else {
// TODO: Move to close call.
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public List<String> getMailboxIds() {
return _mailboxIds;
}

@Override
protected void earlyTerminate() {
_isEarlyTerminated = true;
_multiConsumer.earlyTerminate();
}

@Override
public List<MultiStageOperator> getChildOperators() {
return Collections.emptyList();
Expand Down Expand Up @@ -128,6 +134,11 @@ public void addOnNewDataListener(OnNewData onNewData) {
_mailbox.registeredReader(onNewData::newDataAvailable);
}

@Override
public void earlyTerminate() {
_mailbox.earlyTerminate();
}

@Override
public void cancel() {
_mailbox.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,8 @@ private void buildBroadcastHashTable()
}
_currentRowsInHashTable += container.size();
if (_currentRowsInHashTable == _maxRowsInHashTable) {
// Early terminate right table operator.
_rightTableOperator.close();
break;
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
_rightTableOperator.earlyTerminate();
}
rightBlock = _rightTableOperator.nextBlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,25 @@ protected TransferableBlock getNextBlock() {
if (exceptions != null) {
return TransferableBlockUtils.getErrorTransferableBlock(exceptions);
}
if (resultsBlock != LAST_RESULTS_BLOCK) {
if (_isEarlyTerminated || resultsBlock == LAST_RESULTS_BLOCK) {
return constructMetadataBlock();
} else {
// Regular data block
return composeTransferableBlock(resultsBlock, _dataSchema);
} else {
// All data blocks have been returned. Record the stats and return EOS.
Map<String, String> executionStats = _executionStats;
OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId());
operatorStats.recordExecutionStats(executionStats);
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
} catch (Exception e) {
return TransferableBlockUtils.getErrorTransferableBlock(e);
}
}

private TransferableBlock constructMetadataBlock() {
// All data blocks have been returned. Record the stats and return EOS.
Map<String, String> executionStats = _executionStats;
OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, getOperatorId());
operatorStats.recordExecutionStats(executionStats);
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}

private Future<Void> startExecution() {
ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
return _executorService.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public String toExplainString() {

@Override
protected TransferableBlock getNextBlock() {
if (!_isLiteralBlockReturned) {
if (!_isLiteralBlockReturned && !_isEarlyTerminated) {
_isLiteralBlockReturned = true;
return _rexLiteralBlock;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public String toExplainString() {

@Override
protected TransferableBlock getNextBlock() {
return getMultiConsumer().readBlockBlocking();
TransferableBlock block = getMultiConsumer().readBlockBlocking();
// When early termination flag is set, caller is expecting an EOS block to be returned, however since the 2 stages
// between sending/receiving mailbox are setting early termination flag asynchronously, there's chances that the
// next block pulled out of the ReceivingMailbox to be an already buffered normal data block. This requires the
// MailboxReceiveOperator to continue pulling and dropping data block until an EOS block is observed.
while (_isEarlyTerminated && !block.isEndOfStreamBlock()) {
block = getMultiConsumer().readBlockBlocking();
}
return block;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -123,14 +123,16 @@ protected TransferableBlock getNextBlock() {
// and the receiving opChain will not be able to access the stats from the previous opChain
TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
// no need to check early terminate signal b/c the current block is already EOS
sendTransferableBlock(eosBlockWithStats);
} else {
sendTransferableBlock(block);
if (sendTransferableBlock(block)) {
earlyTerminate();
}
}
return block;
} catch (EarlyTerminationException e) {
// TODO: Query stats are not sent when opChain is early terminated
LOGGER.debug("Early terminating opChain: {}", _context.getId());
} catch (QueryCancelledException e) {
LOGGER.debug("Query was cancelled! for opChain: {}", _context.getId());
return TransferableBlockUtils.getEndOfStreamTransferableBlock();
} catch (TimeoutException e) {
LOGGER.warn("Timed out transferring data on opChain: {}", _context.getId(), e);
Expand All @@ -147,12 +149,13 @@ protected TransferableBlock getNextBlock() {
}
}

private void sendTransferableBlock(TransferableBlock block)
private boolean sendTransferableBlock(TransferableBlock block)
throws Exception {
_exchange.send(block);
boolean isEarlyTerminated = _exchange.send(block);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[SEND]== Block " + block + " sent from: " + _context.getId());
}
return isEarlyTerminated;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
protected final OpChainExecutionContext _context;
protected final String _operatorId;
protected final OpChainStats _opChainStats;
protected boolean _isEarlyTerminated;

public MultiStageOperator(OpChainExecutionContext context) {
_context = context;
_operatorId = Joiner.on("_").join(getClass().getSimpleName(), _context.getStageId(), _context.getServer());
_opChainStats = _context.getStats();
_isEarlyTerminated = false;
}

@Override
Expand Down Expand Up @@ -70,6 +72,13 @@ public String getOperatorId() {
// Make it protected because we should always call nextBlock()
protected abstract TransferableBlock getNextBlock();

protected void earlyTerminate() {
_isEarlyTerminated = true;
for (MultiStageOperator child : getChildOperators()) {
child.earlyTerminate();
}
}

protected boolean shouldCollectStats() {
return _context.isTraceEnabled();
}
Expand Down
Loading

0 comments on commit 876e58d

Please sign in to comment.