Skip to content

Commit

Permalink
Merge pull request #896 from iotaledger/feat/improve-forcecommit
Browse files Browse the repository at this point in the history
Improve Force Commitment logic
  • Loading branch information
piotrm50 authored Apr 15, 2024
2 parents 6d0a4e3 + 04ae074 commit 7426466
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 353 deletions.
9 changes: 8 additions & 1 deletion components/inx/server_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,14 @@ func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_List
}

func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotRequest) (*inx.NoParams, error) {
err := deps.Protocol.Engines.Main.Get().Notarization.ForceCommitUntil(slot.Unwrap())
// If the chain manager is aware of a commitments on the main chain, then do not force commit.
// The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment.
unwrappedSlot := slot.Unwrap()
if latestChainCommitment := deps.Protocol.Chains.Main.Get().LatestCommitment.Get(); latestChainCommitment.Slot() >= unwrappedSlot {
return nil, ierrors.Errorf("chain manager is aware of a newer commitment (%s) than target slot %d", latestChainCommitment, unwrappedSlot)
}

err := deps.Protocol.Engines.Main.Get().Notarization.ForceCommitUntil(unwrappedSlot)
if err != nil {
return nil, ierrors.Wrapf(err, "error while performing force commit until %d", slot.GetSlot())
}
Expand Down
20 changes: 0 additions & 20 deletions components/prometheus/metrics_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package prometheus
import (
"time"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/components/prometheus/collector"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
Expand All @@ -16,7 +15,6 @@ const (
queueSizePerNodeCount = "queue_size_per_node_count"
validatorQueueSizePerNodeCount = "validator_queue_size_per_node_count"
schedulerProcessedBlocks = "processed_blocks"
manaAmountPerNode = "mana_per_node"
scheduledBlockLabel = "scheduled"
skippedBlockLabel = "skipped"
droppedBlockLabel = "dropped"
Expand Down Expand Up @@ -116,24 +114,6 @@ var SchedulerMetrics = collector.NewCollection(schedulerNamespace,
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(manaAmountPerNode,
collector.WithType(collector.Gauge),
collector.WithLabels("issuer_id"),
collector.WithPruningDelay(10*time.Minute),
collector.WithHelp("Current amount of mana of each issuer in the queue."),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) {
mana, err := deps.Protocol.Engines.Main.Get().Ledger.ManaManager().GetManaOnAccount(block.ProtocolBlock().Header.IssuerID, block.SlotCommitmentID().Slot())
if err != nil {
deps.Protocol.Engines.Main.Get().ErrorHandler("metrics")(ierrors.Wrapf(err, "failed to retrieve mana on account %s for slot %d", block.ProtocolBlock().Header.IssuerID.ToHex(), block.SlotCommitmentID().Slot()))

return
}

deps.Collector.Update(schedulerNamespace, manaAmountPerNode, float64(mana), block.ProtocolBlock().Header.IssuerID.String())
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(schedulerProcessedBlocks,
collector.WithType(collector.Counter),
collector.WithLabels("state"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3844,68 +3844,6 @@
"title": "Scheduler metrics",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 6,
"w": 15,
"x": 0,
"y": 115
},
"id": 146,
"options": {
"displayMode": "lcd",
"minVizHeight": 10,
"minVizWidth": 0,
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"last"
],
"fields": "",
"values": false
},
"showUnfilled": true,
"text": {},
"valueMode": "color"
},
"pluginVersion": "9.5.6",
"targets": [
{
"exemplar": true,
"expr": "topk(15, sum by (node_id) (scheduler_queue_size_per_node_work{instance=~\"$instance\"}/scheduler_mana_per_node{instance=~\"$instance\"}))",
"instant": true,
"interval": "",
"legendFormat": "Node: {{node_id}}",
"queryType": "randomWalk",
"refId": "A"
}
],
"title": "Mana-scaled queue size",
"type": "bargauge"
},
{
"datasource": {
"type": "prometheus",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,75 +46,6 @@
"title": "Scheduler metrics",
"type": "row"
},
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 6,
"w": 15,
"x": 0,
"y": 1
},
"id": 146,
"options": {
"displayMode": "lcd",
"minVizHeight": 10,
"minVizWidth": 0,
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"last"
],
"fields": "",
"values": false
},
"showUnfilled": true,
"text": {},
"valueMode": "color"
},
"pluginVersion": "9.5.6",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"editorMode": "code",
"exemplar": true,
"expr": "topk(15, sum by (issuer_id) (scheduler_queue_size_per_node_count{instance=~\"$instance\"}/scheduler_mana_per_node{instance=~\"$instance\"}))",
"hide": false,
"instant": true,
"interval": "",
"legendFormat": "Issuer: {{issuer_id}}",
"queryType": "randomWalk",
"refId": "A"
}
],
"title": "Mana-scaled queue size",
"type": "bargauge"
},
{
"datasource": {
"type": "prometheus",
Expand Down
13 changes: 10 additions & 3 deletions pkg/protocol/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,19 @@ func newBlocks(protocol *Protocol) *Blocks {
})
})

