From 1e4b2f26633bed120ed1da300cb5fa6eab9f020b Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 29 Nov 2023 10:59:21 +0100 Subject: [PATCH 01/16] Feat: Added PR for bundled changes while debugging --- pkg/tests/loss_of_acceptance_test.go | 2 +- pkg/testsuite/mock/node.go | 101 ++++++++++++++------------- 2 files changed, 52 insertions(+), 51 deletions(-) diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index eac280eef..3929165cc 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -127,7 +127,7 @@ func TestLossOfAcceptanceFromSnapshot(t *testing.T) { node2 := ts.AddNode("node2") ts.Run(true, nil) - node2.Protocol.SetLogLevel(log.LevelTrace) + node2.Protocol.SetLogLevel(log.LevelDebug) // Issue up to slot 10, committing slot 8. { diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index a3b6b8c56..5d6531796 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -191,11 +191,11 @@ func (n *Node) hookLogging(failOnBlockFiltered bool) { }) } -func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engine.Engine, engineName string) { +func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engine.Engine) { events := instance.Events events.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.BlockAttached: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.BlockAttached", "block", block.ID()) n.mutex.Lock() defer n.mutex.Unlock() @@ -203,78 +203,80 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi }) events.BlockDAG.BlockSolid.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.BlockSolid: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.BlockSolid", "block", block.ID()) }) events.BlockDAG.BlockInvalid.Hook(func(block *blocks.Block, err error) { - fmt.Printf("%s > [%s] BlockDAG.BlockInvalid: %s - %s\n", n.Name, engineName, block.ID(), err) + instance.LogTrace("BlockDAG.BlockInvalid", "block", block.ID(), "err", err) }) events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.BlockMissing: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.BlockMissing", "block", block.ID()) }) events.BlockDAG.MissingBlockAttached.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.MissingBlockAttached: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.MissingBlockAttached", "block", block.ID()) }) events.SeatManager.BlockProcessed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] SybilProtection.BlockProcessed: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("SeatManager.BlockProcessed", "block", block.ID()) }) events.Booker.BlockBooked.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Booker.BlockBooked: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Booker.BlockBooked", "block", block.ID()) }) events.Booker.BlockInvalid.Hook(func(block *blocks.Block, err error) { - fmt.Printf("%s > [%s] Booker.BlockInvalid: %s - %s\n", n.Name, engineName, block.ID(), err.Error()) + instance.LogTrace("Booker.BlockInvalid", "block", block.ID(), "err", err) }) events.Booker.TransactionInvalid.Hook(func(metadata mempool.TransactionMetadata, err error) { - fmt.Printf("%s > [%s] Booker.TransactionInvalid: %s - %s\n", n.Name, engineName, metadata.ID(), err.Error()) + instance.LogTrace("Booker.TransactionInvalid", "tx", metadata.ID(), "err", err) }) events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Scheduler.BlockScheduled: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Scheduler.BlockScheduled", "block", block.ID()) }) events.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Scheduler.BlockEnqueued: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Scheduler.BlockEnqueued", "block", block.ID()) }) events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Scheduler.BlockSkipped: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Scheduler.BlockSkipped", "block", block.ID()) }) events.Scheduler.BlockDropped.Hook(func(block *blocks.Block, err error) { - fmt.Printf("%s > [%s] Scheduler.BlockDropped: %s - %s\n", n.Name, engineName, block.ID(), err.Error()) + instance.LogTrace("Scheduler.BlockDropped", "block", block.ID(), "err", err) }) events.Clock.AcceptedTimeUpdated.Hook(func(newTime time.Time) { - fmt.Printf("%s > [%s] Clock.AcceptedTimeUpdated: %s [Slot %d]\n", n.Name, engineName, newTime, instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) + instance.LogTrace("Clock.AcceptedTimeUpdated", "time", newTime, "slot", instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) }) events.Clock.ConfirmedTimeUpdated.Hook(func(newTime time.Time) { - fmt.Printf("%s > [%s] Clock.ConfirmedTimeUpdated: %s [Slot %d]\n", n.Name, engineName, newTime, instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) + instance.LogTrace("Clock.ConfirmedTimeUpdated", "time", newTime, "slot", instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) }) events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) { - fmt.Printf("%s > [%s] PreSolidFilter.BlockPreAllowed: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("PreSolidFilter.BlockPreAllowed", "block", block.ID()) }) events.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { - fmt.Printf("%s > [%s] PreSolidFilter.BlockPreFiltered: %s - %s\n", n.Name, engineName, event.Block.ID(), event.Reason.Error()) + instance.LogTrace("PreSolidFilter.BlockPreFiltered", "block", event.Block.ID(), "err", event.Reason) + if failOnBlockFiltered { n.Testing.Fatal("no blocks should be prefiltered") } }) events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] PostSolidFilter.BlockAllowed: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("PostSolidFilter.BlockAllowed", "block", block.ID()) }) events.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) { - fmt.Printf("%s > [%s] PostSolidFilter.BlockFiltered: %s - %s\n", n.Name, engineName, event.Block.ID(), event.Reason.Error()) + instance.LogTrace("PostSolidFilter.BlockFiltered", "block", event.Block.ID(), "err", event.Reason) + if failOnBlockFiltered { n.Testing.Fatal("no blocks should be filtered") } @@ -285,11 +287,11 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi }) events.BlockRequester.Tick.Hook(func(blockID iotago.BlockID) { - fmt.Printf("%s > [%s] BlockRequester.Tick: %s\n", n.Name, engineName, blockID) + instance.LogTrace("BlockRequester.Tick", "block", blockID) }) events.BlockProcessed.Hook(func(blockID iotago.BlockID) { - fmt.Printf("%s > [%s] Engine.BlockProcessed: %s\n", n.Name, engineName, blockID) + instance.LogTrace("BlockProcessed", "block", blockID) }) events.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) { @@ -316,109 +318,108 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi require.NoError(n.Testing, err) } - fmt.Printf("%s > [%s] NotarizationManager.SlotCommitted: %s %s Accepted Blocks: %s\n %s\n Attestations: %s\n", n.Name, engineName, details.Commitment.ID(), details.Commitment, acceptedBlocks, roots, attestationBlockIDs) + instance.LogTrace("NotarizationManager.SlotCommitted", "commitment", details.Commitment.ID(), "acceptedBlocks", acceptedBlocks, "roots", roots, "attestations", attestationBlockIDs) }) events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { - fmt.Printf("%s > [%s] NotarizationManager.LatestCommitmentUpdated: %s\n", n.Name, engineName, commitment.ID()) + instance.LogTrace("NotarizationManager.LatestCommitmentUpdated", "commitment", commitment.ID()) }) events.BlockGadget.BlockPreAccepted.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockPreAccepted: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockPreAccepted", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockAccepted: %s @ slot %s committing to %s\n", n.Name, engineName, block.ID(), block.ID().Slot(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockAccepted", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.BlockGadget.BlockPreConfirmed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockPreConfirmed: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockPreConfirmed", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockConfirmed: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockConfirmed", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] Consensus.SlotGadget.SlotFinalized: %s\n", n.Name, engineName, slot) + instance.LogTrace("SlotGadget.SlotFinalized", "slot", slot) }) events.SeatManager.OnlineCommitteeSeatAdded.Hook(func(seat account.SeatIndex, accountID iotago.AccountID) { - fmt.Printf("%s > [%s] SybilProtection.OnlineCommitteeSeatAdded: %d - %s\n", n.Name, engineName, seat, accountID) + instance.LogTrace("SybilProtection.OnlineCommitteeSeatAdded", "seat", seat, "accountID", accountID) }) events.SeatManager.OnlineCommitteeSeatRemoved.Hook(func(seat account.SeatIndex) { - fmt.Printf("%s > [%s] SybilProtection.OnlineCommitteeSeatRemoved: %d\n", n.Name, engineName, seat) + instance.LogTrace("SybilProtection.OnlineCommitteeSeatRemoved", "seat", seat) }) events.SybilProtection.CommitteeSelected.Hook(func(committee *account.Accounts, epoch iotago.EpochIndex) { - fmt.Printf("%s > [%s] SybilProtection.CommitteeSelected: epoch %d - %s\n", n.Name, engineName, epoch, committee.IDs()) + instance.LogTrace("SybilProtection.CommitteeSelected", "epoch", epoch, "committee", committee.IDs()) }) events.ConflictDAG.ConflictCreated.Hook(func(conflictID iotago.TransactionID) { - fmt.Printf("%s > [%s] ConflictDAG.ConflictCreated: %s\n", n.Name, engineName, conflictID) + instance.LogTrace("ConflictDAG.ConflictCreated", "conflictID", conflictID) }) events.ConflictDAG.ConflictEvicted.Hook(func(conflictID iotago.TransactionID) { - fmt.Printf("%s > [%s] ConflictDAG.ConflictEvicted: %s\n", n.Name, engineName, conflictID) + instance.LogTrace("ConflictDAG.ConflictEvicted", "conflictID", conflictID) }) + events.ConflictDAG.ConflictRejected.Hook(func(conflictID iotago.TransactionID) { - fmt.Printf("%s > [%s] ConflictDAG.ConflictRejected: %s\n", n.Name, engineName, conflictID) + instance.LogTrace("ConflictDAG.ConflictRejected", "conflictID", conflictID) }) events.ConflictDAG.ConflictAccepted.Hook(func(conflictID iotago.TransactionID) { - fmt.Printf("%s > [%s] ConflictDAG.ConflictAccepted: %s\n", n.Name, engineName, conflictID) + instance.LogTrace("ConflictDAG.ConflictAccepted", "conflictID", conflictID) }) instance.Ledger.OnTransactionAttached(func(transactionMetadata mempool.TransactionMetadata) { - fmt.Printf("%s > [%s] Ledger.TransactionAttached: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("Ledger.TransactionAttached", "tx", transactionMetadata.ID()) transactionMetadata.OnSolid(func() { - fmt.Printf("%s > [%s] MemPool.TransactionSolid: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionSolid", "tx", transactionMetadata.ID()) }) transactionMetadata.OnExecuted(func() { - fmt.Printf("%s > [%s] MemPool.TransactionExecuted: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionExecuted", "tx", transactionMetadata.ID()) }) transactionMetadata.OnBooked(func() { - fmt.Printf("%s > [%s] MemPool.TransactionBooked: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionBooked", "tx", transactionMetadata.ID()) }) transactionMetadata.OnConflicting(func() { - fmt.Printf("%s > [%s] MemPool.TransactionConflicting: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionConflicting", "tx", transactionMetadata.ID()) }) transactionMetadata.OnAccepted(func() { - fmt.Printf("%s > [%s] MemPool.TransactionAccepted: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionAccepted", "tx", transactionMetadata.ID()) }) transactionMetadata.OnRejected(func() { - fmt.Printf("%s > [%s] MemPool.TransactionRejected: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionRejected", "tx", transactionMetadata.ID()) }) transactionMetadata.OnInvalid(func(err error) { - fmt.Printf("%s > [%s] MemPool.TransactionInvalid(%s): %s\n", n.Name, engineName, err, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionInvalid", "tx", transactionMetadata.ID(), "err", err) }) transactionMetadata.OnOrphanedSlotUpdated(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] MemPool.TransactionOrphanedSlotUpdated in slot %d: %s\n", n.Name, engineName, slot, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionOrphanedSlotUpdated", "tx", transactionMetadata.ID(), "slot", slot) }) transactionMetadata.OnCommittedSlotUpdated(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] MemPool.TransactionCommittedSlotUpdated in slot %d: %s\n", n.Name, engineName, slot, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionCommittedSlotUpdated", "tx", transactionMetadata.ID(), "slot", slot) }) transactionMetadata.OnPending(func() { - fmt.Printf("%s > [%s] MemPool.TransactionPending: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionPending", "tx", transactionMetadata.ID()) }) }) } func (n *Node) attachEngineLogs(failOnBlockFiltered bool, instance *engine.Engine) { - engineName := fmt.Sprintf("%s - %s", lo.Cond(n.Protocol.Engines.Main.Get() != instance, "Candidate", "Main"), instance.Name()[:8]) - - n.attachEngineLogsWithName(failOnBlockFiltered, instance, engineName) + n.attachEngineLogsWithName(failOnBlockFiltered, instance) } func (n *Node) Wait() { From a467a260c8004cf7425b873a4e596c40904c039f Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 29 Nov 2023 11:20:02 +0100 Subject: [PATCH 02/16] Feat: added more logs --- pkg/protocol/protocol_blocks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index b18983f3b..4550f9818 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -108,7 +108,7 @@ func (b *BlocksProtocol) ProcessResponse(block *model.Block, from peer.ID) { if !b.droppedBlocksBuffer.Add(block.ProtocolBlock().Header.SlotCommitmentID, types.NewTuple(block, from)) { b.LogError("failed to add dropped block referencing unsolid commitment to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) } else { - b.LogTrace("dropped block referencing unsolid commitment added to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) + b.LogDebug("dropped block referencing unsolid commitment added to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) } return From ed3888792cbee5123a8e61606ceb697510adad76 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 00:09:37 +0100 Subject: [PATCH 03/16] Feat: added log output --- pkg/protocol/protocol_blocks.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index 4550f9818..41a675a9f 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -112,9 +112,11 @@ func (b *BlocksProtocol) ProcessResponse(block *model.Block, from peer.ID) { } return + } else if block.ProtocolBlock().Header.SlotCommitmentID.Slot() >= 19 { + b.LogError("received block", "blockID", block.ID(), "commitment", commitment.LogName()) + } else { + b.LogTrace("received block", "blockID", block.ID(), "commitment", commitment.LogName()) } - - b.LogTrace("received block", "blockID", block.ID(), "commitment", commitment.LogName()) }) } From 0db2bf006e7cc359b74adb4abfda4e0aa528d603 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:38:55 +0100 Subject: [PATCH 04/16] Feat: changed some code --- pkg/protocol/protocol_blocks.go | 37 +++++++++++++++++----------- pkg/tests/loss_of_acceptance_test.go | 2 -- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index 41a675a9f..1a2e948c6 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -53,22 +53,29 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol { }) }) - protocol.Chains.WithElements(func(chain *Chain) func() { - return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { - return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook - }) - }) - - protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) { - if engine != nil { - unsubscribeOnEngineChange(func() (unsubscribe func()) { - return lo.Batch( - engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - ) - }) - } + //protocol.Chains.WithElements(func(chain *Chain) func() { + // return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { + // return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook + // }) + //}) + + protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { + return lo.Batch( + engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook, + engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + ) }) + //protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) { + // if engine != nil { + // unsubscribeOnEngineChange(func() (unsubscribe func()) { + // return lo.Batch( + // engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + // engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + // ) + // }) + // } + //}) }) return b diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index 3929165cc..c0d1d2420 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -173,8 +173,6 @@ func TestLossOfAcceptanceFromSnapshot(t *testing.T) { { ts.IssueBlocksAtSlots("", []iotago.SlotIndex{21, 22}, 2, "block0", ts.Nodes("node0-restarted"), true, nil) - time.Sleep(10 * time.Second) - ts.AssertEqualStoredCommitmentAtIndex(20, ts.Nodes()...) ts.AssertLatestCommitmentSlotIndex(20, ts.Nodes()...) } From 13cd73a53e62b61424b2a6428f2250b2ad622c37 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:48:30 +0100 Subject: [PATCH 05/16] Feat: added log --- pkg/tests/loss_of_acceptance_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index c0d1d2420..d5afd8a66 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -171,6 +171,8 @@ func TestLossOfAcceptanceFromSnapshot(t *testing.T) { // Need to issue to slot 22 so that all other nodes can warp sync up to slot 19 and then commit slot 20 themselves. { + fmt.Println("ISSUE BLOCKS SO THAT NODES CAN WARP SYNC UP TO SLOT 19") + ts.IssueBlocksAtSlots("", []iotago.SlotIndex{21, 22}, 2, "block0", ts.Nodes("node0-restarted"), true, nil) ts.AssertEqualStoredCommitmentAtIndex(20, ts.Nodes()...) From 8f60e3de6b65295314e5abf7ecdbc05f48988088 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:52:06 +0100 Subject: [PATCH 06/16] Feat: add logging --- pkg/protocol/protocol_blocks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index 1a2e948c6..4833486cd 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -45,7 +45,7 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol { return commitment.ReplayDroppedBlocks.OnUpdate(func(_ bool, replayBlocks bool) { if replayBlocks { for _, droppedBlock := range b.droppedBlocksBuffer.GetValues(commitment.ID()) { - b.LogTrace("replaying dropped block", "commitmentID", commitment.ID(), "blockID", droppedBlock.A.ID()) + b.LogError("replaying dropped block", "commitmentID", commitment.ID(), "blockID", droppedBlock.A.ID()) b.ProcessResponse(droppedBlock.A, droppedBlock.B) } From a52498764e14c006cf87628100b38045f5f90a53 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 13:15:30 +0100 Subject: [PATCH 07/16] Feat: added logging --- pkg/tests/loss_of_acceptance_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index d5afd8a66..96609bcb6 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -127,7 +127,7 @@ func TestLossOfAcceptanceFromSnapshot(t *testing.T) { node2 := ts.AddNode("node2") ts.Run(true, nil) - node2.Protocol.SetLogLevel(log.LevelDebug) + node2.Protocol.SetLogLevel(log.LevelTrace) // Issue up to slot 10, committing slot 8. { From ff87a2e64754bab1758949513df16385fc05e271 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 15:54:04 +0100 Subject: [PATCH 08/16] Feat: added solidity stuff to the logging - let's see --- .../engine/blockdag/inmemoryblockdag/blockdag.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go index 6a9c3cdf4..d7fb579c9 100644 --- a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go @@ -1,8 +1,10 @@ package inmemoryblockdag import ( + "fmt" "sync/atomic" + "github.com/iotaledger/hive.go/ds" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/module" @@ -74,6 +76,8 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) { var unsolidParentsCount atomic.Int32 unsolidParentsCount.Store(int32(len(block.Parents()))) + unsolidParents := ds.NewSet[iotago.BlockID]() + block.ForEachParent(func(parent iotago.Parent) { parentBlock, exists := b.blockCache.Block(parent.ID) if !exists { @@ -82,7 +86,13 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) { return } + unsolidParents.Add(parent.ID) + fmt.Println("unsolid Parents of ", block.ID(), unsolidParents) + parentBlock.Solid().OnUpdateOnce(func(_ bool, _ bool) { + unsolidParents.Delete(parent.ID) + fmt.Println("unsolid Parents of ", block.ID(), unsolidParents) + if unsolidParentsCount.Add(-1) == 0 { if block.SetSolid() { b.events.BlockSolid.Trigger(block) From 8b97187593daf890a630706e290f601d2e53142e Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Thu, 30 Nov 2023 16:52:05 +0100 Subject: [PATCH 09/16] Feat: add logging --- pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go index d7fb579c9..7a2c1828f 100644 --- a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go @@ -93,8 +93,10 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) { unsolidParents.Delete(parent.ID) fmt.Println("unsolid Parents of ", block.ID(), unsolidParents) - if unsolidParentsCount.Add(-1) == 0 { + if counter := unsolidParentsCount.Add(-1); counter == 0 { + fmt.Println("unsolid counter", counter) if block.SetSolid() { + fmt.Println("unsolid Trigger", counter) b.events.BlockSolid.Trigger(block) } } From 32ebf8b507af13f68a0b802a0048bb5522349643 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:22:15 +0100 Subject: [PATCH 10/16] Fix: fixed tests --- pkg/core/buffer/unsolid_commitment_buffer.go | 2 +- pkg/protocol/chain.go | 6 +- pkg/protocol/commitment.go | 1 + .../blockdag/inmemoryblockdag/blockdag.go | 14 +---- pkg/protocol/protocol_warp_sync.go | 2 +- pkg/testsuite/mock/node.go | 60 +++++++++++++++++-- 6 files changed, 62 insertions(+), 23 deletions(-) diff --git a/pkg/core/buffer/unsolid_commitment_buffer.go b/pkg/core/buffer/unsolid_commitment_buffer.go index 27df0dd51..a6417a4d7 100644 --- a/pkg/core/buffer/unsolid_commitment_buffer.go +++ b/pkg/core/buffer/unsolid_commitment_buffer.go @@ -51,7 +51,7 @@ func (u *UnsolidCommitmentBuffer[V]) Add(commitmentID iotago.CommitmentID, value u.mutex.RLock() defer u.mutex.RUnlock() - if commitmentID.Slot() <= u.lastEvictedSlot { + if u.lastEvictedSlot != 0 && commitmentID.Slot() <= u.lastEvictedSlot { return false } diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index 8c26912dd..aeaa16c02 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -92,7 +92,7 @@ func newChain(chains *Chains) *Chain { ClaimedWeight: reactive.NewVariable[uint64](), AttestedWeight: reactive.NewVariable[uint64](), VerifiedWeight: reactive.NewVariable[uint64](), - WarpSyncMode: reactive.NewVariable[bool](), + WarpSyncMode: reactive.NewVariable[bool]().Init(true), WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), RequestAttestations: reactive.NewVariable[bool](), @@ -326,9 +326,9 @@ func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariabl // committable age or 0 if this would cause an overflow to the negative numbers). func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() { return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge() + warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MinCommittableAge() if warpSyncOffset < latestSeenSlot { - return latestSeenSlot - warpSyncOffset + return latestSeenSlot - warpSyncOffset + 1 } return 0 diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index ca06a32ab..d739bc43f 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -158,6 +158,7 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) { return lo.Batch( // mark commitments that are marked as root as verified c.IsCommitted.InheritFrom(c.IsRoot), + c.IsAboveLatestVerifiedCommitment.InheritFrom(c.IsRoot), // mark commitments that are marked as verified as attested, fully booked and committable c.IsAttested.InheritFrom(c.IsCommitted), diff --git a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go index 7a2c1828f..6a9c3cdf4 100644 --- a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go @@ -1,10 +1,8 @@ package inmemoryblockdag import ( - "fmt" "sync/atomic" - "github.com/iotaledger/hive.go/ds" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/module" @@ -76,8 +74,6 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) { var unsolidParentsCount atomic.Int32 unsolidParentsCount.Store(int32(len(block.Parents()))) - unsolidParents := ds.NewSet[iotago.BlockID]() - block.ForEachParent(func(parent iotago.Parent) { parentBlock, exists := b.blockCache.Block(parent.ID) if !exists { @@ -86,17 +82,9 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) { return } - unsolidParents.Add(parent.ID) - fmt.Println("unsolid Parents of ", block.ID(), unsolidParents) - parentBlock.Solid().OnUpdateOnce(func(_ bool, _ bool) { - unsolidParents.Delete(parent.ID) - fmt.Println("unsolid Parents of ", block.ID(), unsolidParents) - - if counter := unsolidParentsCount.Add(-1); counter == 0 { - fmt.Println("unsolid counter", counter) + if unsolidParentsCount.Add(-1) == 0 { if block.SetSolid() { - fmt.Println("unsolid Trigger", counter) b.events.BlockSolid.Trigger(block) } } diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go index 844881d89..fb6a3c111 100644 --- a/pkg/protocol/protocol_warp_sync.go +++ b/pkg/protocol/protocol_warp_sync.go @@ -251,7 +251,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo // Let's assume that MCA is 5: when we want to book 15, we expect to have the commitment of 10 to load // accounts from it, hence why we make committable the slot at - MCA + 1 with respect of the current slot. minimumCommittableAge := w.protocol.APIForSlot(commitmentID.Slot()).ProtocolParameters().MinCommittableAge() - if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - minimumCommittableAge); exists { + if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - minimumCommittableAge + 1); exists { committableCommitment.IsCommittable.Set(true) } }) diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index 2d1ca30fc..b9031ee2e 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -45,6 +45,11 @@ func UnregisterIDAliases() { idAliases = make(map[peer.ID]string) } +type InvalidSignedTransactionEvent struct { + Metadata mempool.SignedTransactionMetadata + Error error +} + type Node struct { Testing *testing.T @@ -72,10 +77,11 @@ type Node struct { logHandler slog.Handler enableEngineLogging bool - mutex syncutils.RWMutex - attachedBlocks []*blocks.Block - currentSlot iotago.SlotIndex - filteredBlockEvents []*postsolidfilter.BlockFilteredEvent + mutex syncutils.RWMutex + attachedBlocks []*blocks.Block + currentSlot iotago.SlotIndex + filteredBlockEvents []*postsolidfilter.BlockFilteredEvent + invalidTransactionEvents map[iotago.SignedTransactionID]InvalidSignedTransactionEvent } func NewNode(t *testing.T, net *Network, partition string, name string, validator bool, logHandler slog.Handler) *Node { @@ -113,7 +119,8 @@ func NewNode(t *testing.T, net *Network, partition string, name string, validato logHandler: logHandler, enableEngineLogging: true, - attachedBlocks: make([]*blocks.Block, 0), + attachedBlocks: make([]*blocks.Block, 0), + invalidTransactionEvents: make(map[iotago.SignedTransactionID]InvalidSignedTransactionEvent), } } @@ -179,6 +186,33 @@ func (n *Node) hookEvents() { n.filteredBlockEvents = append(n.filteredBlockEvents, event) }) + + n.Protocol.Engines.Main.Get().Ledger.MemPool().OnSignedTransactionAttached( + func(signedTransactionMetadata mempool.SignedTransactionMetadata) { + signedTxID := signedTransactionMetadata.ID() + + signedTransactionMetadata.OnSignaturesInvalid(func(err error) { + n.mutex.Lock() + defer n.mutex.Unlock() + + n.invalidTransactionEvents[signedTxID] = InvalidSignedTransactionEvent{ + Metadata: signedTransactionMetadata, + Error: err, + } + }) + + transactionMetadata := signedTransactionMetadata.TransactionMetadata() + + transactionMetadata.OnInvalid(func(err error) { + n.mutex.Lock() + defer n.mutex.Unlock() + + n.invalidTransactionEvents[signedTxID] = InvalidSignedTransactionEvent{ + Metadata: signedTransactionMetadata, + Error: err, + } + }) + }) } func (n *Node) hookLogging(failOnBlockFiltered bool) { @@ -373,6 +407,14 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi instance.LogTrace("ConflictDAG.SpendAccepted", "conflictID", conflictID) }) + instance.Ledger.MemPool().OnSignedTransactionAttached( + func(signedTransactionMetadata mempool.SignedTransactionMetadata) { + signedTransactionMetadata.OnSignaturesInvalid(func(err error) { + instance.LogTrace("MemPool.SignedTransactionSignaturesInvalid", "tx", signedTransactionMetadata.ID(), "err", err) + }) + }, + ) + instance.Ledger.OnTransactionAttached(func(transactionMetadata mempool.TransactionMetadata) { instance.LogTrace("Ledger.TransactionAttached", "tx", transactionMetadata.ID()) @@ -487,6 +529,14 @@ func (n *Node) MainEngineSwitchedCount() int { return int(n.mainEngineSwitchedCount.Load()) } +func (n *Node) TransactionFailure(txID iotago.SignedTransactionID) (InvalidSignedTransactionEvent, bool) { + n.mutex.RLock() + defer n.mutex.RUnlock() + event, exists := n.invalidTransactionEvents[txID] + + return event, exists +} + func (n *Node) AttachedBlocks() []*blocks.Block { n.mutex.RLock() defer n.mutex.RUnlock() From b3049b5f74f58fa254968ddb8febf6328b42df2c Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:25:36 +0100 Subject: [PATCH 11/16] Refactor: reverted changes --- pkg/protocol/protocol_blocks.go | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index 4833486cd..3afd74807 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -45,7 +45,7 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol { return commitment.ReplayDroppedBlocks.OnUpdate(func(_ bool, replayBlocks bool) { if replayBlocks { for _, droppedBlock := range b.droppedBlocksBuffer.GetValues(commitment.ID()) { - b.LogError("replaying dropped block", "commitmentID", commitment.ID(), "blockID", droppedBlock.A.ID()) + b.LogTrace("replaying dropped block", "commitmentID", commitment.ID(), "blockID", droppedBlock.A.ID()) b.ProcessResponse(droppedBlock.A, droppedBlock.B) } @@ -53,12 +53,6 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol { }) }) - //protocol.Chains.WithElements(func(chain *Chain) func() { - // return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { - // return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook - // }) - //}) - protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { return lo.Batch( engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook, @@ -66,16 +60,6 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol { engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, ) }) - //protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) { - // if engine != nil { - // unsubscribeOnEngineChange(func() (unsubscribe func()) { - // return lo.Batch( - // engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - // engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - // ) - // }) - // } - //}) }) return b @@ -115,15 +99,13 @@ func (b *BlocksProtocol) ProcessResponse(block *model.Block, from peer.ID) { if !b.droppedBlocksBuffer.Add(block.ProtocolBlock().Header.SlotCommitmentID, types.NewTuple(block, from)) { b.LogError("failed to add dropped block referencing unsolid commitment to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) } else { - b.LogDebug("dropped block referencing unsolid commitment added to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) + b.LogTrace("dropped block referencing unsolid commitment added to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) } return - } else if block.ProtocolBlock().Header.SlotCommitmentID.Slot() >= 19 { - b.LogError("received block", "blockID", block.ID(), "commitment", commitment.LogName()) - } else { - b.LogTrace("received block", "blockID", block.ID(), "commitment", commitment.LogName()) } + + b.LogTrace("received block", "blockID", block.ID(), "commitment", commitment.LogName()) }) } From 9c845ec9d8b03983b10ec37f5ee25ca24aa1f23f Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:26:42 +0100 Subject: [PATCH 12/16] Refactor: reverted more changes --- pkg/tests/loss_of_acceptance_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index 62cf680d8..93ec48b57 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -171,8 +171,6 @@ func TestLossOfAcceptanceFromSnapshot(t *testing.T) { // Need to issue to slot 22 so that all other nodes can warp sync up to slot 19 and then commit slot 20 themselves. { - fmt.Println("ISSUE BLOCKS SO THAT NODES CAN WARP SYNC UP TO SLOT 19") - ts.IssueBlocksAtSlots("", []iotago.SlotIndex{21, 22}, 2, "block0", ts.Nodes("node0-restarted"), true, false) ts.AssertEqualStoredCommitmentAtIndex(20, ts.Nodes()...) From 1e8e9ddac14752b5882a73722e0d31efbaba4089 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:51:48 +0100 Subject: [PATCH 13/16] Refactor: reverted more changes --- pkg/protocol/chain.go | 10 ++++++++++ pkg/protocol/commitment.go | 2 +- pkg/testsuite/mock/node.go | 16 ++++++++-------- pkg/testsuite/storage_settings.go | 15 +++++++++++++-- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index aeaa16c02..90378abe0 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -157,6 +157,8 @@ func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (dispatched bool) func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exists bool) { for currentChain := c; currentChain != nil; { switch forkingPoint := currentChain.ForkingPoint.Get(); { + case forkingPoint == nil: + return nil, false case forkingPoint.Slot() == slot: return forkingPoint, true case slot > forkingPoint.Slot(): @@ -335,6 +337,14 @@ func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable }, latestSeenSlot)) } +func warpSyncThreshold(latestSeenSlot iotago.SlotIndex, minCommittableAge iotago.SlotIndex) iotago.SlotIndex { + if minCommittableAge < latestSeenSlot { + return latestSeenSlot - minCommittableAge + 1 + } + + return 0 +} + // addCommitment adds the given commitment to this chain. func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) { c.commitments.Set(newCommitment.Slot(), newCommitment) diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index d739bc43f..8a04de88a 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -158,7 +158,7 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) { return lo.Batch( // mark commitments that are marked as root as verified c.IsCommitted.InheritFrom(c.IsRoot), - c.IsAboveLatestVerifiedCommitment.InheritFrom(c.IsRoot), + c.ReplayDroppedBlocks.InheritFrom(c.IsRoot), // mark commitments that are marked as verified as attested, fully booked and committable c.IsAttested.InheritFrom(c.IsCommitted), diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index b9031ee2e..32394616a 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -392,19 +392,19 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi }) events.SpendDAG.SpenderCreated.Hook(func(conflictID iotago.TransactionID) { - instance.LogTrace("ConflictDAG.SpendCreated", "conflictID", conflictID) + instance.LogTrace("SpendDAG.SpenderCreated", "conflictID", conflictID) }) events.SpendDAG.SpenderEvicted.Hook(func(conflictID iotago.TransactionID) { - instance.LogTrace("ConflictDAG.SpendEvicted", "conflictID", conflictID) + instance.LogTrace("SpendDAG.SpenderEvicted", "conflictID", conflictID) }) events.SpendDAG.SpenderRejected.Hook(func(conflictID iotago.TransactionID) { - instance.LogTrace("ConflictDAG.SpendRejected", "conflictID", conflictID) + instance.LogTrace("SpendDAG.SpenderRejected", "conflictID", conflictID) }) events.SpendDAG.SpenderAccepted.Hook(func(conflictID iotago.TransactionID) { - instance.LogTrace("ConflictDAG.SpendAccepted", "conflictID", conflictID) + instance.LogTrace("SpendDAG.SpenderAccepted", "conflictID", conflictID) }) instance.Ledger.MemPool().OnSignedTransactionAttached( @@ -525,10 +525,6 @@ func (n *Node) FilteredBlocks() []*postsolidfilter.BlockFilteredEvent { return n.filteredBlockEvents } -func (n *Node) MainEngineSwitchedCount() int { - return int(n.mainEngineSwitchedCount.Load()) -} - func (n *Node) TransactionFailure(txID iotago.SignedTransactionID) (InvalidSignedTransactionEvent, bool) { n.mutex.RLock() defer n.mutex.RUnlock() @@ -537,6 +533,10 @@ func (n *Node) TransactionFailure(txID iotago.SignedTransactionID) (InvalidSigne return event, exists } +func (n *Node) MainEngineSwitchedCount() int { + return int(n.mainEngineSwitchedCount.Load()) +} + func (n *Node) AttachedBlocks() []*blocks.Block { n.mutex.RLock() defer n.mutex.RUnlock() diff --git a/pkg/testsuite/storage_settings.go b/pkg/testsuite/storage_settings.go index 956797be7..04409b25b 100644 --- a/pkg/testsuite/storage_settings.go +++ b/pkg/testsuite/storage_settings.go @@ -68,8 +68,15 @@ func (t *TestSuite) AssertCommitmentSlotIndexExists(slot iotago.SlotIndex, nodes return ierrors.Errorf("AssertCommitmentSlotIndexExists: %s: commitment at index %v not found", node.Name, slot) } + // Make sure the main chain exists + mainChain := node.Protocol.Chains.Main.Get() + if mainChain == nil { + return ierrors.Errorf("AssertCommitmentSlotIndexExists: %s: main chain not found when checking for commitment at index %v", node.Name, slot) + } + // Make sure the commitment is also available in the ChainManager. - if node.Protocol.Chains.Main.Get().LatestCommitment.Get().ID().Slot() < slot { + latestCommitment := mainChain.LatestCommitment.Get() + if latestCommitment == nil || latestCommitment.ID().Slot() < slot { return ierrors.Errorf("AssertCommitmentSlotIndexExists: %s: commitment at index %v not found in ChainManager", node.Name, slot) } @@ -126,8 +133,12 @@ func (t *TestSuite) AssertChainID(expectedChainID iotago.CommitmentID, nodes ... for _, node := range nodes { t.Eventually(func() error { - actualChainID := node.Protocol.Chains.Main.Get().ForkingPoint.Get().ID() + mainChain := node.Protocol.Chains.Main.Get() + if mainChain == nil { + return ierrors.Errorf("AssertChainID: %s: main chain not found", node.Name) + } + actualChainID := mainChain.ForkingPoint.Get().ID() if expectedChainID != actualChainID { fmt.Println(expectedChainID, actualChainID) From 52694bef06d4504eca82f04c65c9f48acf308509 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Fri, 1 Dec 2023 15:07:31 +0100 Subject: [PATCH 14/16] Fix: fixed bugs and refactored code --- pkg/protocol/chain.go | 162 ++++++++++++----------------- pkg/protocol/commitment.go | 21 ++-- pkg/protocol/protocol_warp_sync.go | 13 +-- 3 files changed, 86 insertions(+), 110 deletions(-) diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index 90378abe0..c7d4eadc6 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -45,8 +45,8 @@ type Chain struct { // latest verified commitment. VerifiedWeight reactive.Variable[uint64] - // WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode. - WarpSyncMode reactive.Variable[bool] + // WarpSyncModeEnabled contains a flag that indicates whether this chain is in warp sync mode. + WarpSyncModeEnabled reactive.Variable[bool] // WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest // network slot minus the max committable age. @@ -92,7 +92,7 @@ func newChain(chains *Chains) *Chain { ClaimedWeight: reactive.NewVariable[uint64](), AttestedWeight: reactive.NewVariable[uint64](), VerifiedWeight: reactive.NewVariable[uint64](), - WarpSyncMode: reactive.NewVariable[bool]().Init(true), + WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true), WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), RequestAttestations: reactive.NewVariable[bool](), @@ -188,7 +188,7 @@ func (c *Chain) initLogger() (shutdown func()) { c.Logger, shutdown = c.chains.NewEntityLogger("") return lo.Batch( - c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"), + c.WarpSyncModeEnabled.LogUpdates(c, log.LevelTrace, "WarpSyncModeEnabled"), c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"), c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"), c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName), @@ -209,10 +209,21 @@ func (c *Chain) initLogger() (shutdown func()) { // initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties. func (c *Chain) initDerivedProperties() (shutdown func()) { return lo.Batch( - c.deriveClaimedWeight(), - c.deriveVerifiedWeight(), - c.deriveLatestAttestedWeight(), - c.deriveWarpSyncMode(), + c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 { + return latestCommitment.cumulativeWeight() + }, c.LatestCommitment)), + + c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 { + return latestProducedCommitment.cumulativeWeight() + }, c.LatestProducedCommitment)), + + c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { + return warpSyncModeEnabled(warpSyncMode, latestProducedCommitment, warpSyncThreshold, outOfSyncThreshold) + }, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())), + + c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { + return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) + }), c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) { return c.deriveParentChain(forkingPoint) @@ -224,65 +235,18 @@ func (c *Chain) initDerivedProperties() (shutdown func()) { c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { return lo.Batch( - c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance), - c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance), + c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { + return warpSyncThreshold(engineInstance, latestSeenSlot) + }, c.chains.LatestSeenSlot)), + + c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { + return outOfSyncThreshold(engineInstance, latestSeenSlot) + }, c.chains.LatestSeenSlot)), ) }), ) } -// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not. -func (c *Chain) deriveWarpSyncMode() func() { - return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { - // latest produced commitment is nil if we have not produced any commitment yet (intermediary state during - // startup) - if latestProducedCommitment == nil { - return warpSyncMode - } - - // if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold - if warpSyncMode { - return latestProducedCommitment.ID().Slot() < warpSyncThreshold - } - - // if warp sync mode is disabled, enable it only if we fall below the out of sync threshold - return latestProducedCommitment.ID().Slot() < outOfSyncThreshold - }, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get())) -} - -// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the -// latest commitment). -func (c *Chain) deriveClaimedWeight() (shutdown func()) { - return c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 { - if latestCommitment == nil { - return 0 - } - - return latestCommitment.CumulativeWeight() - }, c.LatestCommitment)) -} - -// deriveLatestAttestedWeight defines how a chain determines its attested weight (by inheriting the cumulative attested -// weight of the latest attested commitment). It uses inheritance instead of simply setting the value as the cumulative -// attested weight can change over time depending on the attestations that are received. -func (c *Chain) deriveLatestAttestedWeight() func() { - return c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { - return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) - }) -} - -// deriveVerifiedWeight defines how a chain determines its verified weight (by setting the cumulative weight of the -// latest produced commitment). -func (c *Chain) deriveVerifiedWeight() func() { - return c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 { - if latestProducedCommitment == nil { - return 0 - } - - return latestProducedCommitment.CumulativeWeight() - }, c.LatestProducedCommitment)) -} - // deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set). func (c *Chain) deriveChildChains(child *Chain) func() { c.ChildChains.Add(child) @@ -312,39 +276,6 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) { return nil } -// deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2 -// times the max committable age or 0 if this would cause an overflow to the negative numbers). -func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() { - return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot { - return latestSeenSlot - outOfSyncOffset - } - - return 0 - }, latestSeenSlot)) -} - -// deriveWarpSyncThreshold defines how a chain determines its warp sync threshold (the latest seen slot minus the max -// committable age or 0 if this would cause an overflow to the negative numbers). -func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() { - return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MinCommittableAge() - if warpSyncOffset < latestSeenSlot { - return latestSeenSlot - warpSyncOffset + 1 - } - - return 0 - }, latestSeenSlot)) -} - -func warpSyncThreshold(latestSeenSlot iotago.SlotIndex, minCommittableAge iotago.SlotIndex) iotago.SlotIndex { - if minCommittableAge < latestSeenSlot { - return latestSeenSlot - minCommittableAge + 1 - } - - return 0 -} - // addCommitment adds the given commitment to this chain. func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) { c.commitments.Set(newCommitment.Slot(), newCommitment) @@ -373,7 +304,7 @@ func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (d } // perform additional checks if we are in warp sync mode (only let blocks pass that we requested) - if c.WarpSyncMode.Get() { + if c.WarpSyncModeEnabled.Get() { // abort if the target commitment does not exist targetCommitment, targetCommitmentExists := c.Commitment(targetSlot) if !targetCommitmentExists { @@ -410,3 +341,42 @@ func (c *Chain) verifiedWeight() reactive.Variable[uint64] { func (c *Chain) attestedWeight() reactive.Variable[uint64] { return c.AttestedWeight } + +// warpSyncThreshold returns the slot index at which the warp sync should stop. +func warpSyncThreshold(engineInstance *engine.Engine, latestSlot iotago.SlotIndex) iotago.SlotIndex { + // TODO: explain why we do - 1 here + warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MinCommittableAge() - 1 + + // prevent overflow to negative numbers + if warpSyncOffset >= latestSlot { + return 0 + } + + return latestSlot - warpSyncOffset +} + +// outOfSyncThreshold returns the slot index at which the node is considered out of sync. +func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { + if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot { + return latestSeenSlot - outOfSyncOffset + } + + return 0 +} + +// warpSyncModeEnabled determines whether warp sync mode should be enabled or not. +func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { + // latest produced commitment is nil if we have not produced any commitment yet (intermediary state during + // startup) + if latestProducedCommitment == nil { + return enabled + } + + // if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold + if enabled { + return latestProducedCommitment.ID().Slot() < warpSyncThreshold + } + + // if warp sync mode is disabled, enable it only if we fall below the out of sync threshold + return latestProducedCommitment.ID().Slot() < outOfSyncThreshold +} diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index 8a04de88a..920799b29 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -275,17 +275,17 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment) // deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain is requesting // warp sync, and we are the directly above the latest verified Commitment. func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() { - return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsFullyBooked bool, isFullyBooked bool) bool { - return engineInstance != nil && warpSync && parentIsFullyBooked && !isFullyBooked - }, chain.Engine, chain.WarpSyncMode, parent.IsFullyBooked, c.IsFullyBooked)) + return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, parentIsFullyBooked bool, isFullyBooked bool) bool { + return engineInstance != nil && warpSyncModeEnabled && parentIsFullyBooked && !isFullyBooked + }, chain.Engine, chain.WarpSyncModeEnabled, parent.IsFullyBooked, c.IsFullyBooked)) } // deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an // engine, is no longer requesting warp sync, and we are above the latest verified Commitment. func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() { - return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncing bool, isAboveLatestVerifiedCommitment bool) bool { - return engineInstance != nil && !warpSyncing && isAboveLatestVerifiedCommitment - }, chain.Engine, chain.WarpSyncMode, c.IsAboveLatestVerifiedCommitment)) + return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, isAboveLatestVerifiedCommitment bool) bool { + return engineInstance != nil && !warpSyncModeEnabled && isAboveLatestVerifiedCommitment + }, chain.Engine, chain.WarpSyncModeEnabled, c.IsAboveLatestVerifiedCommitment)) } // forceChain forces the Chain of this Commitment to the given Chain by promoting it to the main child of its parent if @@ -297,3 +297,12 @@ func (c *Commitment) forceChain(targetChain *Chain) { } } } + +// cumulativeWeight returns the cumulative weight of this Commitment while gracefully handling nil receivers. +func (c *Commitment) cumulativeWeight() uint64 { + if c == nil { + return 0 + } + + return c.CumulativeWeight() +} diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go index fb6a3c111..1e898c90c 100644 --- a/pkg/protocol/protocol_warp_sync.go +++ b/pkg/protocol/protocol_warp_sync.go @@ -47,7 +47,7 @@ func newWarpSyncProtocol(protocol *Protocol) *WarpSyncProtocol { protocol.Constructed.OnTrigger(func() { protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { - return chain.WarpSyncMode.OnUpdate(func(_ bool, warpSyncMode bool) { + return chain.WarpSyncModeEnabled.OnUpdate(func(_ bool, warpSyncMode bool) { if warpSyncMode { engine.Workers.WaitChildren() engine.Reset() @@ -110,7 +110,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo return } - if !chain.WarpSyncMode.Get() { + if !chain.WarpSyncModeEnabled.Get() { w.LogTrace("response for chain without warp-sync", "chain", chain.LogName(), "fromPeer", from) return @@ -161,7 +161,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo targetEngine.Workers.WaitChildren() - if !chain.WarpSyncMode.Get() { + if !chain.WarpSyncModeEnabled.Get() { w.LogTrace("response for chain without warp-sync", "chain", chain.LogName(), "fromPeer", from) return blocksToWarpSync @@ -177,7 +177,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo // 2. Mark all blocks as accepted // 3. Force commitment of the slot forceCommitmentFunc := func() { - if !chain.WarpSyncMode.Get() { + if !chain.WarpSyncModeEnabled.Get() { return } @@ -248,10 +248,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo } commitment.IsFullyBooked.OnUpdateOnce(func(_ bool, _ bool) { - // Let's assume that MCA is 5: when we want to book 15, we expect to have the commitment of 10 to load - // accounts from it, hence why we make committable the slot at - MCA + 1 with respect of the current slot. - minimumCommittableAge := w.protocol.APIForSlot(commitmentID.Slot()).ProtocolParameters().MinCommittableAge() - if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - minimumCommittableAge + 1); exists { + if committableCommitment, exists := chain.Commitment(warpSyncThreshold(targetEngine, commitmentID.Slot())); exists { committableCommitment.IsCommittable.Set(true) } }) From 8ad929b0b05b429c22c10b04e337af39062ac334 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Sun, 3 Dec 2023 01:57:17 +0100 Subject: [PATCH 15/16] Refactor: refactored warpsync to be able to handle loss of acceptance --- pkg/protocol/chain.go | 78 ++++++++++++------------------ pkg/protocol/commitment.go | 3 +- pkg/protocol/protocol_warp_sync.go | 16 ++++-- 3 files changed, 43 insertions(+), 54 deletions(-) diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index c7d4eadc6..b03b7a2d0 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -29,6 +29,9 @@ type Chain struct { // LatestAttestedCommitment contains the latest commitment of this chain for which attestations were received. LatestAttestedCommitment reactive.Variable[*Commitment] + // LatestFullyBookedCommitment contains the latest commitment of this chain for which all blocks were booked. + LatestFullyBookedCommitment reactive.Variable[*Commitment] + // LatestProducedCommitment contains the latest commitment of this chain that we produced ourselves by booking the // corresponding blocks in the Engine. LatestProducedCommitment reactive.Variable[*Commitment] @@ -48,10 +51,6 @@ type Chain struct { // WarpSyncModeEnabled contains a flag that indicates whether this chain is in warp sync mode. WarpSyncModeEnabled reactive.Variable[bool] - // WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest - // network slot minus the max committable age. - WarpSyncThreshold reactive.Variable[iotago.SlotIndex] - // OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp // sync mode. It is derived from the latest network slot minus two times the max committable age. OutOfSyncThreshold reactive.Variable[iotago.SlotIndex] @@ -83,22 +82,22 @@ type Chain struct { // newChain creates a new chain instance. func newChain(chains *Chains) *Chain { c := &Chain{ - ForkingPoint: reactive.NewVariable[*Commitment](), - ParentChain: reactive.NewVariable[*Chain](), - ChildChains: reactive.NewSet[*Chain](), - LatestCommitment: reactive.NewVariable[*Commitment](), - LatestAttestedCommitment: reactive.NewVariable[*Commitment](), - LatestProducedCommitment: reactive.NewVariable[*Commitment](), - ClaimedWeight: reactive.NewVariable[uint64](), - AttestedWeight: reactive.NewVariable[uint64](), - VerifiedWeight: reactive.NewVariable[uint64](), - WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true), - WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), - OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), - RequestAttestations: reactive.NewVariable[bool](), - StartEngine: reactive.NewVariable[bool](), - Engine: reactive.NewVariable[*engine.Engine](), - IsEvicted: reactive.NewEvent(), + ForkingPoint: reactive.NewVariable[*Commitment](), + ParentChain: reactive.NewVariable[*Chain](), + ChildChains: reactive.NewSet[*Chain](), + LatestCommitment: reactive.NewVariable[*Commitment](), + LatestAttestedCommitment: reactive.NewVariable[*Commitment](), + LatestFullyBookedCommitment: reactive.NewVariable[*Commitment](), + LatestProducedCommitment: reactive.NewVariable[*Commitment](), + ClaimedWeight: reactive.NewVariable[uint64](), + AttestedWeight: reactive.NewVariable[uint64](), + VerifiedWeight: reactive.NewVariable[uint64](), + WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true), + OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), + RequestAttestations: reactive.NewVariable[bool](), + StartEngine: reactive.NewVariable[bool](), + Engine: reactive.NewVariable[*engine.Engine](), + IsEvicted: reactive.NewEvent(), chains: chains, commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](), @@ -189,13 +188,14 @@ func (c *Chain) initLogger() (shutdown func()) { return lo.Batch( c.WarpSyncModeEnabled.LogUpdates(c, log.LevelTrace, "WarpSyncModeEnabled"), - c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"), c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"), c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName), c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"), c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"), c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"), c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName), + c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName), + c.LatestFullyBookedCommitment.LogUpdates(c, log.LevelTrace, "LatestFullyBookedCommitment", (*Commitment).LogName), c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName), c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"), c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"), @@ -217,9 +217,9 @@ func (c *Chain) initDerivedProperties() (shutdown func()) { return latestProducedCommitment.cumulativeWeight() }, c.LatestProducedCommitment)), - c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { - return warpSyncModeEnabled(warpSyncMode, latestProducedCommitment, warpSyncThreshold, outOfSyncThreshold) - }, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())), + c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { + return warpSyncModeEnabled(warpSyncMode, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold) + }, c.LatestFullyBookedCommitment, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())), c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) @@ -234,15 +234,9 @@ func (c *Chain) initDerivedProperties() (shutdown func()) { }), c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { - return lo.Batch( - c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - return warpSyncThreshold(engineInstance, latestSeenSlot) - }, c.chains.LatestSeenSlot)), - - c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - return outOfSyncThreshold(engineInstance, latestSeenSlot) - }, c.chains.LatestSeenSlot)), - ) + return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { + return outOfSyncThreshold(engineInstance, latestSeenSlot) + }, c.chains.LatestSeenSlot)) }), ) } @@ -284,6 +278,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) { return lo.Batch( newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }), + newCommitment.IsFullyBooked.OnTrigger(func() { c.LatestFullyBookedCommitment.Set(newCommitment) }), newCommitment.IsCommitted.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }), ) } @@ -342,19 +337,6 @@ func (c *Chain) attestedWeight() reactive.Variable[uint64] { return c.AttestedWeight } -// warpSyncThreshold returns the slot index at which the warp sync should stop. -func warpSyncThreshold(engineInstance *engine.Engine, latestSlot iotago.SlotIndex) iotago.SlotIndex { - // TODO: explain why we do - 1 here - warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MinCommittableAge() - 1 - - // prevent overflow to negative numbers - if warpSyncOffset >= latestSlot { - return 0 - } - - return latestSlot - warpSyncOffset -} - // outOfSyncThreshold returns the slot index at which the node is considered out of sync. func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot { @@ -365,7 +347,7 @@ func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.Slo } // warpSyncModeEnabled determines whether warp sync mode should be enabled or not. -func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { +func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { // latest produced commitment is nil if we have not produced any commitment yet (intermediary state during // startup) if latestProducedCommitment == nil { @@ -374,7 +356,7 @@ func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, war // if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold if enabled { - return latestProducedCommitment.ID().Slot() < warpSyncThreshold + return latestProducedCommitment.ID().Slot() < latestSeenSlot } // if warp sync mode is disabled, enable it only if we fall below the out of sync threshold diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index 920799b29..1b956deba 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -160,10 +160,9 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) { c.IsCommitted.InheritFrom(c.IsRoot), c.ReplayDroppedBlocks.InheritFrom(c.IsRoot), - // mark commitments that are marked as verified as attested, fully booked and committable + // mark commitments that are marked as verified as attested and fully booked c.IsAttested.InheritFrom(c.IsCommitted), c.IsFullyBooked.InheritFrom(c.IsCommitted), - c.IsCommittable.InheritFrom(c.IsCommitted), c.Parent.WithNonEmptyValue(func(parent *Commitment) func() { // the weight can be fixed as a one time operation (as it only relies on static information from the parent diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go index 1e898c90c..0b23ea6e3 100644 --- a/pkg/protocol/protocol_warp_sync.go +++ b/pkg/protocol/protocol_warp_sync.go @@ -247,17 +247,25 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo }) } + // Once all blocks are fully booked we can mark the commitment that is minCommittableAge older as this + // commitment to be committable. commitment.IsFullyBooked.OnUpdateOnce(func(_ bool, _ bool) { - if committableCommitment, exists := chain.Commitment(warpSyncThreshold(targetEngine, commitmentID.Slot())); exists { - committableCommitment.IsCommittable.Set(true) + if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge()); exists { + w.workerPool.Submit(func() { + committableCommitment.IsCommittable.Set(true) + }) } }) - commitment.IsCommittable.OnUpdateOnce(func(_ bool, _ bool) { - w.workerPool.Submit(forceCommitmentFunc) + // force commit one by one and wait for the parent to be committed before we can commit the next one + commitment.Parent.WithNonEmptyValue(func(parent *Commitment) (teardown func()) { + return parent.IsCommitted.WithNonEmptyValue(func(_ bool) (teardown func()) { + return commitment.IsCommittable.OnTrigger(forceCommitmentFunc) + }) }) if totalBlocks == 0 { + commitment.IsCommittable.Set(true) commitment.IsFullyBooked.Set(true) return blocksToWarpSync From 89f0213796298dc9c54b9c1ef3f50e96edbc0188 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Sun, 3 Dec 2023 02:14:32 +0100 Subject: [PATCH 16/16] Refactor: fixed typo + race condition --- pkg/protocol/chain.go | 12 ++++++------ pkg/protocol/commitment_verifier.go | 9 +++++++++ pkg/protocol/protocol_warp_sync.go | 4 ++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index b03b7a2d0..86b1d1d30 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -217,8 +217,8 @@ func (c *Chain) initDerivedProperties() (shutdown func()) { return latestProducedCommitment.cumulativeWeight() }, c.LatestProducedCommitment)), - c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { - return warpSyncModeEnabled(warpSyncMode, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold) + c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { + return warpSyncModeEnabled(enabled, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold) }, c.LatestFullyBookedCommitment, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())), c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { @@ -347,18 +347,18 @@ func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.Slo } // warpSyncModeEnabled determines whether warp sync mode should be enabled or not. -func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { +func warpSyncModeEnabled(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { // latest produced commitment is nil if we have not produced any commitment yet (intermediary state during // startup) - if latestProducedCommitment == nil { + if latestFullyBookedCommitment == nil { return enabled } // if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold if enabled { - return latestProducedCommitment.ID().Slot() < latestSeenSlot + return latestFullyBookedCommitment.ID().Slot() < latestSeenSlot } // if warp sync mode is disabled, enable it only if we fall below the out of sync threshold - return latestProducedCommitment.ID().Slot() < outOfSyncThreshold + return latestFullyBookedCommitment.ID().Slot() < outOfSyncThreshold } diff --git a/pkg/protocol/commitment_verifier.go b/pkg/protocol/commitment_verifier.go index ab600fbc9..3ee2b4cc8 100644 --- a/pkg/protocol/commitment_verifier.go +++ b/pkg/protocol/commitment_verifier.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/hive.go/ds" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/iotaledger/hive.go/runtime/syncutils" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine" "github.com/iotaledger/iota-core/pkg/protocol/engine/accounts" @@ -22,6 +23,9 @@ type CommitmentVerifier struct { // validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork. // Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork. validatorAccountsData map[iotago.AccountID]*accounts.AccountData + + // mutex is used to synchronize access to validatorAccountsData and epoch. + mutex syncutils.RWMutex } func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) { @@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio // This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it. // In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data // of the validators for the epoch of the last common commitment before the fork). + c.mutex.Lock() apiForSlot := c.engine.APIForSlot(commitment.Slot()) commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot()) if commitmentEpoch > c.epoch { @@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio } } } + c.mutex.Unlock() // 3. Verify attestations. blockIDs, seatCount, err := c.verifyAttestations(attestations) @@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio } func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + visitedIdentities := ds.NewSet[iotago.AccountID]() var blockIDs iotago.BlockIDs var seatCount uint64 diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go index 0b23ea6e3..374aaa3c6 100644 --- a/pkg/protocol/protocol_warp_sync.go +++ b/pkg/protocol/protocol_warp_sync.go @@ -47,8 +47,8 @@ func newWarpSyncProtocol(protocol *Protocol) *WarpSyncProtocol { protocol.Constructed.OnTrigger(func() { protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { - return chain.WarpSyncModeEnabled.OnUpdate(func(_ bool, warpSyncMode bool) { - if warpSyncMode { + return chain.WarpSyncModeEnabled.OnUpdate(func(_ bool, warpSyncModeEnabled bool) { + if warpSyncModeEnabled { engine.Workers.WaitChildren() engine.Reset() }