From 06e4c6a97d71211ed1fde9933ea4a8d0ddc229d8 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 13:45:48 +0100 Subject: [PATCH 01/20] Disable gossip when in warpsync mode --- pkg/protocol/blocks.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/protocol/blocks.go b/pkg/protocol/blocks.go index 394c8f140..b80ae80a5 100644 --- a/pkg/protocol/blocks.go +++ b/pkg/protocol/blocks.go @@ -65,8 +65,16 @@ func newBlocks(protocol *Protocol) *Blocks { protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { return lo.BatchReverse( engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook, - engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { + if !chain.WarpSyncMode.Get() { + b.SendResponse(block.ModelBlock()) + } + }).Unhook, + engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { + if !chain.WarpSyncMode.Get() { + b.SendResponse(block.ModelBlock()) + } + }).Unhook, ) }) }) From 92a8e297aca148beb605f44c15a880764aa5ab00 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 13:46:11 +0100 Subject: [PATCH 02/20] Disable gossip when in warpsync mode --- .../engine/notarization/slotnotarization/manager.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/protocol/engine/notarization/slotnotarization/manager.go b/pkg/protocol/engine/notarization/slotnotarization/manager.go index d14e14167..987b9f0a3 100644 --- a/pkg/protocol/engine/notarization/slotnotarization/manager.go +++ b/pkg/protocol/engine/notarization/slotnotarization/manager.go @@ -115,7 +115,7 @@ func (m *Manager) tryCommitUntil(commitUntilSlot iotago.SlotIndex) { } func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) { - m.LogInfof("Force commit slot %d", slot) + m.LogInfof("force commit slot %d", slot) if m.ShutdownEvent().WasTriggered() { return nil, ierrors.New("notarization manager was stopped") @@ -135,6 +135,8 @@ func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) return nil, ierrors.Wrapf(err, "failed to create commitment for slot %d", slot) } + m.LogInfof("forced committment of %s", commitment.ID()) + return commitment, nil } @@ -147,6 +149,8 @@ func (m *Manager) ForceCommitUntil(commitUntilSlot iotago.SlotIndex) error { } } + m.LogInfo("successfully forced commitment until", "slot", commitUntilSlot) + return nil } @@ -184,6 +188,7 @@ func (m *Manager) tryCommitSlotUntil(acceptedBlockIndex iotago.SlotIndex) { if _, err := m.createCommitment(i); err != nil { m.errorHandler(ierrors.Wrapf(err, "failed to create commitment for slot %d", i)) + return } } From 4b9d3bc5a92be4e1f1c2f6e1985cc66b9d593883 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 14:35:47 +0100 Subject: [PATCH 03/20] Force commit only when no newer commitment is known --- pkg/testsuite/mock/blockissuer_acceptance_loss.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/testsuite/mock/blockissuer_acceptance_loss.go b/pkg/testsuite/mock/blockissuer_acceptance_loss.go index 748ba3b5e..db9eaa86d 100644 --- a/pkg/testsuite/mock/blockissuer_acceptance_loss.go +++ 
b/pkg/testsuite/mock/blockissuer_acceptance_loss.go @@ -14,6 +14,12 @@ func (i *BlockIssuer) reviveChain(issuingTime time.Time, node *Node) (*iotago.Co issuingSlot := apiForSlot.TimeProvider().SlotFromTime(issuingTime) + // If the chain manager is aware of a commitments on the main chain, then do not force commit. + // The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment. + if issuingSlot > apiForSlot.ProtocolParameters().MaxCommittableAge() && + node.Protocol.Chains.Main.Get().LatestCommitment.Get().Slot() < issuingSlot-apiForSlot.ProtocolParameters().MaxCommittableAge() { + + } // Force commitments until minCommittableAge relative to the block's issuing time. We basically "pretend" that // this block was already accepted at the time of issuing so that we have a commitment to reference. if issuingSlot < apiForSlot.ProtocolParameters().MinCommittableAge() { // Should never happen as we're beyond maxCommittableAge which is > minCommittableAge. From bcf5dcbf4a71cb5de11e13d51d0a2058f7f7a518 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 14:36:07 +0100 Subject: [PATCH 04/20] Optimize bottlenecks --- components/prometheus/metrics_scheduler.go | 20 -------------------- pkg/protocol/chains.go | 10 ++++++++-- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/components/prometheus/metrics_scheduler.go b/components/prometheus/metrics_scheduler.go index 277c76b6a..f86414c71 100644 --- a/components/prometheus/metrics_scheduler.go +++ b/components/prometheus/metrics_scheduler.go @@ -3,7 +3,6 @@ package prometheus import ( "time" - "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/iota-core/components/prometheus/collector" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" @@ -16,7 +15,6 @@ const ( queueSizePerNodeCount = "queue_size_per_node_count" validatorQueueSizePerNodeCount = "validator_queue_size_per_node_count" schedulerProcessedBlocks = "processed_blocks" - manaAmountPerNode = "mana_per_node" scheduledBlockLabel = "scheduled" skippedBlockLabel = "skipped" droppedBlockLabel = "dropped" @@ -116,24 +114,6 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace, }, event.WithWorkerPool(Component.WorkerPool)) }), )), - collector.WithMetric(collector.NewMetric(manaAmountPerNode, - collector.WithType(collector.Gauge), - collector.WithLabels("issuer_id"), - collector.WithPruningDelay(10*time.Minute), - collector.WithHelp("Current amount of mana of each issuer in the queue."), - collector.WithInitFunc(func() { - deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) { - mana, err := deps.Protocol.Engines.Main.Get().Ledger.ManaManager().GetManaOnAccount(block.ProtocolBlock().Header.IssuerID, block.SlotCommitmentID().Slot()) - if err != nil { - deps.Protocol.Engines.Main.Get().ErrorHandler("metrics")(ierrors.Wrapf(err, "failed to retrieve mana on account %s for slot %d", block.ProtocolBlock().Header.IssuerID.ToHex(), block.SlotCommitmentID().Slot())) - - return - } - - deps.Collector.Update(schedulerNamespace, manaAmountPerNode, float64(mana), block.ProtocolBlock().Header.IssuerID.String()) - }, event.WithWorkerPool(Component.WorkerPool)) - }), - )), collector.WithMetric(collector.NewMetric(schedulerProcessedBlocks, collector.WithType(collector.Counter), collector.WithLabels("state"), diff --git a/pkg/protocol/chains.go b/pkg/protocol/chains.go index 
d0cecca08..0b222680b 100644 --- a/pkg/protocol/chains.go +++ b/pkg/protocol/chains.go @@ -378,12 +378,18 @@ func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() { return lo.BatchReverse( c.WithInitializedEngines(func(_ *Chain, engine *engine.Engine) (shutdown func()) { return engine.LatestCommitment.OnUpdate(func(_ *model.Commitment, latestCommitment *model.Commitment) { - c.LatestSeenSlot.Set(latestCommitment.Slot()) + // Check the value to avoid having to acquire write locks inside of the Set method. + if c.LatestSeenSlot.Get() < latestCommitment.Slot() { + c.LatestSeenSlot.Set(latestCommitment.Slot()) + } }) }), protocol.Network.OnBlockReceived(func(block *model.Block, _ peer.ID) { - c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot()) + // Check the value to avoid having to acquire write locks inside of the Set method. + if c.LatestSeenSlot.Get() < block.ProtocolBlock().Header.SlotCommitmentID.Slot() { + c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot()) + } }), ) }) From 4b7134c290359dfe3d2b84c2ef61b62ccdc7c641 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 15:22:37 +0100 Subject: [PATCH 05/20] Allow only one force-commitment process. --- .../notarization/slotnotarization/manager.go | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/protocol/engine/notarization/slotnotarization/manager.go b/pkg/protocol/engine/notarization/slotnotarization/manager.go index 987b9f0a3..1d4fb75a7 100644 --- a/pkg/protocol/engine/notarization/slotnotarization/manager.go +++ b/pkg/protocol/engine/notarization/slotnotarization/manager.go @@ -1,6 +1,7 @@ package slotnotarization import ( + "sync/atomic" "time" "github.com/iotaledger/hive.go/ierrors" @@ -45,6 +46,9 @@ type Manager struct { commitmentMutex syncutils.Mutex + // ForceCommitMode contains a flag that indicates whether the manager is in force commit mode. + ForceCommitMode atomic.Bool + module.Module } @@ -109,13 +113,15 @@ func (m *Manager) Shutdown() { // tryCommitUntil tries to create slot commitments until the new provided acceptance time. 
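// ---------------------------------------------------------------------------------------
// Illustrative sketch, not part of the patch series: the chains.go hunk above reads
// LatestSeenSlot before calling Set so that updates which cannot raise the value skip the
// write-locked path entirely. The reactive.Variable API itself is not shown in this
// excerpt, so the same check-before-write idea is illustrated below with only the
// standard library; the maxSlotTracker type and its names are hypothetical.
package main

import (
	"fmt"
	"sync"
)

type maxSlotTracker struct {
	mu   sync.RWMutex
	slot uint64
}

// Raise updates the tracked slot only if the candidate is higher. The cheap read-locked
// check lets stale or equal candidates return early without contending for the write
// lock; the comparison is repeated under the write lock to stay correct under races.
func (t *maxSlotTracker) Raise(candidate uint64) {
	t.mu.RLock()
	current := t.slot
	t.mu.RUnlock()

	if candidate <= current {
		return // nothing to do, the write lock is never taken
	}

	t.mu.Lock()
	defer t.mu.Unlock()
	if candidate > t.slot {
		t.slot = candidate
	}
}

func main() {
	var tracker maxSlotTracker
	tracker.Raise(10)
	tracker.Raise(7) // ignored without touching the write lock

	fmt.Println(tracker.slot) // 10
}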
func (m *Manager) tryCommitUntil(commitUntilSlot iotago.SlotIndex) { - if slot := commitUntilSlot; slot > m.storage.Settings().LatestCommitment().Slot() { - m.tryCommitSlotUntil(slot) + if !m.ForceCommitMode.Load() { + if slot := commitUntilSlot; slot > m.storage.Settings().LatestCommitment().Slot() { + m.tryCommitSlotUntil(slot) + } } } func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) { - m.LogInfof("force commit slot %d", slot) + m.LogInfof("force commit", "slot", slot) if m.ShutdownEvent().WasTriggered() { return nil, ierrors.New("notarization manager was stopped") @@ -135,12 +141,16 @@ func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) return nil, ierrors.Wrapf(err, "failed to create commitment for slot %d", slot) } - m.LogInfof("forced committment of %s", commitment.ID()) + m.LogInfo("forced committment", "commitmentID", commitment.ID()) return commitment, nil } func (m *Manager) ForceCommitUntil(commitUntilSlot iotago.SlotIndex) error { + if m.ForceCommitMode.Swap(true) { + return ierrors.New("force commitment already in progress") + } + m.LogInfo("force committing until", "slot", commitUntilSlot) for i := m.storage.Settings().LatestCommitment().Slot() + 1; i <= commitUntilSlot; i++ { @@ -151,6 +161,8 @@ func (m *Manager) ForceCommitUntil(commitUntilSlot iotago.SlotIndex) error { m.LogInfo("successfully forced commitment until", "slot", commitUntilSlot) + m.ForceCommitMode.Store(false) + return nil } From 8f9079a257f9b999ef3a48973deb2ca6d71a477b Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 17:13:31 +0100 Subject: [PATCH 06/20] Fix linter suggestion --- pkg/protocol/blocks.go | 1 - pkg/protocol/engine/notarization/slotnotarization/manager.go | 2 +- pkg/testsuite/mock/blockissuer_acceptance_loss.go | 3 ++- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/protocol/blocks.go b/pkg/protocol/blocks.go index b80ae80a5..537875fc3 100644 --- a/pkg/protocol/blocks.go +++ b/pkg/protocol/blocks.go @@ -61,7 +61,6 @@ func newBlocks(protocol *Protocol) *Blocks { }) }) - //nolint:revive protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { return lo.BatchReverse( engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook, diff --git a/pkg/protocol/engine/notarization/slotnotarization/manager.go b/pkg/protocol/engine/notarization/slotnotarization/manager.go index 1d4fb75a7..ba963aa76 100644 --- a/pkg/protocol/engine/notarization/slotnotarization/manager.go +++ b/pkg/protocol/engine/notarization/slotnotarization/manager.go @@ -141,7 +141,7 @@ func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) return nil, ierrors.Wrapf(err, "failed to create commitment for slot %d", slot) } - m.LogInfo("forced committment", "commitmentID", commitment.ID()) + m.LogInfo("forced commitment", "commitmentID", commitment.ID()) return commitment, nil } diff --git a/pkg/testsuite/mock/blockissuer_acceptance_loss.go b/pkg/testsuite/mock/blockissuer_acceptance_loss.go index db9eaa86d..26809e225 100644 --- a/pkg/testsuite/mock/blockissuer_acceptance_loss.go +++ b/pkg/testsuite/mock/blockissuer_acceptance_loss.go @@ -18,8 +18,9 @@ func (i *BlockIssuer) reviveChain(issuingTime time.Time, node *Node) (*iotago.Co // The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment. 
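// ---------------------------------------------------------------------------------------
// Illustrative sketch, not part of the patch series: PATCH 05 guards ForceCommitUntil with
// an atomic.Bool, where Swap(true) returns the previous value so a second concurrent
// caller sees true and bails out. A minimal stand-alone version of that single-flight
// guard follows; the committer type is hypothetical, and clearing the flag with defer is a
// choice made here so that early returns also release the guard.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type committer struct {
	forceCommitInProgress atomic.Bool
}

func (c *committer) ForceCommitUntil(slot uint64) error {
	// Swap sets the flag and reports whether it was already set by another caller.
	if c.forceCommitInProgress.Swap(true) {
		return errors.New("force commitment already in progress")
	}
	defer c.forceCommitInProgress.Store(false)

	_ = slot // ... create commitments up to slot here ...

	return nil
}

func main() {
	var c committer
	fmt.Println(c.ForceCommitUntil(10)) // <nil>
	fmt.Println(c.ForceCommitUntil(11)) // <nil>, the defer cleared the flag again
}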
if issuingSlot > apiForSlot.ProtocolParameters().MaxCommittableAge() && node.Protocol.Chains.Main.Get().LatestCommitment.Get().Slot() < issuingSlot-apiForSlot.ProtocolParameters().MaxCommittableAge() { - + return nil, iotago.EmptyBlockID, ierrors.Errorf("chain manager is aware of a newer commitment", issuingSlot, apiForSlot.ProtocolParameters().MinCommittableAge()) } + // Force commitments until minCommittableAge relative to the block's issuing time. We basically "pretend" that // this block was already accepted at the time of issuing so that we have a commitment to reference. if issuingSlot < apiForSlot.ProtocolParameters().MinCommittableAge() { // Should never happen as we're beyond maxCommittableAge which is > minCommittableAge. From 9b435131a54747d7792f099c5345422cd411905f Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 29 Mar 2024 17:13:55 +0100 Subject: [PATCH 07/20] Force commit only if chainmanager does not know newer commitment. --- components/inx/server_commitments.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/components/inx/server_commitments.go b/components/inx/server_commitments.go index 8721e313d..298c15b9f 100644 --- a/components/inx/server_commitments.go +++ b/components/inx/server_commitments.go @@ -158,6 +158,15 @@ func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_List } func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotRequest) (*inx.NoParams, error) { + // If the chain manager is aware of a commitments on the main chain, then do not force commit. + // The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment. + unwrappedSlot := slot.Unwrap() + protocolParams := deps.Protocol.APIForSlot(unwrappedSlot).ProtocolParameters() + if latestChainCommitment := deps.Protocol.Chains.Main.Get().LatestCommitment.Get(); unwrappedSlot > protocolParams.MaxCommittableAge() && + latestChainCommitment.Slot() < unwrappedSlot-protocolParams.MaxCommittableAge() { + return nil, ierrors.Errorf("chain manager is aware of a newer commitment (%s) than target slot %d", latestChainCommitment, unwrappedSlot) + } + err := deps.Protocol.Engines.Main.Get().Notarization.ForceCommitUntil(slot.Unwrap()) if err != nil { return nil, ierrors.Wrapf(err, "error while performing force commit until %d", slot.GetSlot()) From cdc1c9594de0cc317caaed9941f7d2a7bf7c3012 Mon Sep 17 00:00:00 2001 From: muXxer Date: Sat, 30 Mar 2024 20:21:14 +0100 Subject: [PATCH 08/20] Fix errorf format --- pkg/testsuite/mock/blockissuer_acceptance_loss.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/testsuite/mock/blockissuer_acceptance_loss.go b/pkg/testsuite/mock/blockissuer_acceptance_loss.go index 26809e225..cf7afb7d9 100644 --- a/pkg/testsuite/mock/blockissuer_acceptance_loss.go +++ b/pkg/testsuite/mock/blockissuer_acceptance_loss.go @@ -18,7 +18,7 @@ func (i *BlockIssuer) reviveChain(issuingTime time.Time, node *Node) (*iotago.Co // The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment. 
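// ---------------------------------------------------------------------------------------
// Illustrative sketch, not part of the patch series: the guard above (and its corrected
// form in PATCH 11) refuses to force a commitment when the chain already knows a
// commitment at or beyond the slot that would be committed, and the comparison against
// the committable age keeps the unsigned slot subtraction from underflowing. A
// stand-alone version of that check with hypothetical parameter names:
package main

import "fmt"

// newerCommitmentKnown reports whether a commitment at or beyond issuingSlot-committableAge
// is already known, in which case force-committing would risk creating a diverging chain.
func newerCommitmentKnown(latestKnownSlot, issuingSlot, committableAge uint64) bool {
	return issuingSlot > committableAge && latestKnownSlot >= issuingSlot-committableAge
}

func main() {
	fmt.Println(newerCommitmentKnown(5, 20, 4))  // false: safe to force commitments
	fmt.Println(newerCommitmentKnown(18, 20, 4)) // true: wait for warp sync instead
}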
if issuingSlot > apiForSlot.ProtocolParameters().MaxCommittableAge() && node.Protocol.Chains.Main.Get().LatestCommitment.Get().Slot() < issuingSlot-apiForSlot.ProtocolParameters().MaxCommittableAge() { - return nil, iotago.EmptyBlockID, ierrors.Errorf("chain manager is aware of a newer commitment", issuingSlot, apiForSlot.ProtocolParameters().MinCommittableAge()) + return nil, iotago.EmptyBlockID, ierrors.Errorf("chain manager is aware of a newer commitment, slot: %d, minCommittableAge: %d", issuingSlot, apiForSlot.ProtocolParameters().MinCommittableAge()) } // Force commitments until minCommittableAge relative to the block's issuing time. We basically "pretend" that From 03115c2f7bccd0c7c854261bed427d725ab024fc Mon Sep 17 00:00:00 2001 From: muXxer Date: Sat, 30 Mar 2024 20:21:19 +0100 Subject: [PATCH 09/20] Remove scheduler_mana_per_node from the dashboards --- .../dashboards/local_dashboard_old.json | 62 --------- .../dashboards/scheduler-metrics.json | 69 ---------- .../dashboards/local_dashboard_old.json | 124 ------------------ .../dashboards/scheduler-metrics.json | 69 ---------- 4 files changed, 324 deletions(-) diff --git a/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/local_dashboard_old.json b/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/local_dashboard_old.json index 80cbdaa3f..71764139e 100644 --- a/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/local_dashboard_old.json +++ b/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/local_dashboard_old.json @@ -3844,68 +3844,6 @@ "title": "Scheduler metrics", "type": "row" }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "", - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - } - ] - }, - "unit": "none" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 15, - "x": 0, - "y": 115 - }, - "id": 146, - "options": { - "displayMode": "lcd", - "minVizHeight": 10, - "minVizWidth": 0, - "orientation": "horizontal", - "reduceOptions": { - "calcs": [ - "last" - ], - "fields": "", - "values": false - }, - "showUnfilled": true, - "text": {}, - "valueMode": "color" - }, - "pluginVersion": "9.5.6", - "targets": [ - { - "exemplar": true, - "expr": "topk(15, sum by (node_id) (scheduler_queue_size_per_node_work{instance=~\"$instance\"}/scheduler_mana_per_node{instance=~\"$instance\"}))", - "instant": true, - "interval": "", - "legendFormat": "Node: {{node_id}}", - "queryType": "randomWalk", - "refId": "A" - } - ], - "title": "Mana-scaled queue size", - "type": "bargauge" - }, { "datasource": { "type": "prometheus", diff --git a/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/scheduler-metrics.json b/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/scheduler-metrics.json index e76dd6b1c..386427ba7 100644 --- a/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/scheduler-metrics.json +++ b/deploy/ansible/roles/metrics/files/grafana/provisioning/dashboards/scheduler-metrics.json @@ -46,75 +46,6 @@ "title": "Scheduler metrics", "type": "row" }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "", - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": 
"none" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 15, - "x": 0, - "y": 1 - }, - "id": 146, - "options": { - "displayMode": "lcd", - "minVizHeight": 10, - "minVizWidth": 0, - "orientation": "horizontal", - "reduceOptions": { - "calcs": [ - "last" - ], - "fields": "", - "values": false - }, - "showUnfilled": true, - "text": {}, - "valueMode": "color" - }, - "pluginVersion": "9.5.6", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "exemplar": true, - "expr": "topk(15, sum by (issuer_id) (scheduler_queue_size_per_node_count{instance=~\"$instance\"}/scheduler_mana_per_node{instance=~\"$instance\"}))", - "hide": false, - "instant": true, - "interval": "", - "legendFormat": "Issuer: {{issuer_id}}", - "queryType": "randomWalk", - "refId": "A" - } - ], - "title": "Mana-scaled queue size", - "type": "bargauge" - }, { "datasource": { "type": "prometheus", diff --git a/tools/docker-network/grafana/provisioning/dashboards/local_dashboard_old.json b/tools/docker-network/grafana/provisioning/dashboards/local_dashboard_old.json index 80cbdaa3f..222e18a68 100644 --- a/tools/docker-network/grafana/provisioning/dashboards/local_dashboard_old.json +++ b/tools/docker-network/grafana/provisioning/dashboards/local_dashboard_old.json @@ -3765,68 +3765,6 @@ "title": "Total number of finalized conflicts", "type": "stat" }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 4, - "w": 6, - "x": 18, - "y": 110 - }, - "id": 126, - "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, - "text": {}, - "textMode": "auto" - }, - "pluginVersion": "9.5.6", - "targets": [ - { - "exemplar": true, - "expr": "conflict_resolved_total{instance=~\"$instance\"}", - "interval": "", - "legendFormat": "", - "queryType": "randomWalk", - "refId": "A" - } - ], - "title": "Total number of confirmed conflicts", - "type": "stat" - }, { "collapsed": false, "datasource": { @@ -3844,68 +3782,6 @@ "title": "Scheduler metrics", "type": "row" }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "", - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - } - ] - }, - "unit": "none" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 15, - "x": 0, - "y": 115 - }, - "id": 146, - "options": { - "displayMode": "lcd", - "minVizHeight": 10, - "minVizWidth": 0, - "orientation": "horizontal", - "reduceOptions": { - "calcs": [ - "last" - ], - "fields": "", - "values": false - }, - "showUnfilled": true, - "text": {}, - "valueMode": "color" - }, - "pluginVersion": "9.5.6", - "targets": [ - { - "exemplar": true, - "expr": "topk(15, sum by (node_id) (scheduler_queue_size_per_node_work{instance=~\"$instance\"}/scheduler_mana_per_node{instance=~\"$instance\"}))", - "instant": true, - "interval": "", - "legendFormat": "Node: {{node_id}}", - "queryType": "randomWalk", - "refId": "A" - } - ], - "title": "Mana-scaled queue size", - "type": "bargauge" - }, { 
"datasource": { "type": "prometheus", diff --git a/tools/docker-network/grafana/provisioning/dashboards/scheduler-metrics.json b/tools/docker-network/grafana/provisioning/dashboards/scheduler-metrics.json index e76dd6b1c..386427ba7 100644 --- a/tools/docker-network/grafana/provisioning/dashboards/scheduler-metrics.json +++ b/tools/docker-network/grafana/provisioning/dashboards/scheduler-metrics.json @@ -46,75 +46,6 @@ "title": "Scheduler metrics", "type": "row" }, - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "description": "", - "fieldConfig": { - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - } - ] - }, - "unit": "none" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 15, - "x": 0, - "y": 1 - }, - "id": 146, - "options": { - "displayMode": "lcd", - "minVizHeight": 10, - "minVizWidth": 0, - "orientation": "horizontal", - "reduceOptions": { - "calcs": [ - "last" - ], - "fields": "", - "values": false - }, - "showUnfilled": true, - "text": {}, - "valueMode": "color" - }, - "pluginVersion": "9.5.6", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "PBFA97CFB590B2093" - }, - "editorMode": "code", - "exemplar": true, - "expr": "topk(15, sum by (issuer_id) (scheduler_queue_size_per_node_count{instance=~\"$instance\"}/scheduler_mana_per_node{instance=~\"$instance\"}))", - "hide": false, - "instant": true, - "interval": "", - "legendFormat": "Issuer: {{issuer_id}}", - "queryType": "randomWalk", - "refId": "A" - } - ], - "title": "Mana-scaled queue size", - "type": "bargauge" - }, { "datasource": { "type": "prometheus", From 4a4f0a6c9b50852d55cf4601e56d6c95d00a4b34 Mon Sep 17 00:00:00 2001 From: muXxer Date: Sat, 30 Mar 2024 20:25:24 +0100 Subject: [PATCH 10/20] Fix log --- pkg/protocol/engine/notarization/slotnotarization/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/protocol/engine/notarization/slotnotarization/manager.go b/pkg/protocol/engine/notarization/slotnotarization/manager.go index ba963aa76..c1fbac137 100644 --- a/pkg/protocol/engine/notarization/slotnotarization/manager.go +++ b/pkg/protocol/engine/notarization/slotnotarization/manager.go @@ -121,7 +121,7 @@ func (m *Manager) tryCommitUntil(commitUntilSlot iotago.SlotIndex) { } func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) { - m.LogInfof("force commit", "slot", slot) + m.LogInfo("force commit", "slot", slot) if m.ShutdownEvent().WasTriggered() { return nil, ierrors.New("notarization manager was stopped") From 04ae0744c21086def1422278ba508873707ee515 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Tue, 2 Apr 2024 08:34:29 +0200 Subject: [PATCH 11/20] Fix force commitment condition --- components/inx/server_commitments.go | 6 ++---- pkg/testsuite/mock/blockissuer_acceptance_loss.go | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/components/inx/server_commitments.go b/components/inx/server_commitments.go index 298c15b9f..b4cbfa165 100644 --- a/components/inx/server_commitments.go +++ b/components/inx/server_commitments.go @@ -161,13 +161,11 @@ func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotRequest) (*in // If the chain manager is aware of a commitments on the main chain, then do not force commit. 
// The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment. unwrappedSlot := slot.Unwrap() - protocolParams := deps.Protocol.APIForSlot(unwrappedSlot).ProtocolParameters() - if latestChainCommitment := deps.Protocol.Chains.Main.Get().LatestCommitment.Get(); unwrappedSlot > protocolParams.MaxCommittableAge() && - latestChainCommitment.Slot() < unwrappedSlot-protocolParams.MaxCommittableAge() { + if latestChainCommitment := deps.Protocol.Chains.Main.Get().LatestCommitment.Get(); latestChainCommitment.Slot() >= unwrappedSlot { return nil, ierrors.Errorf("chain manager is aware of a newer commitment (%s) than target slot %d", latestChainCommitment, unwrappedSlot) } - err := deps.Protocol.Engines.Main.Get().Notarization.ForceCommitUntil(slot.Unwrap()) + err := deps.Protocol.Engines.Main.Get().Notarization.ForceCommitUntil(unwrappedSlot) if err != nil { return nil, ierrors.Wrapf(err, "error while performing force commit until %d", slot.GetSlot()) } diff --git a/pkg/testsuite/mock/blockissuer_acceptance_loss.go b/pkg/testsuite/mock/blockissuer_acceptance_loss.go index cf7afb7d9..634f61ead 100644 --- a/pkg/testsuite/mock/blockissuer_acceptance_loss.go +++ b/pkg/testsuite/mock/blockissuer_acceptance_loss.go @@ -16,9 +16,9 @@ func (i *BlockIssuer) reviveChain(issuingTime time.Time, node *Node) (*iotago.Co // If the chain manager is aware of a commitments on the main chain, then do not force commit. // The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment. - if issuingSlot > apiForSlot.ProtocolParameters().MaxCommittableAge() && - node.Protocol.Chains.Main.Get().LatestCommitment.Get().Slot() < issuingSlot-apiForSlot.ProtocolParameters().MaxCommittableAge() { - return nil, iotago.EmptyBlockID, ierrors.Errorf("chain manager is aware of a newer commitment, slot: %d, minCommittableAge: %d", issuingSlot, apiForSlot.ProtocolParameters().MinCommittableAge()) + if issuingSlot > apiForSlot.ProtocolParameters().MinCommittableAge() && + node.Protocol.Chains.Main.Get().LatestCommitment.Get().Slot() >= issuingSlot-apiForSlot.ProtocolParameters().MinCommittableAge() { + return nil, iotago.EmptyBlockID, ierrors.Errorf("chain manager is aware of a newer commitment, slot: %d, minCommittableAge: %d", issuingSlot-apiForSlot.ProtocolParameters().MinCommittableAge(), apiForSlot.ProtocolParameters().MinCommittableAge()) } // Force commitments until minCommittableAge relative to the block's issuing time. We basically "pretend" that From bda5f5e1622d148ad38dc00e5713e2afc0b02625 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Tue, 2 Apr 2024 08:55:57 +0200 Subject: [PATCH 12/20] Fix a deadlock during warpsync and publishing commitment. --- pkg/protocol/commitments.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/protocol/commitments.go b/pkg/protocol/commitments.go index 8828e7c23..793564248 100644 --- a/pkg/protocol/commitments.go +++ b/pkg/protocol/commitments.go @@ -141,25 +141,28 @@ func (c *Commitments) initRequester() (shutdown func()) { // publishRootCommitment publishes the root commitment of the main engine. 
func (c *Commitments) publishRootCommitment(mainChain *Chain, mainEngine *engine.Engine) func() { return mainEngine.RootCommitment.OnUpdate(func(_ *model.Commitment, rootCommitment *model.Commitment) { - publishedCommitment, published, err := c.publishCommitment(rootCommitment) - if err != nil { - c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err) + // Use workerpool to avoid a deadlock when + c.workerPool.Submit(func() { + publishedCommitment, published, err := c.publishCommitment(rootCommitment) + if err != nil { + c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err) - return - } + return + } - publishedCommitment.IsRoot.Set(true) - if published { - publishedCommitment.Chain.Set(mainChain) - } + publishedCommitment.IsRoot.Set(true) + if published { + publishedCommitment.Chain.Set(mainChain) + } - // Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain. - // to avoid updating ForkingPoint of the new mainChain into the past. - if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain { - mainChain.ForkingPoint.Set(publishedCommitment) - } + // Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain. + // to avoid updating ForkingPoint of the new mainChain into the past. + if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain { + mainChain.ForkingPoint.Set(publishedCommitment) + } - c.Root.Set(publishedCommitment) + c.Root.Set(publishedCommitment) + }) }) } From 818aaab9382d24cdcfe59347fe29f32b4559a4df Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Tue, 2 Apr 2024 09:07:00 +0200 Subject: [PATCH 13/20] Improve a comment --- pkg/protocol/commitments.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/protocol/commitments.go b/pkg/protocol/commitments.go index 793564248..8aaaf9dbe 100644 --- a/pkg/protocol/commitments.go +++ b/pkg/protocol/commitments.go @@ -141,7 +141,9 @@ func (c *Commitments) initRequester() (shutdown func()) { // publishRootCommitment publishes the root commitment of the main engine. func (c *Commitments) publishRootCommitment(mainChain *Chain, mainEngine *engine.Engine) func() { return mainEngine.RootCommitment.OnUpdate(func(_ *model.Commitment, rootCommitment *model.Commitment) { - // Use workerpool to avoid a deadlock when + // Use workerpool to avoid a deadlock when warpSync mode is being enabled at the same time. 
+ // Two goroutines deadlock on Commitment.IsSynced + // https://github.com/iotaledger/iota-core/issues/898 c.workerPool.Submit(func() { publishedCommitment, published, err := c.publishCommitment(rootCommitment) if err != nil { From 5b3d91e3f77a46eab3dcd2658c42a71be1c06f4a Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Mon, 15 Apr 2024 18:02:43 +0800 Subject: [PATCH 14/20] Fix race condition in warp sync where we wait for engine tasks to finish --- pkg/protocol/commitments.go | 35 +++++++++++++++-------------------- pkg/protocol/warp_sync.go | 11 +++++++++-- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pkg/protocol/commitments.go b/pkg/protocol/commitments.go index 8aaaf9dbe..8828e7c23 100644 --- a/pkg/protocol/commitments.go +++ b/pkg/protocol/commitments.go @@ -141,30 +141,25 @@ func (c *Commitments) initRequester() (shutdown func()) { // publishRootCommitment publishes the root commitment of the main engine. func (c *Commitments) publishRootCommitment(mainChain *Chain, mainEngine *engine.Engine) func() { return mainEngine.RootCommitment.OnUpdate(func(_ *model.Commitment, rootCommitment *model.Commitment) { - // Use workerpool to avoid a deadlock when warpSync mode is being enabled at the same time. - // Two goroutines deadlock on Commitment.IsSynced - // https://github.com/iotaledger/iota-core/issues/898 - c.workerPool.Submit(func() { - publishedCommitment, published, err := c.publishCommitment(rootCommitment) - if err != nil { - c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err) + publishedCommitment, published, err := c.publishCommitment(rootCommitment) + if err != nil { + c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err) - return - } + return + } - publishedCommitment.IsRoot.Set(true) - if published { - publishedCommitment.Chain.Set(mainChain) - } + publishedCommitment.IsRoot.Set(true) + if published { + publishedCommitment.Chain.Set(mainChain) + } - // Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain. - // to avoid updating ForkingPoint of the new mainChain into the past. - if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain { - mainChain.ForkingPoint.Set(publishedCommitment) - } + // Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain. + // to avoid updating ForkingPoint of the new mainChain into the past. 
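// ---------------------------------------------------------------------------------------
// Illustrative sketch, not part of the patch series: PATCHES 12 to 14 resolve deadlocks by
// not running lock-acquiring work synchronously inside an OnUpdate callback; the work is
// handed to a worker pool and the callback returns immediately. The hive.go workerpool
// package is not reproduced here, so a minimal single-goroutine analogue of that hand-off
// is sketched below.
package main

import (
	"fmt"
	"sync"
)

// worker is a single-goroutine task queue: Submit only enqueues, so a caller holding
// locks can return before the queued task runs.
type worker struct {
	tasks chan func()
}

func newWorker() *worker {
	w := &worker{tasks: make(chan func(), 64)}
	go func() {
		for task := range w.tasks {
			task()
		}
	}()

	return w
}

func (w *worker) Submit(task func()) { w.tasks <- task }

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	w := newWorker()

	// Imagine this Submit being called from inside an event hook that holds locks: the
	// expensive reset runs later on the worker goroutine, outside of those locks.
	w.Submit(func() {
		defer wg.Done()
		fmt.Println("engine reset runs outside the callback")
	})

	wg.Wait()
}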
+ if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain { + mainChain.ForkingPoint.Set(publishedCommitment) + } - c.Root.Set(publishedCommitment) - }) + c.Root.Set(publishedCommitment) }) } diff --git a/pkg/protocol/warp_sync.go b/pkg/protocol/warp_sync.go index ced1b1f37..9cbe3515f 100644 --- a/pkg/protocol/warp_sync.go +++ b/pkg/protocol/warp_sync.go @@ -48,8 +48,15 @@ func newWarpSync(protocol *Protocol) *WarpSync { protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { return chain.WarpSyncMode.OnUpdate(func(_ bool, warpSyncModeEnabled bool) { if warpSyncModeEnabled { - engine.Workers.WaitChildren() - engine.Reset() + // We need to wait for all workers of the engine to finish and reset in a separate worker, + // since otherwise we're locking downstream (c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold of the chain). + // Which in turn can lead to a deadlock where the engine can't update the LatestSyncedSlot. + // By running it in the warpsync's single worker we also make sure that the engine is reset before + // actually warp syncing/processing new slots. + c.workerPool.Submit(func() { + engine.Workers.WaitChildren() + engine.Reset() + }) } }) }) From f2842d971f1f72ab7d2d82de4bf5b43e135cbbcd Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 15 Apr 2024 14:20:30 +0200 Subject: [PATCH 15/20] Add snapshots API --- components/protocol/component.go | 2 + components/restapi/management/component.go | 17 +- components/restapi/management/pruning.go | 4 +- components/restapi/management/snapshots.go | 42 +++++ go.mod | 2 +- go.sum | 4 +- pkg/protocol/engine/engine.go | 32 +++- .../tests/api_management_test.go | 156 +++++++++++++++++- tools/gendoc/go.mod | 2 +- tools/gendoc/go.sum | 4 +- 10 files changed, 242 insertions(+), 23 deletions(-) create mode 100644 components/restapi/management/snapshots.go diff --git a/components/protocol/component.go b/components/protocol/component.go index 7a4d7f0ff..1213303d9 100644 --- a/components/protocol/component.go +++ b/components/protocol/component.go @@ -90,6 +90,7 @@ func initConfigParams(c *dig.Container) error { DatabaseEngine db.Engine `name:"databaseEngine"` BaseToken *BaseToken ProtocolParameters []iotago.ProtocolParameters + SnapshotFilePath string `name:"snapshotFilePath"` } if err := c.Provide(func() cfgResult { @@ -102,6 +103,7 @@ func initConfigParams(c *dig.Container) error { DatabaseEngine: dbEngine, BaseToken: &ParamsProtocol.BaseToken, ProtocolParameters: readProtocolParameters(), + SnapshotFilePath: ParamsProtocol.Snapshot.Path, } }); err != nil { Component.LogPanic(err.Error()) diff --git a/components/restapi/management/component.go b/components/restapi/management/component.go index c3be6c16b..234928b58 100644 --- a/components/restapi/management/component.go +++ b/components/restapi/management/component.go @@ -35,6 +35,7 @@ type dependencies struct { Protocol *protocol.Protocol PeeringConfigManager *p2p.ConfigManager NetworkManager network.Manager + SnapshotFilePath string `name:"snapshotFilePath"` } func configure() error { @@ -79,14 +80,14 @@ func configure() error { return responseByHeader(c, resp, http.StatusOK) }) - // routeGroup.POST(api.ManagementEndpointSnapshotsCreate, func(c echo.Context) error { - // resp, err := createSnapshots(c) - // if err != nil { - // return err - // } - // - // return responseByHeader(c, resp, http.StatusOK) - // }) + routeGroup.POST(api.ManagementEndpointSnapshotsCreate, func(c 
echo.Context) error { + resp, err := createSnapshots(c) + if err != nil { + return err + } + + return responseByHeader(c, resp, http.StatusOK) + }) return nil } diff --git a/components/restapi/management/pruning.go b/components/restapi/management/pruning.go index 5d5acc03b..47e8ae335 100644 --- a/components/restapi/management/pruning.go +++ b/components/restapi/management/pruning.go @@ -10,8 +10,8 @@ import ( ) func pruneDatabase(c echo.Context) (*api.PruneDatabaseResponse, error) { - if deps.Protocol.Engines.Main.Get().Storage.IsPruning() { - return nil, ierrors.WithMessage(echo.ErrServiceUnavailable, "node is already pruning") + if deps.Protocol.Engines.Main.Get().IsSnapshotting() || deps.Protocol.Engines.Main.Get().Storage.IsPruning() { + return nil, ierrors.WithMessage(echo.ErrServiceUnavailable, "node is already creating a snapshot or pruning is running") } request := &api.PruneDatabaseRequest{} diff --git a/components/restapi/management/snapshots.go b/components/restapi/management/snapshots.go new file mode 100644 index 000000000..6ec07aafc --- /dev/null +++ b/components/restapi/management/snapshots.go @@ -0,0 +1,42 @@ +package management + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/labstack/echo/v4" + + "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/inx-app/pkg/httpserver" + "github.com/iotaledger/iota.go/v4/api" +) + +func createSnapshots(c echo.Context) (*api.CreateSnapshotResponse, error) { + if deps.Protocol.Engines.Main.Get().IsSnapshotting() || deps.Protocol.Engines.Main.Get().Storage.IsPruning() { + return nil, ierrors.WithMessage(echo.ErrServiceUnavailable, "node is already creating a snapshot or pruning is running") + } + + request := &api.CreateSnapshotRequest{} + if err := c.Bind(request); err != nil { + return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid request: %w", err) + } + if request.Slot == 0 { + return nil, ierrors.WithMessage(httpserver.ErrInvalidParameter, "slot needs to be specified") + } + + directory := filepath.Dir(deps.SnapshotFilePath) + fileName := filepath.Base(deps.SnapshotFilePath) + fileExt := filepath.Ext(fileName) + fileNameWithoutExt := strings.TrimSuffix(fileName, fileExt) + filePath := filepath.Join(directory, fmt.Sprintf("%s_%d%s", fileNameWithoutExt, request.Slot, fileExt)) + + if err := deps.Protocol.Engines.Main.Get().WriteSnapshot(filePath, request.Slot); err != nil { + return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "creating snapshot failed: %w", err) + } + + return &api.CreateSnapshotResponse{ + Slot: request.Slot, + FilePath: filePath, + }, nil +} diff --git a/go.mod b/go.mod index 535f5b2aa..a68c3f289 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/iotaledger/hive.go/stringify v0.0.0-20240326102522-2e37ab3611a3 github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240307101848-db58eb9353ec github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022 - github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205 + github.com/iotaledger/iota.go/v4 v4.0.0-20240415115618-57e9e887bf49 github.com/labstack/echo/v4 v4.11.4 github.com/labstack/gommon v0.4.2 github.com/libp2p/go-libp2p v0.33.1 diff --git a/go.sum b/go.sum index dd4f9f46a..3a1fbcf8b 100644 --- a/go.sum +++ b/go.sum @@ -326,8 +326,8 @@ github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022 h1:I178Sa github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022/go.mod h1:jTFxIWiMUdAwO263jlJCSWcNLqEkgYEVOFXfjp5aNJM= 
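// ---------------------------------------------------------------------------------------
// Illustrative sketch, not part of the patch series: createSnapshots above derives a
// per-slot file name from the configured snapshot path by splitting it into directory,
// base name and extension and inserting the slot before the extension. The same
// derivation as a stand-alone helper:
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// snapshotPathForSlot turns "/data/snapshots/full_snapshot.bin" and slot 17 into
// "/data/snapshots/full_snapshot_17.bin".
func snapshotPathForSlot(configuredPath string, slot uint64) string {
	dir := filepath.Dir(configuredPath)
	file := filepath.Base(configuredPath)
	ext := filepath.Ext(file)
	name := strings.TrimSuffix(file, ext)

	return filepath.Join(dir, fmt.Sprintf("%s_%d%s", name, slot, ext))
}

func main() {
	fmt.Println(snapshotPathForSlot("/data/snapshots/full_snapshot.bin", 17))
}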
github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff h1:Do8fakxvFaj7dLckoo/z+mRyBdZo8QvT8HcgnQlG2Sg= github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff/go.mod h1:aVEutEWFnhDNJBxtVuzy2BeTN+8FAlnR83k7hKV0CFE= -github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205 h1:nn7nCEezVLmFExiONAJ2XAgZKOJJ+iWrwfDBFdYTKSY= -github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205/go.mod h1:qn/63CB0/jE1em6ewqDSiz+ovS+E/os7K5b7g2pmJFg= +github.com/iotaledger/iota.go/v4 v4.0.0-20240415115618-57e9e887bf49 h1:1uYaqFeokRrmgkX813vYdn1KTLUGMa97OxJVcOfHm7c= +github.com/iotaledger/iota.go/v4 v4.0.0-20240415115618-57e9e887bf49/go.mod h1:qn/63CB0/jE1em6ewqDSiz+ovS+E/os7K5b7g2pmJFg= github.com/ipfs/boxo v0.18.0 h1:MOL9/AgoV3e7jlVMInicaSdbgralfqSsbkc31dZ9tmw= github.com/ipfs/boxo v0.18.0/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= diff --git a/pkg/protocol/engine/engine.go b/pkg/protocol/engine/engine.go index 6ebb6541b..56b7694dc 100644 --- a/pkg/protocol/engine/engine.go +++ b/pkg/protocol/engine/engine.go @@ -4,6 +4,7 @@ import ( "io" "os" "path/filepath" + "sync/atomic" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -43,6 +44,10 @@ import ( iotago "github.com/iotaledger/iota.go/v4" ) +var ( + ErrSnapshottingInProgress = ierrors.New("snapshotting is already in progress") +) + // region Engine ///////////////////////////////////////////////////////////////////////////////////////////////////// type Engine struct { @@ -84,6 +89,8 @@ type Engine struct { chainID iotago.CommitmentID mutex syncutils.RWMutex + isSnapshotting atomic.Bool + optsSnapshotPath string optsEntryPointsDepth int optsSnapshotDepth int @@ -351,10 +358,27 @@ func (e *Engine) CommitmentAPI(commitmentID iotago.CommitmentID) (*CommitmentAPI return NewCommitmentAPI(e, commitmentID), nil } -func (e *Engine) WriteSnapshot(filePath string, targetSlot ...iotago.SlotIndex) (err error) { +func (e *Engine) IsSnapshotting() bool { + return e.isSnapshotting.Load() +} + +func (e *Engine) WriteSnapshot(filePath string, targetSlot ...iotago.SlotIndex) error { + if e.isSnapshotting.Swap(true) { + return ErrSnapshottingInProgress + } + defer e.isSnapshotting.Store(false) + + latestCommittedSlot := e.Storage.Settings().LatestCommitment().Slot() + if len(targetSlot) == 0 { - targetSlot = append(targetSlot, e.Storage.Settings().LatestCommitment().Slot()) - } else if lastPrunedEpoch, hasPruned := e.Storage.LastPrunedEpoch(); hasPruned && e.APIForSlot(targetSlot[0]).TimeProvider().EpochFromSlot(targetSlot[0]) <= lastPrunedEpoch { + targetSlot = append(targetSlot, latestCommittedSlot) + } + + if targetSlot[0] > latestCommittedSlot { + return ierrors.Errorf("impossible to create a snapshot for slot %d because it is not committed yet (latest committed slot %d)", targetSlot[0], latestCommittedSlot) + } + + if lastPrunedEpoch, hasPruned := e.Storage.LastPrunedEpoch(); hasPruned && e.APIForSlot(targetSlot[0]).TimeProvider().EpochFromSlot(targetSlot[0]) <= lastPrunedEpoch { return ierrors.Errorf("impossible to create a snapshot for slot %d because it is pruned (last pruned slot %d)", targetSlot[0], lo.Return1(e.Storage.LastPrunedEpoch())) } @@ -366,7 +390,7 @@ func (e *Engine) WriteSnapshot(filePath string, targetSlot ...iotago.SlotIndex) return ierrors.Wrap(err, "failed to close snapshot file") } - return + return nil } func (e *Engine) ImportSettings(reader io.ReadSeeker) (err error) { diff --git 
a/tools/docker-network/tests/api_management_test.go b/tools/docker-network/tests/api_management_test.go index f245bdedb..6cc27f4d9 100644 --- a/tools/docker-network/tests/api_management_test.go +++ b/tools/docker-network/tests/api_management_test.go @@ -20,14 +20,14 @@ func getContextWithTimeout(duration time.Duration) context.Context { return ctx } -// Test_PeerManagementAPI tests if the peer management API returns the expected results. +// Test_ManagementAPI_Peers tests if the peer management API returns the expected results. // 1. Run docker network. // 2. List all peers of node 1. // 3. Delete a peer from node 1. // 4. List all peers of node 1 again and check if the peer was deleted. // 5. Re-Add the peer to node 1. // 6. List all peers of node 1 again and check if the peer was added. -func Test_PeerManagementAPI(t *testing.T) { +func Test_ManagementAPI_Peers(t *testing.T) { d := NewDockerTestFramework(t, WithProtocolParametersOptions( iotago.WithTimeProviderOptions(5, time.Now().Unix(), 10, 4), @@ -116,7 +116,7 @@ func Test_PeerManagementAPI(t *testing.T) { } } -func Test_PeerManagementAPI_BadRequests(t *testing.T) { +func Test_ManagementAPI_Peers_BadRequests(t *testing.T) { d := NewDockerTestFramework(t, WithProtocolParametersOptions( iotago.WithTimeProviderOptions(5, time.Now().Unix(), 10, 4), @@ -179,3 +179,153 @@ func Test_PeerManagementAPI_BadRequests(t *testing.T) { t.Run(test.name, test.testFunc) } } + +func Test_ManagementAPI_Pruning(t *testing.T) { + d := NewDockerTestFramework(t, + WithProtocolParametersOptions( + iotago.WithSupplyOptions(1813620509061365, 63, 1, 3, 0, 0, 0), + iotago.WithTimeProviderOptions(5, time.Now().Unix(), 1, 3), + iotago.WithLivenessOptions(2, 2, 3, 4, 5), + iotago.WithCongestionControlOptions(1, 1, 1, 400_000, 250_000, 50_000_000, 1000, 100), + iotago.WithRewardsOptions(8, 10, 2, 384), + iotago.WithTargetCommitteeSize(4), + )) + defer d.Stop() + + d.AddValidatorNode("V1", "docker-network-inx-validator-1-1", "http://localhost:8050", "rms1pzg8cqhfxqhq7pt37y8cs4v5u4kcc48lquy2k73ehsdhf5ukhya3y5rx2w6") + d.AddValidatorNode("V2", "docker-network-inx-validator-2-1", "http://localhost:8060", "rms1pqm4xk8e9ny5w5rxjkvtp249tfhlwvcshyr3pc0665jvp7g3hc875k538hl") + d.AddValidatorNode("V3", "docker-network-inx-validator-3-1", "http://localhost:8070", "rms1pp4wuuz0y42caz48vv876qfpmffswsvg40zz8v79sy8cp0jfxm4kunflcgt") + d.AddValidatorNode("V4", "docker-network-inx-validator-4-1", "http://localhost:8040", "rms1pr8cxs3dzu9xh4cduff4dd4cxdthpjkpwmz2244f75m0urslrsvtsshrrjw") + d.AddNode("node5", "docker-network-node-5-1", "http://localhost:8090") + + runErr := d.Run() + require.NoError(t, runErr) + + d.WaitUntilNetworkReady() + + nodeClientV1 := d.Client("V1") + + // get the management client + managementClient, err := nodeClientV1.Management(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + + nextEpochStartSlotIndex := func(slot iotago.SlotIndex) iotago.SlotIndex { + currentEpoch := nodeClientV1.CommittedAPI().TimeProvider().EpochFromSlot(slot) + return nodeClientV1.CommittedAPI().TimeProvider().EpochStart(currentEpoch + 1) + } + + type test struct { + name string + testFunc func(t *testing.T) + } + + tests := []*test{ + { + name: "Test_PruneDatabase_ByEpoch", + testFunc: func(t *testing.T) { + // wait for the next epoch to start + info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + + // prune database by epoch + 
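// ---------------------------------------------------------------------------------------
// Illustrative sketch, not part of the patch series: the WriteSnapshot change earlier in
// PATCH 15 defaults the target slot to the latest committed slot and then rejects targets
// that are not committed yet or that fall into an already pruned epoch. A condensed
// stand-alone version of that validation; the slot arithmetic, the slots-per-epoch value
// and the simplified pruning state are assumptions for the example.
package main

import (
	"errors"
	"fmt"
)

const slotsPerEpoch = 8 // assumption for the example only

func epochOf(slot uint64) uint64 { return slot / slotsPerEpoch }

// resolveSnapshotSlot picks a default target and validates it against what has been
// committed and what has already been pruned.
func resolveSnapshotSlot(latestCommitted, lastPrunedEpoch uint64, target ...uint64) (uint64, error) {
	slot := latestCommitted
	if len(target) > 0 {
		slot = target[0]
	}

	switch {
	case slot > latestCommitted:
		return 0, errors.New("target slot is not committed yet")
	case epochOf(slot) <= lastPrunedEpoch:
		return 0, errors.New("target slot is already pruned")
	}

	return slot, nil
}

func main() {
	fmt.Println(resolveSnapshotSlot(40, 1))     // 40 <nil>
	fmt.Println(resolveSnapshotSlot(40, 1, 50)) // rejected: not committed yet
	fmt.Println(resolveSnapshotSlot(40, 1, 9))  // rejected: already pruned
}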
pruneDatabaseResponse, err := managementClient.PruneDatabaseByEpoch(getContextWithTimeout(5*time.Second), 1) + require.NoError(t, err) + require.NotNil(t, pruneDatabaseResponse) + }, + }, + { + name: "Test_PruneDatabase_ByDepth", + testFunc: func(t *testing.T) { + // wait for the next epoch to start + info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + + // prune database by depth + pruneDatabaseResponse, err := managementClient.PruneDatabaseByDepth(getContextWithTimeout(5*time.Second), 1) + require.NoError(t, err) + require.NotNil(t, pruneDatabaseResponse) + }, + }, + { + name: "Test_PruneDatabase_BySize", + testFunc: func(t *testing.T) { + // wait for the next epoch to start + info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + + // prune database by size + pruneDatabaseResponse, err := managementClient.PruneDatabaseBySize(getContextWithTimeout(5*time.Second), "1M") + require.NoError(t, err) + require.NotNil(t, pruneDatabaseResponse) + }, + }, + } + + for _, test := range tests { + t.Run(test.name, test.testFunc) + } +} + +func Test_ManagementAPI_Snapshots(t *testing.T) { + d := NewDockerTestFramework(t, + WithProtocolParametersOptions( + iotago.WithSupplyOptions(1813620509061365, 63, 1, 3, 0, 0, 0), + iotago.WithTimeProviderOptions(5, time.Now().Unix(), 1, 3), + iotago.WithLivenessOptions(2, 2, 3, 4, 5), + iotago.WithCongestionControlOptions(1, 1, 1, 400_000, 250_000, 50_000_000, 1000, 100), + iotago.WithRewardsOptions(8, 10, 2, 384), + iotago.WithTargetCommitteeSize(4), + )) + defer d.Stop() + + d.AddValidatorNode("V1", "docker-network-inx-validator-1-1", "http://localhost:8050", "rms1pzg8cqhfxqhq7pt37y8cs4v5u4kcc48lquy2k73ehsdhf5ukhya3y5rx2w6") + d.AddValidatorNode("V2", "docker-network-inx-validator-2-1", "http://localhost:8060", "rms1pqm4xk8e9ny5w5rxjkvtp249tfhlwvcshyr3pc0665jvp7g3hc875k538hl") + d.AddValidatorNode("V3", "docker-network-inx-validator-3-1", "http://localhost:8070", "rms1pp4wuuz0y42caz48vv876qfpmffswsvg40zz8v79sy8cp0jfxm4kunflcgt") + d.AddValidatorNode("V4", "docker-network-inx-validator-4-1", "http://localhost:8040", "rms1pr8cxs3dzu9xh4cduff4dd4cxdthpjkpwmz2244f75m0urslrsvtsshrrjw") + d.AddNode("node5", "docker-network-node-5-1", "http://localhost:8090") + + runErr := d.Run() + require.NoError(t, runErr) + + d.WaitUntilNetworkReady() + + nodeClientV1 := d.Client("V1") + + // get the management client + managementClient, err := nodeClientV1.Management(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + + nextEpochStartSlotIndex := func(slot iotago.SlotIndex) iotago.SlotIndex { + currentEpoch := nodeClientV1.CommittedAPI().TimeProvider().EpochFromSlot(slot) + return nodeClientV1.CommittedAPI().TimeProvider().EpochStart(currentEpoch + 1) + } + + type test struct { + name string + testFunc func(t *testing.T) + } + + tests := []*test{ + { + name: "Test_CreateSnapshot", + testFunc: func(t *testing.T) { + // wait for the next epoch to start + info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + + // create snapshot + snapshotResponse, err := managementClient.CreateSnapshot(getContextWithTimeout(5*time.Second), 1) + require.NoError(t, err) + require.NotNil(t, 
snapshotResponse) + }, + }, + } + + for _, test := range tests { + t.Run(test.name, test.testFunc) + } +} diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index 54e70dc0c..530d724b7 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -75,7 +75,7 @@ require ( github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240307101848-db58eb9353ec // indirect github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022 // indirect github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff // indirect - github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205 // indirect + github.com/iotaledger/iota.go/v4 v4.0.0-20240415115618-57e9e887bf49 // indirect github.com/ipfs/boxo v0.18.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-datastore v0.6.0 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index f3f480790..b91c4463a 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -330,8 +330,8 @@ github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022 h1:I178Sa github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022/go.mod h1:jTFxIWiMUdAwO263jlJCSWcNLqEkgYEVOFXfjp5aNJM= github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff h1:Do8fakxvFaj7dLckoo/z+mRyBdZo8QvT8HcgnQlG2Sg= github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff/go.mod h1:aVEutEWFnhDNJBxtVuzy2BeTN+8FAlnR83k7hKV0CFE= -github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205 h1:nn7nCEezVLmFExiONAJ2XAgZKOJJ+iWrwfDBFdYTKSY= -github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205/go.mod h1:qn/63CB0/jE1em6ewqDSiz+ovS+E/os7K5b7g2pmJFg= +github.com/iotaledger/iota.go/v4 v4.0.0-20240415115618-57e9e887bf49 h1:1uYaqFeokRrmgkX813vYdn1KTLUGMa97OxJVcOfHm7c= +github.com/iotaledger/iota.go/v4 v4.0.0-20240415115618-57e9e887bf49/go.mod h1:qn/63CB0/jE1em6ewqDSiz+ovS+E/os7K5b7g2pmJFg= github.com/ipfs/boxo v0.18.0 h1:MOL9/AgoV3e7jlVMInicaSdbgralfqSsbkc31dZ9tmw= github.com/ipfs/boxo v0.18.0/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= From dd258921bea32eaef3f4ece4d6f8c4a64bd26b7f Mon Sep 17 00:00:00 2001 From: Andrew Date: Mon, 15 Apr 2024 15:51:35 +0100 Subject: [PATCH 16/20] fix BIC underflow and add test --- .../engine/accounts/accountsledger/manager.go | 3 +-- pkg/tests/accounts_test.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/protocol/engine/accounts/accountsledger/manager.go b/pkg/protocol/engine/accounts/accountsledger/manager.go index 916a3a20f..7416434d5 100644 --- a/pkg/protocol/engine/accounts/accountsledger/manager.go +++ b/pkg/protocol/engine/accounts/accountsledger/manager.go @@ -531,12 +531,11 @@ func (m *Manager) commitAccountTree(slot iotago.SlotIndex, accountDiffChanges ma if diffChange.BICChange != 0 || !exists { // decay the credits to the current slot if the account exists - if exists { + if exists && accountData.Credits.Value > 0 { decayedPreviousCredits, err := m.apiProvider.APIForSlot(slot).ManaDecayProvider().DecayManaBySlots(iotago.Mana(accountData.Credits.Value), accountData.Credits.UpdateSlot, slot) if err != nil { return ierrors.Wrapf(err, "can't retrieve account, could not decay credits for account %s in slot %d", accountData.ID, slot) } - // update the account data diff taking into account the decay, the modified diff will be stored in the calling // ApplyDiff function to be able to properly rollback the account to a previous 
slot. diffChange.BICChange -= accountData.Credits.Value - iotago.BlockIssuanceCredits(decayedPreviousCredits) diff --git a/pkg/tests/accounts_test.go b/pkg/tests/accounts_test.go index dfe4cde40..8c32a9352 100644 --- a/pkg/tests/accounts_test.go +++ b/pkg/tests/accounts_test.go @@ -464,7 +464,10 @@ func Test_NegativeBIC_BlockIssuerLocked(t *testing.T) { iotago.SlotIndex(0), testsuite.GenesisTimeWithOffsetBySlots(iotago.SlotIndex(200), testsuite.DefaultSlotDurationInSeconds), testsuite.DefaultSlotDurationInSeconds, - testsuite.DefaultSlotsPerEpochExponent, + 3, + ), + iotago.WithLivenessOptions( + 10, 10, 2, 4, 5, ), ), ) @@ -588,6 +591,7 @@ func Test_NegativeBIC_BlockIssuerLocked(t *testing.T) { // Allot some mana to the locked account to unlock it. // The locked wallet 2 is preparing and signs the transaction, but it's issued by wallet 1 whose account is not locked. { + latestParents = ts.CommitUntilSlot(block3Slot+ts.API.TimeProvider().EpochDurationSlots(), ts.BlockID("block2.1")) allottedBIC := testPayloadCost tx1 := wallet2.AllotManaFromInputs("TX1", iotago.Allotments{&iotago.Allotment{ @@ -608,7 +612,7 @@ func Test_NegativeBIC_BlockIssuerLocked(t *testing.T) { ts.AssertAccountData(&accounts.AccountData{ ID: wallet1.BlockIssuer.AccountData.ID, - Credits: accounts.NewBlockIssuanceCredits(wallet1BIC, block3Slot), + Credits: accounts.NewBlockIssuanceCredits(wallet1BIC, block31.ID().Slot()), OutputID: wallet1OutputID, ExpirySlot: iotago.MaxSlotIndex, BlockIssuerKeys: wallet1.BlockIssuer.BlockIssuerKeys(), @@ -616,7 +620,7 @@ func Test_NegativeBIC_BlockIssuerLocked(t *testing.T) { ts.AssertAccountData(&accounts.AccountData{ ID: wallet2.BlockIssuer.AccountData.ID, - Credits: accounts.NewBlockIssuanceCredits(wallet2BIC, block3Slot), + Credits: accounts.NewBlockIssuanceCredits(wallet2BIC, block31.ID().Slot()), ExpirySlot: iotago.MaxSlotIndex, OutputID: wallet2OutputID, BlockIssuerKeys: wallet2.BlockIssuer.BlockIssuerKeys(), @@ -639,7 +643,7 @@ func Test_NegativeBIC_BlockIssuerLocked(t *testing.T) { ts.AssertAccountData(&accounts.AccountData{ ID: wallet1.BlockIssuer.AccountData.ID, - Credits: accounts.NewBlockIssuanceCredits(wallet1BIC, block3Slot), + Credits: accounts.NewBlockIssuanceCredits(wallet1BIC, ts.BlockID("block3.1").Slot()), OutputID: wallet1OutputID, ExpirySlot: iotago.MaxSlotIndex, BlockIssuerKeys: wallet1.BlockIssuer.BlockIssuerKeys(), From 47e63ee373d7796f51721eb0b64e4e1e73934146 Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 15 Apr 2024 17:40:32 +0200 Subject: [PATCH 17/20] Fix tests --- .../tests/api_management_test.go | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/tools/docker-network/tests/api_management_test.go b/tools/docker-network/tests/api_management_test.go index 6cc27f4d9..a423df753 100644 --- a/tools/docker-network/tests/api_management_test.go +++ b/tools/docker-network/tests/api_management_test.go @@ -183,9 +183,9 @@ func Test_ManagementAPI_Peers_BadRequests(t *testing.T) { func Test_ManagementAPI_Pruning(t *testing.T) { d := NewDockerTestFramework(t, WithProtocolParametersOptions( - iotago.WithSupplyOptions(1813620509061365, 63, 1, 3, 0, 0, 0), - iotago.WithTimeProviderOptions(5, time.Now().Unix(), 1, 3), - iotago.WithLivenessOptions(2, 2, 3, 4, 5), + iotago.WithSupplyOptions(1813620509061365, 63, 1, 4, 0, 0, 0), + iotago.WithTimeProviderOptions(0, time.Now().Unix(), 3, 4), + iotago.WithLivenessOptions(3, 4, 2, 4, 8), iotago.WithCongestionControlOptions(1, 1, 1, 400_000, 250_000, 50_000_000, 1000, 100), 
iotago.WithRewardsOptions(8, 10, 2, 384), iotago.WithTargetCommitteeSize(4), @@ -209,9 +209,14 @@ func Test_ManagementAPI_Pruning(t *testing.T) { managementClient, err := nodeClientV1.Management(getContextWithTimeout(5 * time.Second)) require.NoError(t, err) - nextEpochStartSlotIndex := func(slot iotago.SlotIndex) iotago.SlotIndex { - currentEpoch := nodeClientV1.CommittedAPI().TimeProvider().EpochFromSlot(slot) - return nodeClientV1.CommittedAPI().TimeProvider().EpochStart(currentEpoch + 1) + awaitNextEpoch := func() { + info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + + currentEpoch := nodeClientV1.CommittedAPI().TimeProvider().EpochFromSlot(info.Status.LatestCommitmentID.Slot()) + + // await the start slot of the next epoch + d.AwaitCommitment(nodeClientV1.CommittedAPI().TimeProvider().EpochStart(currentEpoch + 1)) } type test struct { @@ -223,10 +228,10 @@ func Test_ManagementAPI_Pruning(t *testing.T) { { name: "Test_PruneDatabase_ByEpoch", testFunc: func(t *testing.T) { - // wait for the next epoch to start - info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) - require.NoError(t, err) - d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + // we need to wait until epoch 3 to be able to prune epoch 1 + awaitNextEpoch() + awaitNextEpoch() + awaitNextEpoch() // prune database by epoch pruneDatabaseResponse, err := managementClient.PruneDatabaseByEpoch(getContextWithTimeout(5*time.Second), 1) @@ -238,9 +243,7 @@ func Test_ManagementAPI_Pruning(t *testing.T) { name: "Test_PruneDatabase_ByDepth", testFunc: func(t *testing.T) { // wait for the next epoch to start - info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) - require.NoError(t, err) - d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + awaitNextEpoch() // prune database by depth pruneDatabaseResponse, err := managementClient.PruneDatabaseByDepth(getContextWithTimeout(5*time.Second), 1) @@ -252,9 +255,7 @@ func Test_ManagementAPI_Pruning(t *testing.T) { name: "Test_PruneDatabase_BySize", testFunc: func(t *testing.T) { // wait for the next epoch to start - info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) - require.NoError(t, err) - d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + awaitNextEpoch() // prune database by size pruneDatabaseResponse, err := managementClient.PruneDatabaseBySize(getContextWithTimeout(5*time.Second), "1M") @@ -272,9 +273,9 @@ func Test_ManagementAPI_Pruning(t *testing.T) { func Test_ManagementAPI_Snapshots(t *testing.T) { d := NewDockerTestFramework(t, WithProtocolParametersOptions( - iotago.WithSupplyOptions(1813620509061365, 63, 1, 3, 0, 0, 0), - iotago.WithTimeProviderOptions(5, time.Now().Unix(), 1, 3), - iotago.WithLivenessOptions(2, 2, 3, 4, 5), + iotago.WithSupplyOptions(1813620509061365, 63, 1, 4, 0, 0, 0), + iotago.WithTimeProviderOptions(0, time.Now().Unix(), 3, 4), + iotago.WithLivenessOptions(3, 4, 2, 4, 8), iotago.WithCongestionControlOptions(1, 1, 1, 400_000, 250_000, 50_000_000, 1000, 100), iotago.WithRewardsOptions(8, 10, 2, 384), iotago.WithTargetCommitteeSize(4), @@ -298,9 +299,14 @@ func Test_ManagementAPI_Snapshots(t *testing.T) { managementClient, err := nodeClientV1.Management(getContextWithTimeout(5 * time.Second)) require.NoError(t, err) - nextEpochStartSlotIndex := func(slot iotago.SlotIndex) iotago.SlotIndex { - currentEpoch := 
nodeClientV1.CommittedAPI().TimeProvider().EpochFromSlot(slot) - return nodeClientV1.CommittedAPI().TimeProvider().EpochStart(currentEpoch + 1) + awaitNextEpoch := func() { + info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) + require.NoError(t, err) + + currentEpoch := nodeClientV1.CommittedAPI().TimeProvider().EpochFromSlot(info.Status.LatestCommitmentID.Slot()) + + // await the start slot of the next epoch + d.AwaitCommitment(nodeClientV1.CommittedAPI().TimeProvider().EpochStart(currentEpoch + 1)) } type test struct { @@ -313,9 +319,7 @@ func Test_ManagementAPI_Snapshots(t *testing.T) { name: "Test_CreateSnapshot", testFunc: func(t *testing.T) { // wait for the next epoch to start - info, err := nodeClientV1.Info(getContextWithTimeout(5 * time.Second)) - require.NoError(t, err) - d.AwaitCommitment(nextEpochStartSlotIndex(info.Status.LatestCommitmentID.Slot())) + awaitNextEpoch() // create snapshot snapshotResponse, err := managementClient.CreateSnapshot(getContextWithTimeout(5*time.Second), 1) From d2abde3781afc913f83150c366ed2ee6a891fd01 Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 15 Apr 2024 17:54:32 +0200 Subject: [PATCH 18/20] Remove p2p first packet received check --- pkg/network/errors.go | 2 -- pkg/network/p2p/manager.go | 20 +++----------------- 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/pkg/network/errors.go b/pkg/network/errors.go index 060cd475c..a73a74e85 100644 --- a/pkg/network/errors.go +++ b/pkg/network/errors.go @@ -11,8 +11,6 @@ var ( ErrLoopbackPeer = ierrors.New("loopback connection not allowed") // ErrDuplicatePeer is returned when the same peer is added more than once. ErrDuplicatePeer = ierrors.New("already connected") - // ErrFirstPacketNotReceived is returned when the first packet from a peer is not received. - ErrFirstPacketNotReceived = ierrors.New("first packet not received") // ErrMaxAutopeeringPeersReached is returned when the maximum number of autopeering peers is reached. 
ErrMaxAutopeeringPeersReached = ierrors.New("max autopeering peers reached") ) diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index ab2573c7d..30064518b 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "time" "github.com/libp2p/go-libp2p/core/host" p2pnetwork "github.com/libp2p/go-libp2p/core/network" @@ -145,7 +144,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { return ierrors.Wrapf(err, "failed to update peer %s", peer.ID.String()) } - if err := m.addNeighbor(ctx, peer, ps); err != nil { + if err := m.addNeighbor(peer, ps); err != nil { m.closeStream(stream) return ierrors.Wrapf(err, "failed to add neighbor %s", peer.ID.String()) @@ -347,7 +346,7 @@ func (m *Manager) handleStream(stream p2pnetwork.Stream) { return } - if err := m.addNeighbor(m.ctx, networkPeer, ps); err != nil { + if err := m.addNeighbor(networkPeer, ps); err != nil { m.logger.LogErrorf("failed to add neighbor, peerID: %s, error: %s", peerID.String(), err.Error()) m.closeStream(stream) @@ -376,7 +375,7 @@ func (m *Manager) neighbor(id peer.ID) (*neighbor, error) { return nbr, nil } -func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *PacketsStream) error { +func (m *Manager) addNeighbor(peer *network.Peer, ps *PacketsStream) error { if peer.ID == m.libp2pHost.ID() { return ierrors.WithStack(network.ErrLoopbackPeer) } @@ -392,9 +391,6 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe return ierrors.WithStack(network.ErrDuplicatePeer) } - firstPacketReceivedCtx, firstPacketReceivedCancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second)) - defer firstPacketReceivedCancel() - var innerErr error nbr := newNeighbor(m.logger, peer, ps, func(nbr *neighbor, packet proto.Message) { m.protocolHandlerMutex.RLock() @@ -410,7 +406,6 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe }, func(nbr *neighbor) { nbr.logger.LogInfof("Neighbor connected: %s", nbr.Peer().ID.String()) nbr.Peer().SetConnStatus(network.ConnStatusConnected) - firstPacketReceivedCancel() m.neighborAdded.Trigger(nbr) }, func(nbr *neighbor) { m.deleteNeighbor(nbr) @@ -426,15 +421,6 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe nbr.readLoop() nbr.writeLoop() - <-firstPacketReceivedCtx.Done() - - if ierrors.Is(firstPacketReceivedCtx.Err(), context.DeadlineExceeded) { - nbr.logger.LogErrorf("First packet not received within deadline") - nbr.Close() - - return ierrors.WithStack(network.ErrFirstPacketNotReceived) - } - return innerErr } From 789a1f49bf87296526e959454cae7563dff07933 Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 16 Apr 2024 09:11:50 +0100 Subject: [PATCH 19/20] apply decay to BIC if it is positive --- pkg/requesthandler/accounts.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/requesthandler/accounts.go b/pkg/requesthandler/accounts.go index ceb698f02..b4437e088 100644 --- a/pkg/requesthandler/accounts.go +++ b/pkg/requesthandler/accounts.go @@ -35,11 +35,22 @@ func (r *RequestHandler) CongestionByAccountAddress(accountAddress *iotago.Accou return nil, ierrors.WithMessagef(echo.ErrNotFound, "account %s not found", accountID.ToHex()) } + blockIssuanceCredits := acc.Credits.Value + // Apply decay to BIC if the value is positive + if acc.Credits.Value > 0 { + manaDecayProvider := r.APIProvider().APIForSlot(commitment.Slot()).ManaDecayProvider() + decayedBIC, 
err := manaDecayProvider.DecayManaBySlots(iotago.Mana(acc.Credits.Value), acc.Credits.UpdateSlot, commitment.Slot()) + if err != nil { + return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to decay BIC for account %s: %w", accountID.ToHex(), err) + } + blockIssuanceCredits = iotago.BlockIssuanceCredits(decayedBIC) + } + return &api.CongestionResponse{ Slot: commitment.Slot(), Ready: r.protocol.Engines.Main.Get().Scheduler.IsBlockIssuerReady(accountID, workScores...), ReferenceManaCost: commitment.ReferenceManaCost(), - BlockIssuanceCredits: acc.Credits.Value, + BlockIssuanceCredits: blockIssuanceCredits, }, nil } From 042ea6b685c0e948f7fd661ab6887a630b854853 Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 16 Apr 2024 09:21:07 +0100 Subject: [PATCH 20/20] muxxer suggestion --- pkg/requesthandler/accounts.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/requesthandler/accounts.go b/pkg/requesthandler/accounts.go index b4437e088..a8c56378c 100644 --- a/pkg/requesthandler/accounts.go +++ b/pkg/requesthandler/accounts.go @@ -37,7 +37,7 @@ func (r *RequestHandler) CongestionByAccountAddress(accountAddress *iotago.Accou blockIssuanceCredits := acc.Credits.Value // Apply decay to BIC if the value is positive - if acc.Credits.Value > 0 { + if blockIssuanceCredits > 0 { manaDecayProvider := r.APIProvider().APIForSlot(commitment.Slot()).ManaDecayProvider() decayedBIC, err := manaDecayProvider.DecayManaBySlots(iotago.Mana(acc.Credits.Value), acc.Credits.UpdateSlot, commitment.Slot()) if err != nil { @@ -45,7 +45,7 @@ func (r *RequestHandler) CongestionByAccountAddress(accountAddress *iotago.Accou } blockIssuanceCredits = iotago.BlockIssuanceCredits(decayedBIC) } - + return &api.CongestionResponse{ Slot: commitment.Slot(), Ready: r.protocol.Engines.Main.Get().Scheduler.IsBlockIssuerReady(accountID, workScores...),
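
Note on the recurring guard: patches 16, 19 and 20 apply the same check in two places — block issuance credits are only run through mana decay when they are positive (presumably because the decay provider takes unsigned mana, so a negative value would wrap when cast, which appears to be the underflow patch 16 refers to). Below is a minimal standalone Go sketch of that guard; the types and the decayBySlots helper are simplified stand-ins for illustration, not the actual iota.go decay implementation.

// Minimal sketch of the "decay only when positive" guard from patches 16/19/20.
// SlotIndex, BlockIssuanceCredits and decayBySlots are hypothetical stand-ins.
package main

import "fmt"

type SlotIndex uint32
type BlockIssuanceCredits int64

// decayBySlots is a placeholder for the mana-decay provider; here it simply
// removes 1% per elapsed slot so the example stays self-contained.
func decayBySlots(value BlockIssuanceCredits, from, to SlotIndex) BlockIssuanceCredits {
	decayed := value
	for s := from; s < to; s++ {
		decayed -= decayed / 100
	}
	return decayed
}

// effectiveBIC mirrors the guard added in the patches: negative credits are
// returned unchanged instead of being fed into the (unsigned) decay math.
func effectiveBIC(credits BlockIssuanceCredits, updateSlot, targetSlot SlotIndex) BlockIssuanceCredits {
	if credits <= 0 {
		return credits
	}
	return decayBySlots(credits, updateSlot, targetSlot)
}

func main() {
	fmt.Println(effectiveBIC(1000, 10, 20)) // positive credits are decayed
	fmt.Println(effectiveBIC(-50, 10, 20))  // negative credits pass through untouched
}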
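
Likewise, the test changes in patch 17 replace the one-shot "next epoch start slot" computation with an awaitNextEpoch helper that re-reads the node's latest commitment and waits for the first slot of the following epoch. The sketch below shows only the slot/epoch arithmetic behind that helper under a simplified time model (fixed slots per epoch, slot numbering from zero); in the node the values come from the committed API's time provider, not from these constants.

// Simplified model of the epoch arithmetic used by awaitNextEpoch in patch 17.
// slotsPerEpoch is arbitrary here; the real value derives from protocol parameters.
package main

import "fmt"

const slotsPerEpoch = 8

type SlotIndex uint32
type EpochIndex uint32

func epochFromSlot(slot SlotIndex) EpochIndex {
	return EpochIndex(slot / slotsPerEpoch)
}

func epochStart(epoch EpochIndex) SlotIndex {
	return SlotIndex(epoch) * slotsPerEpoch
}

// nextEpochStart returns the slot the test must see committed before it can
// safely run the next pruning or snapshot step.
func nextEpochStart(latestCommittedSlot SlotIndex) SlotIndex {
	return epochStart(epochFromSlot(latestCommittedSlot) + 1)
}

func main() {
	fmt.Println(nextEpochStart(5))  // 8: still in epoch 0, wait for slot 8
	fmt.Println(nextEpochStart(11)) // 16: in epoch 1, wait for slot 16
}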