From 86018b455e0ae4397ac1e4f28d92b6ad7999851f Mon Sep 17 00:00:00 2001 From: muXxer Date: Tue, 23 Apr 2024 13:16:29 +0200 Subject: [PATCH] Replace block metadata topics with `ListenToBlockMetadata` --- components/inx/server_blocks.go | 89 +++++++------------ go.mod | 6 +- go.sum | 8 +- pkg/retainer/blockretainer/block_retainer.go | 12 ++- .../blockretainer/tests/testframework.go | 2 +- pkg/retainer/events.go | 3 + tools/gendoc/go.mod | 4 +- tools/gendoc/go.sum | 8 +- 8 files changed, 60 insertions(+), 72 deletions(-) diff --git a/components/inx/server_blocks.go b/components/inx/server_blocks.go index 29fd3d6c4..6366c6033 100644 --- a/components/inx/server_blocks.go +++ b/components/inx/server_blocks.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc/status" "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/runtime/contextutils" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/workerpool" @@ -47,7 +48,7 @@ func (s *Server) ListenToBlocks(_ *inx.NoParams, srv inx.INX_ListenToBlocksServe wp := workerpool.New("ListenToBlocks", workerpool.WithWorkerCount(workerCount)).Start() - unhook := deps.Protocol.Events.Engine.Booker.BlockBooked.Hook(func(block *blocks.Block) { + unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) { payload := inx.NewBlockWithBytes(block.ID(), block.ModelBlock().Data()) if ctx.Err() != nil { @@ -73,22 +74,13 @@ func (s *Server) ListenToBlocks(_ *inx.NoParams, srv inx.INX_ListenToBlocksServe return ctx.Err() } -func (s *Server) ListenToAcceptedBlocks(_ *inx.NoParams, srv inx.INX_ListenToAcceptedBlocksServer) error { +func (s *Server) ListenToBlockMetadata(_ *inx.NoParams, srv inx.INX_ListenToBlockMetadataServer) error { ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped()) - wp := workerpool.New("ListenToAcceptedBlocks", workerpool.WithWorkerCount(workerCount)).Start() + wp := workerpool.New("ListenToBlockMetadata", workerpool.WithWorkerCount(workerCount)).Start() - unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockAccepted.Hook(func(block *blocks.Block) { - payload, err := inx.WrapBlockMetadata(&api.BlockMetadataResponse{ - BlockID: block.ID(), - BlockState: api.BlockStateAccepted, - }) - if err != nil { - Component.LogErrorf("get block metadata error: %v", err) - cancel() - - return - } + sendBlockMetadata := func(blockMetadata *api.BlockMetadataResponse) { + payload := inx.WrapBlockMetadata(blockMetadata) if ctx.Err() != nil { // context is done, so we don't need to send the payload @@ -99,47 +91,34 @@ func (s *Server) ListenToAcceptedBlocks(_ *inx.NoParams, srv inx.INX_ListenToAcc Component.LogErrorf("send error: %v", err) cancel() } - }, event.WithWorkerPool(wp)).Unhook - - <-ctx.Done() - unhook() - - // We need to wait until all tasks are done, otherwise we might call - // "SendMsg" and "CloseSend" in parallel on the grpc stream, which is - // not safe according to the grpc docs. - wp.Shutdown() - wp.ShutdownComplete.Wait() - - return ctx.Err() -} - -func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToConfirmedBlocksServer) error { - ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped()) - - wp := workerpool.New("ListenToConfirmedBlocks", workerpool.WithWorkerCount(workerCount)).Start() - - unhook := deps.Protocol.Events.Engine.BlockRetainer.BlockConfirmed.Hook(func(block *blocks.Block) { - payload, err := inx.WrapBlockMetadata(&api.BlockMetadataResponse{ - BlockID: block.ID(), - BlockState: api.BlockStateConfirmed, - }) - if err != nil { - Component.LogErrorf("get block metadata error: %v", err) - cancel() - - return - } - - if ctx.Err() != nil { - // context is done, so we don't need to send the payload - return - } + } - if err := srv.Send(payload); err != nil { - Component.LogErrorf("send error: %v", err) - cancel() - } - }, event.WithWorkerPool(wp)).Unhook + unhook := lo.Batch( + deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) { + sendBlockMetadata(&api.BlockMetadataResponse{ + BlockID: block.ID(), + BlockState: api.BlockStatePending, + }) + }, event.WithWorkerPool(wp)).Unhook, + deps.Protocol.Events.Engine.BlockRetainer.BlockAccepted.Hook(func(block *blocks.Block) { + sendBlockMetadata(&api.BlockMetadataResponse{ + BlockID: block.ID(), + BlockState: api.BlockStateAccepted, + }) + }, event.WithWorkerPool(wp)).Unhook, + deps.Protocol.Events.Engine.BlockRetainer.BlockConfirmed.Hook(func(block *blocks.Block) { + sendBlockMetadata(&api.BlockMetadataResponse{ + BlockID: block.ID(), + BlockState: api.BlockStateConfirmed, + }) + }, event.WithWorkerPool(wp)).Unhook, + deps.Protocol.Events.Engine.BlockRetainer.BlockDropped.Hook(func(block *blocks.Block) { + sendBlockMetadata(&api.BlockMetadataResponse{ + BlockID: block.ID(), + BlockState: api.BlockStateDropped, + }) + }, event.WithWorkerPool(wp)).Unhook, + ) <-ctx.Done() unhook() @@ -207,5 +186,5 @@ func getINXBlockMetadata(blockID iotago.BlockID) (*inx.BlockMetadata, error) { return nil, ierrors.Wrap(err, "failed to get BlockMetadata") } - return inx.WrapBlockMetadata(blockMetadata) + return inx.WrapBlockMetadata(blockMetadata), nil } diff --git a/go.mod b/go.mod index 401de3525..0a3585079 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,8 @@ require ( github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20240419094509-31dbb7270ad9 github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 - github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 - github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d + github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 + github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 github.com/labstack/echo/v4 v4.12.0 github.com/labstack/gommon v0.4.2 @@ -41,6 +41,7 @@ require ( github.com/sajari/regression v1.0.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 + github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/zyedidia/generic v1.2.1 go.uber.org/atomic v1.11.0 go.uber.org/dig v1.17.1 @@ -169,7 +170,6 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect - github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.25.0 // indirect go.opentelemetry.io/otel/metric v1.25.0 // indirect diff --git a/go.sum b/go.sum index 255b57515..1d7f9c78e 100644 --- a/go.sum +++ b/go.sum @@ -321,10 +321,10 @@ github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 h1:vcYko6aV github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:4mRjOeG4Opy+5E8PeMOzXNGZVnunSdubcyK61j+E+Yk= github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 h1:NJE7qTEiG/fAQzn5aF0MucA2yQx3CitVBZVP0sh4WpQ= github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:O4p7UmsfoeLqtAUwrKbq0lXMxjY/MLQSpZSavvvvGig= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 h1:28gH76448EjukxCz1H0OIbM0Yeoq0HP2jk4+v1tDcWQ= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66/go.mod h1:aQWBB1p5CLWKFWBTXB6TwSGZu3piuNHTjhWYyE3H22I= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d h1:aTLIfyVtJHLMKgYEUY0tPNBv+B522JZbttH1DslX2ck= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 h1:PBFs3UuwpCdd7jqHozVx2/UMJCQ6fwZeIzkedv1bum4= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850/go.mod h1:kk+TNI0FkHRkSHuLXMkAmnbdxZjmizZgVo1vE2fXXJ8= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 h1:vt8LvpthPv2iVgIDzHN0N3Gee5+KEmqm/3eeF5G6hyA= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7 h1:R7ogCKTQ2D5SfVoE6n9GQUsKwm4dcxqwnU863JVlVbw= github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7/go.mod h1:ntqq5J5Fu2SijiqPsjjdFkMm96UhGU/K0z3j6ARpHec= github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 h1:vC1YXh2b8WleeAJvqf76PtBDvOXNIaI2Xdn0eLi2YFU= diff --git a/pkg/retainer/blockretainer/block_retainer.go b/pkg/retainer/blockretainer/block_retainer.go index e54363ff1..dce778194 100644 --- a/pkg/retainer/blockretainer/block_retainer.go +++ b/pkg/retainer/blockretainer/block_retainer.go @@ -86,7 +86,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] { }, asyncOpt) e.Events.Scheduler.BlockDropped.Hook(func(block *blocks.Block, _ error) { - if err := r.OnBlockDropped(block.ID()); err != nil { + if err := r.OnBlockDropped(block); err != nil { r.errorHandler(ierrors.Wrap(err, "failed to store on BlockDropped in retainer")) } }) @@ -216,8 +216,14 @@ func (r *BlockRetainer) OnBlockConfirmed(block *blocks.Block) error { return nil } -func (r *BlockRetainer) OnBlockDropped(blockID iotago.BlockID) error { - return r.UpdateBlockMetadata(blockID, api.BlockStateDropped) +func (r *BlockRetainer) OnBlockDropped(block *blocks.Block) error { + if err := r.UpdateBlockMetadata(block.ID(), api.BlockStateDropped); err != nil { + return err + } + + r.events.BlockDropped.Trigger(block) + + return nil } func (r *BlockRetainer) UpdateBlockMetadata(blockID iotago.BlockID, state api.BlockState) error { diff --git a/pkg/retainer/blockretainer/tests/testframework.go b/pkg/retainer/blockretainer/tests/testframework.go index 8634d2079..b36b7d3a2 100644 --- a/pkg/retainer/blockretainer/tests/testframework.go +++ b/pkg/retainer/blockretainer/tests/testframework.go @@ -159,7 +159,7 @@ func (tf *TestFramework) triggerBlockRetainerAction(alias string, act action) er case eventConfirmed: err = tf.Instance.OnBlockConfirmed(tf.getBlock(alias)) case eventDropped: - err = tf.Instance.OnBlockDropped(tf.getBlockID(alias)) + err = tf.Instance.OnBlockDropped(tf.getBlock(alias)) default: err = ierrors.Errorf("unknown action %d", act) } diff --git a/pkg/retainer/events.go b/pkg/retainer/events.go index 639c79e26..c63d3d196 100644 --- a/pkg/retainer/events.go +++ b/pkg/retainer/events.go @@ -14,6 +14,8 @@ type BlockRetainerEvents struct { BlockAccepted *event.Event1[*blocks.Block] // BlockConfirmed is triggered when a block is stored in the retainer caused by a block confirmed event. BlockConfirmed *event.Event1[*blocks.Block] + // BlockDropped is triggered when a block is dropped. + BlockDropped *event.Event1[*blocks.Block] event.Group[BlockRetainerEvents, *BlockRetainerEvents] } @@ -24,6 +26,7 @@ var NewBlockRetainerEvents = event.CreateGroupConstructor(func() (newEvents *Blo BlockRetained: event.New1[*blocks.Block](), BlockAccepted: event.New1[*blocks.Block](), BlockConfirmed: event.New1[*blocks.Block](), + BlockDropped: event.New1[*blocks.Block](), } }) diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index 890088130..225d817d5 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -71,8 +71,8 @@ require ( github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20240419094509-31dbb7270ad9 // indirect github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 // indirect github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 // indirect - github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 // indirect - github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d // indirect + github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 // indirect + github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 // indirect github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7 // indirect github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 // indirect github.com/ipfs/boxo v0.19.0 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index c3376551d..546132ede 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -325,10 +325,10 @@ github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9 h1:vcYko6aV github.com/iotaledger/hive.go/sql v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:4mRjOeG4Opy+5E8PeMOzXNGZVnunSdubcyK61j+E+Yk= github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9 h1:NJE7qTEiG/fAQzn5aF0MucA2yQx3CitVBZVP0sh4WpQ= github.com/iotaledger/hive.go/stringify v0.0.0-20240419094509-31dbb7270ad9/go.mod h1:O4p7UmsfoeLqtAUwrKbq0lXMxjY/MLQSpZSavvvvGig= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66 h1:28gH76448EjukxCz1H0OIbM0Yeoq0HP2jk4+v1tDcWQ= -github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240419103152-aa12c9f5bd66/go.mod h1:aQWBB1p5CLWKFWBTXB6TwSGZu3piuNHTjhWYyE3H22I= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d h1:aTLIfyVtJHLMKgYEUY0tPNBv+B522JZbttH1DslX2ck= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240419095729-912f1c2df45d/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850 h1:PBFs3UuwpCdd7jqHozVx2/UMJCQ6fwZeIzkedv1bum4= +github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240423111221-4248ffa5d850/go.mod h1:kk+TNI0FkHRkSHuLXMkAmnbdxZjmizZgVo1vE2fXXJ8= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63 h1:vt8LvpthPv2iVgIDzHN0N3Gee5+KEmqm/3eeF5G6hyA= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240423105148-cc9e62fe4f63/go.mod h1:YYko1kTtJgfETXQqWHgJkHQv6gGYGDxjnwDC6FbXxic= github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7 h1:R7ogCKTQ2D5SfVoE6n9GQUsKwm4dcxqwnU863JVlVbw= github.com/iotaledger/iota-crypto-demo v0.0.0-20240419094816-40260bb800f7/go.mod h1:ntqq5J5Fu2SijiqPsjjdFkMm96UhGU/K0z3j6ARpHec= github.com/iotaledger/iota.go/v4 v4.0.0-20240419095144-054bd7d2ba61 h1:vC1YXh2b8WleeAJvqf76PtBDvOXNIaI2Xdn0eLi2YFU=