Skip to content

Commit

Permalink
Fix race condition in submitBlockAndAwaitRetainer
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Mar 25, 2024
1 parent 4ddfa64 commit cab5750
Showing 1 changed file with 54 additions and 35 deletions.
89 changes: 54 additions & 35 deletions pkg/requesthandler/blockissuance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter"
"github.com/iotaledger/iota-core/pkg/retainer/txretainer"
iotago "github.com/iotaledger/iota.go/v4"
)

Expand All @@ -27,84 +26,104 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e

// submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained.
func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error {
filtered := make(chan error, 1)
exit := make(chan struct{})
defer close(exit)

blockFiltered := make(chan error, 1)
defer close(blockFiltered)

// Make sure we don't wait forever here. If the block is not dispatched to the main engine,
// it will never trigger one of the below events.
processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer processingCtxCancel()

// Calculate the blockID so that we don't capture the block pointer in the event handlers.
blockID := block.ID()

var successUnhook func()
// Hook to TransactionAttached event if the block contains a transaction.
signedTx, isTx := block.SignedTransaction()
if isTx {
// Hook to TransactionRetained event if the block contains a transaction.
var txRetained chan struct{}
var txRetainedUnhook func()

if signedTx, isTx := block.SignedTransaction(); isTx {
txID := signedTx.Transaction.MustID()
// Check if the transaction is already retained. The onTransactionAttached event is only triggered if it's a new transaction.
// If the transaction is already retained, we hook to the BlockRetained event.
_, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID)
if ierrors.Is(err, txretainer.ErrEntryNotFound) {
successUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) {
if transactionID != txID {
return
}
select {
case filtered <- nil:
case <-exit:
}
}, event.WithWorkerPool(r.workerPool)).Unhook
}
}

// if no hook was set, hook to the block retained event.
if successUnhook == nil {
successUnhook = r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) {
if blockID != eventBlock.ID() {
txRetained = make(chan struct{}, 2) // buffered to 2 to avoid blocking in case of a race condition
defer close(txRetained)

// we hook to the TransactionRetained event first, because there could be a race condition when the transaction gets retained
// the moment we check if the transaction is already retained.
txRetainedUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) {
if transactionID != txID {
return
}
select {
case filtered <- nil:
case <-exit:
}

// signal that the transaction is retained
txRetained <- struct{}{}
}, event.WithWorkerPool(r.workerPool)).Unhook

// if the transaction is already retained, we don't need to wait for the event because
// the onTransactionAttached event is only triggered if it's a new transaction.
if _, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID); err == nil {
// signal that the transaction is retained
txRetained <- struct{}{}
}
}

prefilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
blockRetainedUnhook := r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) {
if blockID != eventBlock.ID() {
return
}
select {
case blockFiltered <- nil:
case <-exit:
}
}, event.WithWorkerPool(r.workerPool)).Unhook

blockPreFilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
if blockID != event.Block.ID() {
return
}
select {
case filtered <- event.Reason:
case blockFiltered <- event.Reason:
case <-exit:
}
}, event.WithWorkerPool(r.workerPool)).Unhook

postfilteredUnhook := r.protocol.Events.Engine.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) {
blockPostFilteredUnhook := r.protocol.Events.Engine.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) {
if blockID != event.Block.ID() {
return
}
select {
case filtered <- event.Reason:
case blockFiltered <- event.Reason:
case <-exit:
}
}, event.WithWorkerPool(r.workerPool)).Unhook

defer lo.BatchReverse(successUnhook, prefilteredUnhook, postfilteredUnhook)()
defer lo.BatchReverse(txRetainedUnhook, blockRetainedUnhook, blockPreFilteredUnhook, blockPostFilteredUnhook)()

if err := r.submitBlock(block); err != nil {
return ierrors.Wrapf(err, "failed to issue block %s", blockID)
}

select {
case <-processingCtx.Done():
return ierrors.Errorf("context canceled whilst waiting for event on block %s", blockID)
case err := <-filtered:

case err := <-blockFiltered:
if err != nil {
return ierrors.Wrapf(err, "block filtered %s", blockID)
}

if txRetained != nil {
select {
case <-processingCtx.Done():
return ierrors.Errorf("context canceled whilst waiting for transaction retained event on block %s", blockID)

case <-txRetained:
// we need to wait for the transaction to be retained before we can return
}
}

return nil
}
}
Expand Down

0 comments on commit cab5750

Please sign in to comment.