Skip to content

Commit

Permalink
Merge pull request #929 from iotaledger/feat/blockmetadata-stream
Browse files Browse the repository at this point in the history
Replace block metadata topics with `ListenToBlockMetadata`
  • Loading branch information
muXxer authored Apr 24, 2024
2 parents 986e436 + 86018b4 commit 76fb277
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 72 deletions.
89 changes: 34 additions & 55 deletions components/inx/server_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"google.golang.org/grpc/status"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/contextutils"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/workerpool"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (s *Server) ListenToBlocks(_ *inx.NoParams, srv inx.INX_ListenToBlocksServe

wp := workerpool.New("ListenToBlocks", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.Booker.BlockBooked.Hook(func(block *blocks.Block) {
unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) {
payload := inx.NewBlockWithBytes(block.ID(), block.ModelBlock().Data())

if ctx.Err() != nil {
Expand All @@ -73,22 +74,13 @@ func (s *Server) ListenToBlocks(_ *inx.NoParams, srv inx.INX_ListenToBlocksServe
return ctx.Err()
}

func (s *Server) ListenToAcceptedBlocks(_ *inx.NoParams, srv inx.INX_ListenToAcceptedBlocksServer) error {
func (s *Server) ListenToBlockMetadata(_ *inx.NoParams, srv inx.INX_ListenToBlockMetadataServer) error {
ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped())

wp := workerpool.New("ListenToAcceptedBlocks", workerpool.WithWorkerCount(workerCount)).Start()
wp := workerpool.New("ListenToBlockMetadata", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockAccepted.Hook(func(block *blocks.Block) {
payload, err := inx.WrapBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateAccepted,
})
if err != nil {
Component.LogErrorf("get block metadata error: %v", err)
cancel()

return
}
sendBlockMetadata := func(blockMetadata *api.BlockMetadataResponse) {
payload := inx.WrapBlockMetadata(blockMetadata)

if ctx.Err() != nil {
// context is done, so we don't need to send the payload
Expand All @@ -99,47 +91,34 @@ func (s *Server) ListenToAcceptedBlocks(_ *inx.NoParams, srv inx.INX_ListenToAcc
Component.LogErrorf("send error: %v", err)
cancel()
}
}, event.WithWorkerPool(wp)).Unhook

<-ctx.Done()
unhook()

// We need to wait until all tasks are done, otherwise we might call
// "SendMsg" and "CloseSend" in parallel on the grpc stream, which is
// not safe according to the grpc docs.
wp.Shutdown()
wp.ShutdownComplete.Wait()

return ctx.Err()
}

func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToConfirmedBlocksServer) error {
ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped())

wp := workerpool.New("ListenToConfirmedBlocks", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockConfirmed.Hook(func(block *blocks.Block) {
payload, err := inx.WrapBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateConfirmed,
})
if err != nil {
Component.LogErrorf("get block metadata error: %v", err)
cancel()

return
}

if ctx.Err() != nil {
// context is done, so we don't need to send the payload
return
}
}

if err := srv.Send(payload); err != nil {
Component.LogErrorf("send error: %v", err)
cancel()
}
}, event.WithWorkerPool(wp)).Unhook
unhook := lo.Batch(
deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) {
sendBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStatePending,
})
}, event.WithWorkerPool(wp)).Unhook,
deps.Protocol.Events.Engine.BlockRetainer.BlockAccepted.Hook(func(block *blocks.Block) {
sendBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateAccepted,
})
}, event.WithWorkerPool(wp)).Unhook,
deps.Protocol.Events.Engine.BlockRetainer.BlockConfirmed.Hook(func(block *blocks.Block) {
sendBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateConfirmed,
})
}, event.WithWorkerPool(wp)).Unhook,
deps.Protocol.Events.Engine.BlockRetainer.BlockDropped.Hook(func(block *blocks.Block) {
sendBlockMetadata(&api.BlockMetadataResponse{
BlockID: block.ID(),
BlockState: api.BlockStateDropped,
})
}, event.WithWorkerPool(wp)).Unhook,
)

<-ctx.Done()
unhook()
Expand Down Expand Up @@ -207,5 +186,5 @@ func getINXBlockMetadata(blockID iotago.BlockID) (*inx.BlockMetadata, error) {
return nil, ierrors.Wrap(err, "failed to get BlockMetadata")
}