//nolint:revive
protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
return lo.BatchReverse(
engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook,
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) {
if !chain.WarpSyncMode.Get() {
b.SendResponse(block.ModelBlock())
}
}).Unhook,
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) {
if !chain.WarpSyncMode.Get() {
b.SendResponse(block.ModelBlock())
}
}).Unhook,
)
})
})
Expand Down
10 changes: 8 additions & 2 deletions pkg/protocol/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,18 @@ func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() {
return lo.BatchReverse(
c.WithInitializedEngines(func(_ *Chain, engine *engine.Engine) (shutdown func()) {
return engine.LatestCommitment.OnUpdate(func(_ *model.Commitment, latestCommitment *model.Commitment) {
c.LatestSeenSlot.Set(latestCommitment.Slot())
// Check the value to avoid having to acquire write locks inside of the Set method.
if c.LatestSeenSlot.Get() < latestCommitment.Slot() {
c.LatestSeenSlot.Set(latestCommitment.Slot())
}
})
}),

protocol.Network.OnBlockReceived(func(block *model.Block, _ peer.ID) {
c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot())
// Check the value to avoid having to acquire write locks inside of the Set method.
if c.LatestSeenSlot.Get() < block.ProtocolBlock().Header.SlotCommitmentID.Slot() {
c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot())
}
}),
)
})
Expand Down
23 changes: 20 additions & 3 deletions pkg/protocol/engine/notarization/slotnotarization/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package slotnotarization

import (
"sync/atomic"
"time"

"github.com/iotaledger/hive.go/ierrors"
Expand Down Expand Up @@ -45,6 +46,9 @@ type Manager struct {

commitmentMutex syncutils.Mutex

// ForceCommitMode contains a flag that indicates whether the manager is in force commit mode.
ForceCommitMode atomic.Bool

module.Module
}

Expand Down Expand Up @@ -109,13 +113,15 @@ func (m *Manager) Shutdown() {

// tryCommitUntil tries to create slot commitments until the new provided acceptance time.
func (m *Manager) tryCommitUntil(commitUntilSlot iotago.SlotIndex) {
if slot := commitUntilSlot; slot > m.storage.Settings().LatestCommitment().Slot() {
m.tryCommitSlotUntil(slot)
if !m.ForceCommitMode.Load() {
if slot := commitUntilSlot; slot > m.storage.Settings().LatestCommitment().Slot() {
m.tryCommitSlotUntil(slot)
}
}
}

func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error) {
m.LogInfof("Force commit slot %d", slot)
m.LogInfo("force commit", "slot", slot)

if m.ShutdownEvent().WasTriggered() {
return nil, ierrors.New("notarization manager was stopped")
Expand All @@ -135,10 +141,16 @@ func (m *Manager) ForceCommit(slot iotago.SlotIndex) (*model.Commitment, error)
return nil, ierrors.Wrapf(err, "failed to create commitment for slot %d", slot)
}

m.LogInfo("forced commitment", "commitmentID", commitment.ID())

return commitment, nil
}

func (m *Manager) ForceCommitUntil(commitUntilSlot iotago.SlotIndex) error {
if m.ForceCommitMode.Swap(true) {
return ierrors.New("force commitment already in progress")
}

m.LogInfo("force committing until", "slot", commitUntilSlot)

for i := m.storage.Settings().LatestCommitment().Slot() + 1; i <= commitUntilSlot; i++ {
Expand All @@ -147,6 +159,10 @@ func (m *Manager) ForceCommitUntil(commitUntilSlot iotago.SlotIndex) error {
}
}

m.LogInfo("successfully forced commitment until", "slot", commitUntilSlot)

m.ForceCommitMode.Store(false)

return nil
}

Expand Down Expand Up @@ -184,6 +200,7 @@ func (m *Manager) tryCommitSlotUntil(acceptedBlockIndex iotago.SlotIndex) {

if _, err := m.createCommitment(i); err != nil {
m.errorHandler(ierrors.Wrapf(err, "failed to create commitment for slot %d", i))

return
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/testsuite/mock/blockissuer_acceptance_loss.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ func (i *BlockIssuer) reviveChain(issuingTime time.Time, node *Node) (*iotago.Co

issuingSlot := apiForSlot.TimeProvider().SlotFromTime(issuingTime)

// If the chain manager is aware of a commitments on the main chain, then do not force commit.
// The node should wait to warpsync those slots and use those commitments to avoid potentially creating a diverging commitment.
if issuingSlot > apiForSlot.ProtocolParameters().MinCommittableAge() &&
node.Protocol.Chains.Main.Get().LatestCommitment.Get().Slot() >= issuingSlot-apiForSlot.ProtocolParameters().MinCommittableAge() {
return nil, iotago.EmptyBlockID, ierrors.Errorf("chain manager is aware of a newer commitment, slot: %d, minCommittableAge: %d", issuingSlot-apiForSlot.ProtocolParameters().MinCommittableAge(), apiForSlot.ProtocolParameters().MinCommittableAge())
}

// Force commitments until minCommittableAge relative to the block's issuing time. We basically "pretend" that
// this block was already accepted at the time of issuing so that we have a commitment to reference.
if issuingSlot < apiForSlot.ProtocolParameters().MinCommittableAge() { // Should never happen as we're beyond maxCommittableAge which is > minCommittableAge.
Expand Down
Loading

0 comments on commit 7426466

Please sign in to comment.