From cab57500047b5dbf3632a9fa52e68160d4d40870 Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 25 Mar 2024 15:37:51 +0100 Subject: [PATCH 1/2] Fix race condition in `submitBlockAndAwaitRetainer` --- pkg/requesthandler/blockissuance.go | 89 +++++++++++++++++------------ 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/pkg/requesthandler/blockissuance.go b/pkg/requesthandler/blockissuance.go index 4bb949bd4..67dab810c 100644 --- a/pkg/requesthandler/blockissuance.go +++ b/pkg/requesthandler/blockissuance.go @@ -11,7 +11,6 @@ import ( "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter" - "github.com/iotaledger/iota-core/pkg/retainer/txretainer" iotago "github.com/iotaledger/iota.go/v4" ) @@ -27,84 +26,104 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e // submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained. func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error { - filtered := make(chan error, 1) exit := make(chan struct{}) defer close(exit) + blockFiltered := make(chan error, 1) + defer close(blockFiltered) + // Make sure we don't wait forever here. If the block is not dispatched to the main engine, // it will never trigger one of the below events. processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second) defer processingCtxCancel() + // Calculate the blockID so that we don't capture the block pointer in the event handlers. blockID := block.ID() - var successUnhook func() - // Hook to TransactionAttached event if the block contains a transaction. - signedTx, isTx := block.SignedTransaction() - if isTx { + // Hook to TransactionRetained event if the block contains a transaction. + var txRetained chan struct{} + var txRetainedUnhook func() + + if signedTx, isTx := block.SignedTransaction(); isTx { txID := signedTx.Transaction.MustID() - // Check if the transaction is already retained. The onTransactionAttached event is only triggered if it's a new transaction. - // If the transaction is already retained, we hook to the BlockRetained event. - _, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID) - if ierrors.Is(err, txretainer.ErrEntryNotFound) { - successUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) { - if transactionID != txID { - return - } - select { - case filtered <- nil: - case <-exit: - } - }, event.WithWorkerPool(r.workerPool)).Unhook - } - } - // if no hook was set, hook to the block retained event. - if successUnhook == nil { - successUnhook = r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) { - if blockID != eventBlock.ID() { + txRetained = make(chan struct{}, 2) // buffered to 2 to avoid blocking in case of a race condition + defer close(txRetained) + + // we hook to the TransactionRetained event first, because there could be a race condition when the transaction gets retained + // the moment we check if the transaction is already retained. + txRetainedUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) { + if transactionID != txID { return } - select { - case filtered <- nil: - case <-exit: - } + + // signal that the transaction is retained + txRetained <- struct{}{} }, event.WithWorkerPool(r.workerPool)).Unhook + + // if the transaction is already retained, we don't need to wait for the event because + // the onTransactionAttached event is only triggered if it's a new transaction. + if _, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID); err == nil { + // signal that the transaction is retained + txRetained <- struct{}{} + } } - prefilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { + blockRetainedUnhook := r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) { + if blockID != eventBlock.ID() { + return + } + select { + case blockFiltered <- nil: + case <-exit: + } + }, event.WithWorkerPool(r.workerPool)).Unhook + + blockPreFilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { if blockID != event.Block.ID() { return } select { - case filtered <- event.Reason: + case blockFiltered <- event.Reason: case <-exit: } }, event.WithWorkerPool(r.workerPool)).Unhook - postfilteredUnhook := r.protocol.Events.Engine.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) { + blockPostFilteredUnhook := r.protocol.Events.Engine.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) { if blockID != event.Block.ID() { return } select { - case filtered <- event.Reason: + case blockFiltered <- event.Reason: case <-exit: } }, event.WithWorkerPool(r.workerPool)).Unhook - defer lo.BatchReverse(successUnhook, prefilteredUnhook, postfilteredUnhook)() + defer lo.BatchReverse(txRetainedUnhook, blockRetainedUnhook, blockPreFilteredUnhook, blockPostFilteredUnhook)() if err := r.submitBlock(block); err != nil { return ierrors.Wrapf(err, "failed to issue block %s", blockID) } + select { case <-processingCtx.Done(): return ierrors.Errorf("context canceled whilst waiting for event on block %s", blockID) - case err := <-filtered: + + case err := <-blockFiltered: if err != nil { return ierrors.Wrapf(err, "block filtered %s", blockID) } + if txRetained != nil { + select { + case <-processingCtx.Done(): + return ierrors.Errorf("context canceled whilst waiting for transaction retained event on block %s", blockID) + + case <-txRetained: + // we need to wait for the transaction to be retained before we can return + } + } + return nil } } From 9eb6357d6b468d5990a837ad2a1f60a2ae654d5d Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 25 Mar 2024 15:48:22 +0100 Subject: [PATCH 2/2] Add error message if deadline was exceeded --- pkg/requesthandler/blockissuance.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/requesthandler/blockissuance.go b/pkg/requesthandler/blockissuance.go index 67dab810c..80b261c54 100644 --- a/pkg/requesthandler/blockissuance.go +++ b/pkg/requesthandler/blockissuance.go @@ -107,6 +107,10 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block select { case <-processingCtx.Done(): + if ierrors.Is(processingCtx.Err(), context.DeadlineExceeded) { + return ierrors.Errorf("context deadline exceeded whilst waiting for event on block %s", blockID) + } + return ierrors.Errorf("context canceled whilst waiting for event on block %s", blockID) case err := <-blockFiltered: @@ -117,6 +121,10 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block if txRetained != nil { select { case <-processingCtx.Done(): + if ierrors.Is(processingCtx.Err(), context.DeadlineExceeded) { + return ierrors.Errorf("context deadline exceeded whilst waiting for transaction retained event on block %s", blockID) + } + return ierrors.Errorf("context canceled whilst waiting for transaction retained event on block %s", blockID) case <-txRetained: