Skip to content

Commit

Permalink
Merge pull request #895 from iotaledger/fix/inx-block-state
Browse files Browse the repository at this point in the history
Add `BlockAccepted` and `BlockConfirmed` events to blockretainer to fix data race in INX
  • Loading branch information
muXxer authored Mar 28, 2024
2 parents 737aa28 + 5c58b00 commit d5e0427
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 18 deletions.
4 changes: 2 additions & 2 deletions components/inx/server_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *Server) ListenToAcceptedBlocks(_ *inx.NoParams, srv inx.INX_ListenToAcc

wp := workerpool.New("ListenToAcceptedBlocks", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockAccepted.Hook(func(block *blocks.Block) {
payload, err := inx.WrapBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateAccepted,
Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToCo

wp := workerpool.New("ListenToConfirmedBlocks", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockConfirmed.Hook(func(block *blocks.Block) {
payload, err := inx.WrapBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateConfirmed,
Expand Down
38 changes: 25 additions & 13 deletions pkg/retainer/blockretainer/block_retainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,26 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] {
asyncOpt := event.WithWorkerPool(r.workerPool)

e.ConstructedEvent().OnTrigger(func() {
e.Events.Booker.BlockBooked.Hook(func(b *blocks.Block) {
if err := r.OnBlockBooked(b); err != nil {
e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) {
if err := r.OnBlockBooked(block); err != nil {
r.errorHandler(ierrors.Wrap(err, "failed to store on BlockBooked in retainer"))
}
}, asyncOpt)

e.Events.BlockGadget.BlockAccepted.Hook(func(b *blocks.Block) {
if err := r.OnBlockAccepted(b.ID()); err != nil {
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
if err := r.OnBlockAccepted(block); err != nil {
r.errorHandler(ierrors.Wrap(err, "failed to store on BlockAccepted in retainer"))
}
}, asyncOpt)

e.Events.BlockGadget.BlockConfirmed.Hook(func(b *blocks.Block) {
if err := r.OnBlockConfirmed(b.ID()); err != nil {
e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
if err := r.OnBlockConfirmed(block); err != nil {
r.errorHandler(ierrors.Wrap(err, "failed to store on BlockConfirmed in retainer"))
}
}, asyncOpt)

e.Events.Scheduler.BlockDropped.Hook(func(b *blocks.Block, _ error) {
if err := r.OnBlockDropped(b.ID()); err != nil {
e.Events.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
if err := r.OnBlockDropped(block.ID()); err != nil {
r.errorHandler(ierrors.Wrap(err, "failed to store on BlockDropped in retainer"))
}
})
Expand All @@ -98,7 +98,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] {
}
}, asyncOpt)

e.Events.BlockRetainer.BlockRetained.LinkTo(r.events.BlockRetained)
e.Events.BlockRetainer.LinkTo(r.events)

r.InitializedEvent().Trigger()
})
Expand Down Expand Up @@ -196,12 +196,24 @@ func (r *BlockRetainer) setBlockBooked(blockID iotago.BlockID) error {
return r.UpdateBlockMetadata(blockID, api.BlockStatePending)
}

func (r *BlockRetainer) OnBlockAccepted(blockID iotago.BlockID) error {
return r.UpdateBlockMetadata(blockID, api.BlockStateAccepted)
func (r *BlockRetainer) OnBlockAccepted(block *blocks.Block) error {
if err := r.UpdateBlockMetadata(block.ID(), api.BlockStateAccepted); err != nil {
return err
}

r.events.BlockAccepted.Trigger(block)

return nil
}

func (r *BlockRetainer) OnBlockConfirmed(blockID iotago.BlockID) error {
return r.UpdateBlockMetadata(blockID, api.BlockStateConfirmed)
func (r *BlockRetainer) OnBlockConfirmed(block *blocks.Block) error {
if err := r.UpdateBlockMetadata(block.ID(), api.BlockStateConfirmed); err != nil {
return err
}

r.events.BlockConfirmed.Trigger(block)

return nil
}

func (r *BlockRetainer) OnBlockDropped(blockID iotago.BlockID) error {
Expand Down
17 changes: 15 additions & 2 deletions pkg/retainer/blockretainer/tests/testframework.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type TestFramework struct {
api iotago.API
test *testing.T

testBlocks map[string]*blocks.Block
testBlockIDs map[string]iotago.BlockID

lastCommittedSlot iotago.SlotIndex
Expand All @@ -52,6 +53,7 @@ func NewTestFramework(t *testing.T) *TestFramework {
tf := &TestFramework{
stores: make(map[iotago.SlotIndex]*slotstore.BlockMetadataStore),
lastCommittedSlot: iotago.SlotIndex(0),
testBlocks: make(map[string]*blocks.Block),
testBlockIDs: make(map[string]iotago.BlockID),
api: iotago.V3API(
iotago.NewV3SnapshotProtocolParameters(
Expand Down Expand Up @@ -94,6 +96,16 @@ func (tf *TestFramework) finalizeSlot(slot iotago.SlotIndex) {
tf.lastFinalizedSlot = slot
}

func (tf *TestFramework) getBlock(alias string) *blocks.Block {
if block, exists := tf.testBlocks[alias]; exists {
return block
}

require.Errorf(tf.test, nil, "model block not found in the test framework")

return nil
}

func (tf *TestFramework) getBlockID(alias string) iotago.BlockID {
if blkID, exists := tf.testBlockIDs[alias]; exists {
return blkID
Expand All @@ -112,6 +124,7 @@ func (tf *TestFramework) createBlock(alias string, slot iotago.SlotIndex) *block

block := blocks.NewBlock(modelBlock)

tf.testBlocks[alias] = block
tf.testBlockIDs[alias] = block.ID()

return block
Expand Down Expand Up @@ -142,9 +155,9 @@ func (tf *TestFramework) triggerBlockRetainerAction(alias string, act action) er
case none:
// no action
case eventAccepted:
err = tf.Instance.OnBlockAccepted(tf.getBlockID(alias))
err = tf.Instance.OnBlockAccepted(tf.getBlock(alias))
case eventConfirmed:
err = tf.Instance.OnBlockConfirmed(tf.getBlockID(alias))
err = tf.Instance.OnBlockConfirmed(tf.getBlock(alias))
case eventDropped:
err = tf.Instance.OnBlockDropped(tf.getBlockID(alias))
default:
Expand Down
8 changes: 7 additions & 1 deletion pkg/retainer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@ import (
type BlockRetainerEvents struct {
// BlockRetained is triggered when a block is stored in the retainer.
BlockRetained *event.Event1[*blocks.Block]
// BlockAccepted is triggered when a block is stored in the retainer caused by a block acceptance event.
BlockAccepted *event.Event1[*blocks.Block]
// BlockConfirmed is triggered when a block is stored in the retainer caused by a block confirmed event.
BlockConfirmed *event.Event1[*blocks.Block]

event.Group[BlockRetainerEvents, *BlockRetainerEvents]
}

// NewBlockRetainerEvents contains the constructor of the Events object (it is generated by a generic factory).
var NewBlockRetainerEvents = event.CreateGroupConstructor(func() (newEvents *BlockRetainerEvents) {
return &BlockRetainerEvents{
BlockRetained: event.New1[*blocks.Block](),
BlockRetained: event.New1[*blocks.Block](),
BlockAccepted: event.New1[*blocks.Block](),
BlockConfirmed: event.New1[*blocks.Block](),
}
})

Expand Down

0 comments on commit d5e0427

Please sign in to comment.