return inx.WrapBlockMetadata(blockMetadata)
return inx.WrapBlockMetadata(blockMetadata), nil
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ require (
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20240419094509-31dbb7270ad9
github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9
github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63
github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61
github.com/labstack/echo/v4 v4.12.0
github.com/labstack/gommon v0.4.2
Expand All @@ -41,6 +41,7 @@ require (
github.com/sajari/regression v1.0.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/zyedidia/generic v1.2.1
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.17.1
Expand Down Expand Up @@ -169,7 +170,6 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,10 @@ github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 h1:vcYko6aV
github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:4mRjOeG4Opy+5E8PeMOzXNGZVnunSdubcyK61j+E+Yk=
github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 h1:NJE7qTEiG/fAQzn5aF0MucA2yQx3CitVBZVP0sh4WpQ=
github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:O4p7UmsfoeLqtAUwrKbq0lXMxjY/MLQSpZSavvvvGig=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 h1:28gH76448EjukxCz1H0OIbM0Yeoq0HP2jk4+v1tDcWQ=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66/go.mod h1:aQWBB1p5CLWKFWBTXB6TwSGZu3piuNHTjhWYyE3H22I=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d h1:aTLIfyVtJHLMKgYEUY0tPNBv+B522JZbttH1DslX2ck=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 h1:PBFs3UuwpCdd7jqHozVx2/UMJCQ6fwZeIzkedv1bum4=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850/go.mod h1:kk+TNI0FkHRkSHuLXMkAmnbdxZjmizZgVo1vE2fXXJ8=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 h1:vt8LvpthPv2iVgIDzHN0N3Gee5+KEmqm/3eeF5G6hyA=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic=
github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7 h1:R7ogCKTQ2D5SfVoE6n9GQUsKwm4dcxqwnU863JVlVbw=
github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7/go.mod h1:ntqq5J5Fu2SijiqPsjjdFkMm96UhGU/K0z3j6ARpHec=
github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 h1:vC1YXh2b8WleeAJvqf76PtBDvOXNIaI2Xdn0eLi2YFU=
Expand Down
12 changes: 9 additions & 3 deletions pkg/retainer/blockretainer/block_retainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] {
}, asyncOpt)

e.Events.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) {
if err := r.OnBlockDropped(block.ID()); err != nil {
if err := r.OnBlockDropped(block); err != nil {
r.errorHandler(ierrors.Wrap(err, "failed to store on BlockDropped in retainer"))
}
})
Expand Down Expand Up @@ -216,8 +216,14 @@ func (r *BlockRetainer) OnBlockConfirmed(block *blocks.Block) error {
return nil
}

func (r *BlockRetainer) OnBlockDropped(blockID iotago.BlockID) error {
return r.UpdateBlockMetadata(blockID, api.BlockStateDropped)
func (r *BlockRetainer) OnBlockDropped(block *blocks.Block) error {
if err := r.UpdateBlockMetadata(block.ID(), api.BlockStateDropped); err != nil {
return err
}

r.events.BlockDropped.Trigger(block)

return nil
}

func (r *BlockRetainer) UpdateBlockMetadata(blockID iotago.BlockID, state api.BlockState) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/retainer/blockretainer/tests/testframework.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (tf *TestFramework) triggerBlockRetainerAction(alias string, act action) er
case eventConfirmed:
err = tf.Instance.OnBlockConfirmed(tf.getBlock(alias))
case eventDropped:
err = tf.Instance.OnBlockDropped(tf.getBlockID(alias))
err = tf.Instance.OnBlockDropped(tf.getBlock(alias))
default:
err = ierrors.Errorf("unknown action %d", act)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/retainer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type BlockRetainerEvents struct {
BlockAccepted *event.Event1[*blocks.Block]
// BlockConfirmed is triggered when a block is stored in the retainer caused by a block confirmed event.
BlockConfirmed *event.Event1[*blocks.Block]
// BlockDropped is triggered when a block is dropped.
BlockDropped *event.Event1[*blocks.Block]

event.Group[BlockRetainerEvents, *BlockRetainerEvents]
}
Expand All @@ -24,6 +26,7 @@ var NewBlockRetainerEvents = event.CreateGroupConstructor(func() (newEvents *Blo
BlockRetained: event.New1[*blocks.Block](),
BlockAccepted: event.New1[*blocks.Block](),
BlockConfirmed: event.New1[*blocks.Block](),
BlockDropped: event.New1[*blocks.Block](),
}
})

Expand Down
4 changes: 2 additions & 2 deletions tools/gendoc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ require (
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20240419094509-31dbb7270ad9 // indirect
github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 // indirect
github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 // indirect
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 // indirect
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d // indirect
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 // indirect
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 // indirect
github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7 // indirect
github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 // indirect
github.com/ipfs/boxo v0.19.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions tools/gendoc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 h1:vcYko6aV
github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:4mRjOeG4Opy+5E8PeMOzXNGZVnunSdubcyK61j+E+Yk=
github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 h1:NJE7qTEiG/fAQzn5aF0MucA2yQx3CitVBZVP0sh4WpQ=
github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:O4p7UmsfoeLqtAUwrKbq0lXMxjY/MLQSpZSavvvvGig=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 h1:28gH76448EjukxCz1H0OIbM0Yeoq0HP2jk4+v1tDcWQ=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66/go.mod h1:aQWBB1p5CLWKFWBTXB6TwSGZu3piuNHTjhWYyE3H22I=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d h1:aTLIfyVtJHLMKgYEUY0tPNBv+B522JZbttH1DslX2ck=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 h1:PBFs3UuwpCdd7jqHozVx2/UMJCQ6fwZeIzkedv1bum4=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850/go.mod h1:kk+TNI0FkHRkSHuLXMkAmnbdxZjmizZgVo1vE2fXXJ8=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 h1:vt8LvpthPv2iVgIDzHN0N3Gee5+KEmqm/3eeF5G6hyA=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic=
github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7 h1:R7ogCKTQ2D5SfVoE6n9GQUsKwm4dcxqwnU863JVlVbw=
github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7/go.mod h1:ntqq5J5Fu2SijiqPsjjdFkMm96UhGU/K0z3j6ARpHec=
github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 h1:vC1YXh2b8WleeAJvqf76PtBDvOXNIaI2Xdn0eLi2YFU=
Expand Down

0 comments on commit 76fb277

Please sign in to comment.