diff --git a/pkg/requesthandler/blockissuance.go b/pkg/requesthandler/blockissuance.go index 80b261c54..a703b0e77 100644 --- a/pkg/requesthandler/blockissuance.go +++ b/pkg/requesthandler/blockissuance.go @@ -14,6 +14,11 @@ import ( iotago "github.com/iotaledger/iota.go/v4" ) +var ( + // errBlockRetained is not really an error but it is used to signal that the block was retained. + errBlockRetained = ierrors.New("block retained") +) + // TODO: make sure an honest validator does not issue blocks within the same slot ratification period in two conflicting chains. // - this can be achieved by remembering the last issued block together with the engine name/chain. // - if the engine name/chain is the same we can always issue a block. @@ -26,29 +31,27 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e // submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained. func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error { - exit := make(chan struct{}) - defer close(exit) - - blockFiltered := make(chan error, 1) - defer close(blockFiltered) + // Calculate the blockID so that we don't capture the block pointer in the event handlers. + blockID := block.ID() // Make sure we don't wait forever here. If the block is not dispatched to the main engine, // it will never trigger one of the below events. processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second) defer processingCtxCancel() - // Calculate the blockID so that we don't capture the block pointer in the event handlers. - blockID := block.ID() + blockCtx, blockCtxCancel := context.WithCancelCause(context.Background()) + defer blockCtxCancel(context.Canceled) // make sure the context is canceled when we return to prevent memory leaks // Hook to TransactionRetained event if the block contains a transaction. - var txRetained chan struct{} + var txCtx context.Context var txRetainedUnhook func() if signedTx, isTx := block.SignedTransaction(); isTx { txID := signedTx.Transaction.MustID() - txRetained = make(chan struct{}, 2) // buffered to 2 to avoid blocking in case of a race condition - defer close(txRetained) + var txCtxCancel context.CancelFunc + txCtx, txCtxCancel = context.WithCancel(context.Background()) + defer txCtxCancel() // make sure the context is canceled when we return to prevent memory leaks // we hook to the TransactionRetained event first, because there could be a race condition when the transaction gets retained // the moment we check if the transaction is already retained. @@ -58,14 +61,14 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block } // signal that the transaction is retained - txRetained <- struct{}{} + txCtxCancel() }, event.WithWorkerPool(r.workerPool)).Unhook // if the transaction is already retained, we don't need to wait for the event because // the onTransactionAttached event is only triggered if it's a new transaction. if _, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID); err == nil { // signal that the transaction is retained - txRetained <- struct{}{} + txCtxCancel() } } @@ -73,30 +76,27 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block if blockID != eventBlock.ID() { return } - select { - case blockFiltered <- nil: - case <-exit: - } + + // signal that block was retained + blockCtxCancel(errBlockRetained) }, event.WithWorkerPool(r.workerPool)).Unhook blockPreFilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { if blockID != event.Block.ID() { return } - select { - case blockFiltered <- event.Reason: - case <-exit: - } + + // signal that block was filtered + blockCtxCancel(event.Reason) }, event.WithWorkerPool(r.workerPool)).Unhook blockPostFilteredUnhook := r.protocol.Events.Engine.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) { if blockID != event.Block.ID() { return } - select { - case blockFiltered <- event.Reason: - case <-exit: - } + + // signal that block was filtered + blockCtxCancel(event.Reason) }, event.WithWorkerPool(r.workerPool)).Unhook defer lo.BatchReverse(txRetainedUnhook, blockRetainedUnhook, blockPreFilteredUnhook, blockPostFilteredUnhook)() @@ -113,12 +113,12 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block return ierrors.Errorf("context canceled whilst waiting for event on block %s", blockID) - case err := <-blockFiltered: - if err != nil { + case <-blockCtx.Done(): + if err := context.Cause(blockCtx); !ierrors.Is(err, errBlockRetained) { return ierrors.Wrapf(err, "block filtered %s", blockID) } - if txRetained != nil { + if txCtx != nil { select { case <-processingCtx.Done(): if ierrors.Is(processingCtx.Err(), context.DeadlineExceeded) { @@ -127,8 +127,9 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block return ierrors.Errorf("context canceled whilst waiting for transaction retained event on block %s", blockID) - case <-txRetained: - // we need to wait for the transaction to be retained before we can return + case <-txCtx.Done(): + // we need to wait for the transaction to be retained before we can return. + // the txCtx can only be canceled manually, so we can be sure that the transaction was retained. } }