Skip to content

Commit

Permalink
Merge pull request #878 from iotaledger/fix/channel-data-race
Browse files Browse the repository at this point in the history
Fix channel data race
  • Loading branch information
muXxer authored Mar 26, 2024
2 parents 4780bb5 + 1e7adb8 commit fde08f1
Showing 1 changed file with 30 additions and 29 deletions.
59 changes: 30 additions & 29 deletions pkg/requesthandler/blockissuance.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
iotago "github.com/iotaledger/iota.go/v4"
)

var (
// errBlockRetained is not really an error but it is used to signal that the block was retained.
errBlockRetained = ierrors.New("block retained")
)

// TODO: make sure an honest validator does not issue blocks within the same slot ratification period in two conflicting chains.
// - this can be achieved by remembering the last issued block together with the engine name/chain.
// - if the engine name/chain is the same we can always issue a block.
Expand All @@ -26,29 +31,27 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e

// submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained.
func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error {
exit := make(chan struct{})
defer close(exit)

blockFiltered := make(chan error, 1)
defer close(blockFiltered)
// Calculate the blockID so that we don't capture the block pointer in the event handlers.
blockID := block.ID()

// Make sure we don't wait forever here. If the block is not dispatched to the main engine,
// it will never trigger one of the below events.
processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer processingCtxCancel()

// Calculate the blockID so that we don't capture the block pointer in the event handlers.
blockID := block.ID()
blockCtx, blockCtxCancel := context.WithCancelCause(context.Background())
defer blockCtxCancel(context.Canceled) // make sure the context is canceled when we return to prevent memory leaks

// Hook to TransactionRetained event if the block contains a transaction.
var txRetained chan struct{}
var txCtx context.Context
var txRetainedUnhook func()

if signedTx, isTx := block.SignedTransaction(); isTx {
txID := signedTx.Transaction.MustID()

txRetained = make(chan struct{}, 2) // buffered to 2 to avoid blocking in case of a race condition
defer close(txRetained)
var txCtxCancel context.CancelFunc
txCtx, txCtxCancel = context.WithCancel(context.Background())
defer txCtxCancel() // make sure the context is canceled when we return to prevent memory leaks

// we hook to the TransactionRetained event first, because there could be a race condition when the transaction gets retained
// the moment we check if the transaction is already retained.
Expand All @@ -58,45 +61,42 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block
}

// signal that the transaction is retained
txRetained <- struct{}{}
txCtxCancel()
}, event.WithWorkerPool(r.workerPool)).Unhook

// if the transaction is already retained, we don't need to wait for the event because
// the onTransactionAttached event is only triggered if it's a new transaction.
if _, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID); err == nil {
// signal that the transaction is retained
txRetained <- struct{}{}
txCtxCancel()
}
}

blockRetainedUnhook := r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) {
if blockID != eventBlock.ID() {
return
}
select {
case blockFiltered <- nil:
case <-exit:
}

// signal that block was retained
blockCtxCancel(errBlockRetained)
}, event.WithWorkerPool(r.workerPool)).Unhook

blockPreFilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
if blockID != event.Block.ID() {
return
}
select {
case blockFiltered <- event.Reason:
case <-exit:
}

// signal that block was filtered
blockCtxCancel(event.Reason)
}, event.WithWorkerPool(r.workerPool)).Unhook

blockPostFilteredUnhook := r.protocol.Events.Engine.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) {
if blockID != event.Block.ID() {
return
}
select {
case blockFiltered <- event.Reason:
case <-exit:
}

// signal that block was filtered
blockCtxCancel(event.Reason)
}, event.WithWorkerPool(r.workerPool)).Unhook

defer lo.BatchReverse(txRetainedUnhook, blockRetainedUnhook, blockPreFilteredUnhook, blockPostFilteredUnhook)()
Expand All @@ -113,12 +113,12 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block

return ierrors.Errorf("context canceled whilst waiting for event on block %s", blockID)

case err := <-blockFiltered:
if err != nil {
case <-blockCtx.Done():
if err := context.Cause(blockCtx); !ierrors.Is(err, errBlockRetained) {
return ierrors.Wrapf(err, "block filtered %s", blockID)
}

if txRetained != nil {
if txCtx != nil {
select {
case <-processingCtx.Done():
if ierrors.Is(processingCtx.Err(), context.DeadlineExceeded) {
Expand All @@ -127,8 +127,9 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block

return ierrors.Errorf("context canceled whilst waiting for transaction retained event on block %s", blockID)

case <-txRetained:
// we need to wait for the transaction to be retained before we can return
case <-txCtx.Done():
// we need to wait for the transaction to be retained before we can return.
// the txCtx can only be canceled manually, so we can be sure that the transaction was retained.
}
}

Expand Down

0 comments on commit fde08f1

Please sign in to comment.