diff --git a/components/debugapi/component.go b/components/debugapi/component.go index e20696db0..8aa6e4f52 100644 --- a/components/debugapi/component.go +++ b/components/debugapi/component.go @@ -86,7 +86,7 @@ func configure() error { debugAPIWorkerPool := workerpool.NewGroup("DebugAPI").CreatePool("DebugAPI", workerpool.WithWorkerCount(1)) - deps.Protocol.Events.Engine.Retainer.BlockRetained.Hook(func(block *blocks.Block) { + deps.Protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) { blocksPerSlot.Set(block.ID().Slot(), append(lo.Return1(blocksPerSlot.GetOrCreate(block.ID().Slot(), func() []*blocks.Block { return make([]*blocks.Block, 0) })), block)) diff --git a/components/inx/server_blocks.go b/components/inx/server_blocks.go index 81315beb7..1471e29df 100644 --- a/components/inx/server_blocks.go +++ b/components/inx/server_blocks.go @@ -186,7 +186,7 @@ func (s *Server) attachBlock(ctx context.Context, block *iotago.Block) (*inx.Blo mergedCtx, mergedCtxCancel := contextutils.MergeContexts(ctx, Component.Daemon().ContextStopped()) defer mergedCtxCancel() - blockID, err := deps.RequestHandler.SubmitBlockAndAwaitBooking(mergedCtx, block) + blockID, err := deps.RequestHandler.SubmitBlockAndAwaitRetainer(mergedCtx, block) if err != nil { return nil, status.Errorf(codes.Internal, "failed to attach block: %s", err.Error()) } diff --git a/components/restapi/core/blocks.go b/components/restapi/core/blocks.go index d91d965b5..4eb3d93a8 100644 --- a/components/restapi/core/blocks.go +++ b/components/restapi/core/blocks.go @@ -42,7 +42,7 @@ func sendBlock(c echo.Context) (*api.BlockCreatedResponse, error) { return nil, err } - blockID, err := deps.RequestHandler.SubmitBlockAndAwaitBooking(c.Request().Context(), iotaBlock) + blockID, err := deps.RequestHandler.SubmitBlockAndAwaitRetainer(c.Request().Context(), iotaBlock) if err != nil { return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to attach block: %w", err) } diff --git a/pkg/protocol/chains.go b/pkg/protocol/chains.go index 0f07b01bb..fb4b65af6 100644 --- a/pkg/protocol/chains.go +++ b/pkg/protocol/chains.go @@ -176,10 +176,14 @@ func attachEngineLogs(instance *engine.Engine) func() { instance.LogTrace("BlockProcessed", "block", blockID) }).Unhook, - events.Retainer.BlockRetained.Hook(func(block *blocks.Block) { + events.BlockRetainer.BlockRetained.Hook(func(block *blocks.Block) { instance.LogTrace("Retainer.BlockRetained", "block", block.ID()) }).Unhook, + events.TransactionRetainer.TransactionRetained.Hook(func(txID iotago.TransactionID) { + instance.LogTrace("Retainer.TransactionRetained", "transaction", txID) + }).Unhook, + events.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) { instance.LogTrace("NotarizationManager.SlotCommitted", "commitment", details.Commitment.ID(), "acceptedBlocks count", details.AcceptedBlocks.Size(), "accepted transactions", len(details.Mutations)) }).Unhook, diff --git a/pkg/protocol/engine/events.go b/pkg/protocol/engine/events.go index 7d302d877..805f6be9c 100644 --- a/pkg/protocol/engine/events.go +++ b/pkg/protocol/engine/events.go @@ -29,23 +29,24 @@ type Events struct { AcceptedBlockProcessed *event.Event1[*blocks.Block] Evict *event.Event1[iotago.SlotIndex] - PreSolidFilter *presolidfilter.Events - PostSolidFilter *postsolidfilter.Events - BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID] - TipManager *tipmanager.Events - BlockDAG *blockdag.Events - Booker *booker.Events - Clock *clock.Events - BlockGadget *blockgadget.Events - SlotGadget *slotgadget.Events - SybilProtection *sybilprotection.Events - Ledger *ledger.Events - Notarization *notarization.Events - SpendDAG *spenddag.Events[iotago.TransactionID, mempool.StateID] - Scheduler *scheduler.Events - SeatManager *seatmanager.Events - SyncManager *syncmanager.Events - Retainer *retainer.Events + PreSolidFilter *presolidfilter.Events + PostSolidFilter *postsolidfilter.Events + BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID] + TipManager *tipmanager.Events + BlockDAG *blockdag.Events + Booker *booker.Events + Clock *clock.Events + BlockGadget *blockgadget.Events + SlotGadget *slotgadget.Events + SybilProtection *sybilprotection.Events + Ledger *ledger.Events + Notarization *notarization.Events + SpendDAG *spenddag.Events[iotago.TransactionID, mempool.StateID] + Scheduler *scheduler.Events + SeatManager *seatmanager.Events + SyncManager *syncmanager.Events + BlockRetainer *retainer.BlockRetainerEvents + TransactionRetainer *retainer.TransactionRetainerEvents event.Group[Events, *Events] } @@ -71,6 +72,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) { Scheduler: scheduler.NewEvents(), SeatManager: seatmanager.NewEvents(), SyncManager: syncmanager.NewEvents(), - Retainer: retainer.NewEvents(), + BlockRetainer: retainer.NewBlockRetainerEvents(), + TransactionRetainer: retainer.NewTransactionRetainerEvents(), } }) diff --git a/pkg/requesthandler/blockissuance.go b/pkg/requesthandler/blockissuance.go index c7b65aa76..4bb949bd4 100644 --- a/pkg/requesthandler/blockissuance.go +++ b/pkg/requesthandler/blockissuance.go @@ -11,6 +11,7 @@ import ( "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter" + "github.com/iotaledger/iota-core/pkg/retainer/txretainer" iotago "github.com/iotaledger/iota.go/v4" ) @@ -24,27 +25,53 @@ func (r *RequestHandler) SubmitBlockWithoutAwaitingBooking(block *model.Block) e return r.submitBlock(block) } -// submitBlockAndAwaitEvent submits a block to be processed and waits for the event to be triggered. -func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *model.Block, evt *event.Event1[*blocks.Block]) error { +// submitBlockAndAwaitRetainer submits a block to be processed and waits for the block gets retained. +func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block *model.Block) error { filtered := make(chan error, 1) exit := make(chan struct{}) defer close(exit) // Make sure we don't wait forever here. If the block is not dispatched to the main engine, // it will never trigger one of the below events. - processingCtx, processingCtxCancel := context.WithTimeout(ctx, 5*time.Second) + processingCtx, processingCtxCancel := context.WithTimeout(ctx, 10*time.Second) defer processingCtxCancel() // Calculate the blockID so that we don't capture the block pointer in the event handlers. blockID := block.ID() - evtUnhook := evt.Hook(func(eventBlock *blocks.Block) { - if blockID != eventBlock.ID() { - return - } - select { - case filtered <- nil: - case <-exit: + + var successUnhook func() + // Hook to TransactionAttached event if the block contains a transaction. + signedTx, isTx := block.SignedTransaction() + if isTx { + txID := signedTx.Transaction.MustID() + // Check if the transaction is already retained. The onTransactionAttached event is only triggered if it's a new transaction. + // If the transaction is already retained, we hook to the BlockRetained event. + _, err := r.protocol.Engines.Main.Get().TxRetainer.TransactionMetadata(txID) + if ierrors.Is(err, txretainer.ErrEntryNotFound) { + successUnhook = r.protocol.Events.Engine.TransactionRetainer.TransactionRetained.Hook(func(transactionID iotago.TransactionID) { + if transactionID != txID { + return + } + select { + case filtered <- nil: + case <-exit: + } + }, event.WithWorkerPool(r.workerPool)).Unhook } - }, event.WithWorkerPool(r.workerPool)).Unhook + } + + // if no hook was set, hook to the block retained event. + if successUnhook == nil { + successUnhook = r.protocol.Events.Engine.BlockRetainer.BlockRetained.Hook(func(eventBlock *blocks.Block) { + if blockID != eventBlock.ID() { + return + } + select { + case filtered <- nil: + case <-exit: + } + }, event.WithWorkerPool(r.workerPool)).Unhook + } + prefilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { if blockID != event.Block.ID() { return @@ -65,7 +92,7 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo } }, event.WithWorkerPool(r.workerPool)).Unhook - defer lo.BatchReverse(evtUnhook, prefilteredUnhook, postfilteredUnhook)() + defer lo.BatchReverse(successUnhook, prefilteredUnhook, postfilteredUnhook)() if err := r.submitBlock(block); err != nil { return ierrors.Wrapf(err, "failed to issue block %s", blockID) @@ -82,13 +109,13 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo } } -func (r *RequestHandler) SubmitBlockAndAwaitBooking(ctx context.Context, iotaBlock *iotago.Block) (iotago.BlockID, error) { +func (r *RequestHandler) SubmitBlockAndAwaitRetainer(ctx context.Context, iotaBlock *iotago.Block) (iotago.BlockID, error) { modelBlock, err := model.BlockFromBlock(iotaBlock) if err != nil { return iotago.EmptyBlockID, ierrors.Wrap(err, "error serializing block to model block") } - if err = r.submitBlockAndAwaitEvent(ctx, modelBlock, r.protocol.Events.Engine.Retainer.BlockRetained); err != nil { + if err = r.submitBlockAndAwaitRetainer(ctx, modelBlock); err != nil { return iotago.EmptyBlockID, ierrors.Wrap(err, "error issuing model block") } diff --git a/pkg/retainer/blockretainer/block_retainer.go b/pkg/retainer/blockretainer/block_retainer.go index 5f648e320..5e75cbf5d 100644 --- a/pkg/retainer/blockretainer/block_retainer.go +++ b/pkg/retainer/blockretainer/block_retainer.go @@ -23,7 +23,7 @@ type ( ) type BlockRetainer struct { - events *retainer.Events + events *retainer.BlockRetainerEvents store StoreFunc cache *cache @@ -39,7 +39,7 @@ type BlockRetainer struct { func New(module module.Module, workersGroup *workerpool.Group, retainerStoreFunc StoreFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *BlockRetainer { b := &BlockRetainer{ Module: module, - events: retainer.NewEvents(), + events: retainer.NewBlockRetainerEvents(), workerPool: workersGroup.CreatePool("Retainer", workerpool.WithWorkerCount(1)), store: retainerStoreFunc, cache: newCache(), @@ -99,7 +99,7 @@ func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] { } }, asyncOpt) - e.Events.Retainer.BlockRetained.LinkTo(r.events.BlockRetained) + e.Events.BlockRetainer.BlockRetained.LinkTo(r.events.BlockRetained) r.InitializedEvent().Trigger() diff --git a/pkg/retainer/events.go b/pkg/retainer/events.go index 52dff40f5..0147f9d76 100644 --- a/pkg/retainer/events.go +++ b/pkg/retainer/events.go @@ -3,19 +3,35 @@ package retainer import ( "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" + iotago "github.com/iotaledger/iota.go/v4" ) -// Events is a collection of Retainer related Events. -type Events struct { +// BlockRetainerEvents is a collection of Retainer related BlockRetainerEvents. +type BlockRetainerEvents struct { // BlockRetained is triggered when a block is stored in the retainer. BlockRetained *event.Event1[*blocks.Block] - event.Group[Events, *Events] + event.Group[BlockRetainerEvents, *BlockRetainerEvents] } -// NewEvents contains the constructor of the Events object (it is generated by a generic factory). -var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) { - return &Events{ +// NewBlockRetainerEvents contains the constructor of the Events object (it is generated by a generic factory). +var NewBlockRetainerEvents = event.CreateGroupConstructor(func() (newEvents *BlockRetainerEvents) { + return &BlockRetainerEvents{ BlockRetained: event.New1[*blocks.Block](), } }) + +// TransactionRetainerEvents is a collection of Retainer related TransactionRetainerEvents. +type TransactionRetainerEvents struct { + // TransactionRetained is triggered when a transaction is stored in the retainer. + TransactionRetained *event.Event1[iotago.TransactionID] + + event.Group[TransactionRetainerEvents, *TransactionRetainerEvents] +} + +// NewTransactionRetainerEvents contains the constructor of the Events object (it is generated by a generic factory). +var NewTransactionRetainerEvents = event.CreateGroupConstructor(func() (newEvents *TransactionRetainerEvents) { + return &TransactionRetainerEvents{ + TransactionRetained: event.New1[iotago.TransactionID](), + } +}) diff --git a/pkg/retainer/txretainer/tx_retainer.go b/pkg/retainer/txretainer/tx_retainer.go index 6f37aa7db..faba1de84 100644 --- a/pkg/retainer/txretainer/tx_retainer.go +++ b/pkg/retainer/txretainer/tx_retainer.go @@ -90,6 +90,7 @@ type ( // TransactionRetainer keeps and resolves all the transaction-related metadata needed in the API and INX. type TransactionRetainer struct { + events *retainer.TransactionRetainerEvents workerPool *workerpool.WorkerPool txRetainerDatabase *transactionRetainerDatabase latestCommittedSlotFunc SlotFunc @@ -113,6 +114,7 @@ func WithDebugStoreErrorMessages(store bool) options.Option[TransactionRetainer] func New(parentModule module.Module, workersGroup *workerpool.Group, dbExecFunc storage.SQLDatabaseExecFunc, latestCommittedSlotFunc SlotFunc, finalizedSlotFunc SlotFunc, errorHandler func(error), opts ...options.Option[TransactionRetainer]) *TransactionRetainer { return module.InitSimpleLifecycle(options.Apply(&TransactionRetainer{ Module: parentModule.NewSubModule("TransactionRetainer"), + events: retainer.NewTransactionRetainerEvents(), workerPool: workersGroup.CreatePool("TxRetainer", workerpool.WithWorkerCount(1)), txRetainerCache: NewTransactionRetainerCache(), txRetainerDatabase: NewTransactionRetainerDB(dbExecFunc), @@ -158,6 +160,8 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e // therefore we use false for the "validSignature" argument. r.UpdateTransactionMetadata(transactionMetadata.ID(), false, transactionMetadata.EarliestIncludedAttachment().Slot(), api.TransactionStatePending, nil) + r.events.TransactionRetained.Trigger(transactionMetadata.ID()) + // the transaction was accepted transactionMetadata.OnAccepted(func() { e.LogTrace("TxRetainer.TransactionAccepted", "tx", transactionMetadata.ID()) @@ -242,6 +246,10 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e }, asyncOpt) }) + e.Events.TransactionRetainer.TransactionRetained.LinkTo(r.events.TransactionRetained) + + r.InitializedEvent().Trigger() + return r }) } diff --git a/pkg/tests/reward_test.go b/pkg/tests/reward_test.go index 3d8801186..860242dd9 100644 --- a/pkg/tests/reward_test.go +++ b/pkg/tests/reward_test.go @@ -373,10 +373,13 @@ func Test_Account_StakeAmountCalculation(t *testing.T) { ) block7 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block7", ts.DefaultWallet(), tx7, mock.WithStrongParents(latestParents...))) + latestParents = ts.CommitUntilSlot(block7_8Slot, block7.ID()) + tx8 := ts.DefaultWallet().ClaimDelegatorRewards("TX8", "TX7:0") - block8 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block8", ts.DefaultWallet(), tx8, mock.WithStrongParents(block7.ID()))) + block8 := lo.PanicOnErr(ts.IssueBasicBlockWithOptions("block8", ts.DefaultWallet(), tx8, mock.WithStrongParents(latestParents...))) - latestParents = ts.CommitUntilSlot(block7_8Slot, block8.ID()) + block8Slot := ts.CurrentSlot() + latestParents = ts.CommitUntilSlot(block8Slot, block8.ID()) // Delegated Stake should be unaffected since no new delegation was effectively added in that slot. ts.AssertAccountStake(accountID, stakedAmount, deleg1+deleg2, ts.Nodes()...) diff --git a/pkg/testsuite/mock/client.go b/pkg/testsuite/mock/client.go index 96416ad41..e6f533459 100644 --- a/pkg/testsuite/mock/client.go +++ b/pkg/testsuite/mock/client.go @@ -241,7 +241,7 @@ func (c *TestSuiteClient) Routes(_ context.Context) (*api.RoutesResponse, error) } func (c *TestSuiteClient) SubmitBlock(ctx context.Context, block *iotago.Block) (iotago.BlockID, error) { - return c.Node.RequestHandler.SubmitBlockAndAwaitBooking(ctx, block) + return c.Node.RequestHandler.SubmitBlockAndAwaitRetainer(ctx, block) } func (c *TestSuiteClient) TransactionIncludedBlock(_ context.Context, txID iotago.TransactionID) (*iotago.Block, error) { diff --git a/tools/docker-network/tests/eventapiframework.go b/tools/docker-network/tests/eventapiframework.go index 2886d8dfa..b05b70502 100644 --- a/tools/docker-network/tests/eventapiframework.go +++ b/tools/docker-network/tests/eventapiframework.go @@ -284,7 +284,13 @@ func (e *EventAPIDockerTestFramework) AssertTransactionMetadataByTransactionID(c select { case metadata := <-acceptedChan: if txID.Compare(metadata.TransactionID) == 0 { + // make sure the transaction state is available from the core API + resp, err := eventClt.Client.TransactionMetadata(ctx, txID) + require.NoError(e.Testing, err) + require.NotNil(e.Testing, resp) + e.finishChan <- struct{}{} + return }