Commit

Merge pull request #531 from iotaledger/feat/engine-switching-committee-rotation-test

Engine switching with committee rotation
jonastheis authored Nov 29, 2023
2 parents 26441e4 + f491489 commit 6a455be
Showing 25 changed files with 598 additions and 219 deletions.
22 changes: 14 additions & 8 deletions pkg/protocol/block_dispatcher.go
@@ -277,7 +277,7 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment
}
targetEngine.Reset()

// Once all blocks are booked we
// Once all blocks are booked and their weight propagated we
// 1. Mark all transactions as accepted
// 2. Mark all blocks as accepted
// 3. Force commitment of the slot
@@ -298,6 +298,17 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment
b.protocol.HandleError(ierrors.Errorf("producedCommitment ID mismatch: %s != %s", producedCommitment.ID(), commitmentID))
return
}

// 5. We add all blocks as root blocks. We can only do this after the slot has been committed, because otherwise
// confirmation of the blocks can't be properly propagated (as propagation to root blocks is skipped).
for slotCommitmentID, blockIDsForCommitment := range blockIDsBySlotCommitmentID {
for _, blockID := range blockIDsForCommitment {
// We need to add all blocks as root blocks because, without blocks from future slots, we can't tell which
// blocks are the actual root blocks. Committing the current slot evicts its blocks from the
// block cache, and unless they are root blocks, no block in the next slot can become solid.
targetEngine.EvictionState.AddRootBlock(blockID, slotCommitmentID)
}
}
}

blockBookedFunc := func(_ bool, _ bool) {
@@ -336,20 +347,15 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment
return nil
}

for slotCommitmentID, blockIDsForCommitment := range blockIDsBySlotCommitmentID {
for _, blockIDsForCommitment := range blockIDsBySlotCommitmentID {
for _, blockID := range blockIDsForCommitment {
block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID)
if block == nil { // this should never happen as we're requesting the blocks for this slot so it can't be evicted.
b.protocol.HandleError(ierrors.Errorf("failed to get block %s", blockID))
continue
}

// We need to make sure that we add all blocks as root blocks because we don't know which blocks are root blocks without
// blocks from future slots. We're committing the current slot which then leads to the eviction of the blocks from the
// block cache and thus if not root blocks no block in the next slot can become solid.
targetEngine.EvictionState.AddRootBlock(blockID, slotCommitmentID)

block.Booked().OnUpdate(blockBookedFunc)
block.WeightPropagated().OnUpdate(blockBookedFunc)
}
}

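To illustrate the reasoning in the new comment of the hunk above (step 5): committing a slot evicts its blocks from the block cache, so the next slot can only solidify against blocks that were explicitly registered as root blocks. Below is a small, self-contained Go toy of that dependency. It is not part of this commit; all types and data are invented for illustration, and only the AddRootBlock idea mirrors the hunk.

package main

import "fmt"

// Toy solidification model: a block is solid if every parent is either a registered
// root block or a solid block that is still in the cache.
type toyDAG struct {
	rootBlocks map[string]bool     // survives slot commitment/eviction
	cache      map[string][]string // blockID -> parent IDs; evicted when the slot is committed
}

func (d *toyDAG) isSolid(id string) bool {
	if d.rootBlocks[id] {
		return true
	}
	parents, inCache := d.cache[id]
	if !inCache {
		return false // evicted and not a root block: can never become solid again
	}
	for _, parent := range parents {
		if !d.isSolid(parent) {
			return false
		}
	}
	return true
}

func main() {
	d := &toyDAG{
		rootBlocks: map[string]bool{},
		cache:      map[string][]string{"A": {}, "B": {"A"}, "C": {"B"}}, // A, B: warp-synced slot; C: next slot
	}

	// Commit the warp-synced slot WITHOUT registering its blocks as root blocks: A and B are evicted.
	delete(d.cache, "A")
	delete(d.cache, "B")
	fmt.Println(d.isSolid("C")) // false: nothing in the next slot can become solid

	// With the change from the hunk above: register the slot's blocks as root blocks when committing.
	d.rootBlocks["A"], d.rootBlocks["B"] = true, true
	fmt.Println(d.isSolid("C")) // true
}
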
81 changes: 62 additions & 19 deletions pkg/protocol/commitment_verifier.go
@@ -5,7 +5,6 @@ import (
"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/accounts"
@@ -14,27 +13,45 @@ import (
)

type CommitmentVerifier struct {
engine *engine.Engine
cumulativeWeight uint64
validatorAccountsAtFork map[iotago.AccountID]*accounts.AccountData
engine *engine.Engine
lastCommonSlotBeforeFork iotago.SlotIndex

// cumulativeWeight is the cumulative weight of the verified commitments. It is updated after each verification.
cumulativeWeight uint64

// epoch is the epoch of the currently verified commitment. Initially, it is set to the epoch of the last common commitment before the fork.
epoch iotago.EpochIndex

// validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork.
// Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork.
validatorAccountsData map[iotago.AccountID]*accounts.AccountData
}

func NewCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
committeeAtForkingPoint, exists := mainEngine.SybilProtection.SeatManager().CommitteeInSlot(lastCommonCommitmentBeforeFork.Slot())
apiForSlot := mainEngine.APIForSlot(lastCommonCommitmentBeforeFork.Slot())
epoch := apiForSlot.TimeProvider().EpochFromSlot(lastCommonCommitmentBeforeFork.Slot())

committeeAtForkingPoint, exists := mainEngine.SybilProtection.SeatManager().CommitteeInEpoch(epoch)
if !exists {
return nil, ierrors.Errorf("committee in slot %d does not exist", lastCommonCommitmentBeforeFork.Slot())
return nil, ierrors.Errorf("committee in epoch %d of last commonCommitment slot %d before fork does not exist", epoch, lastCommonCommitmentBeforeFork.Slot())
}

accountsAtForkingPoint, err := committeeAtForkingPoint.Accounts()
if err != nil {
return nil, ierrors.Wrapf(err, "failed to get accounts from committee for slot %d", lastCommonCommitmentBeforeFork.Slot())
}

validatorAccountsDataAtForkingPoint, err := mainEngine.Ledger.PastAccounts(accountsAtForkingPoint.IDs(), lastCommonCommitmentBeforeFork.Slot())
if err != nil {
return nil, ierrors.Wrapf(err, "failed to get past accounts for slot %d", lastCommonCommitmentBeforeFork.Slot())
}

return &CommitmentVerifier{
engine: mainEngine,
cumulativeWeight: lastCommonCommitmentBeforeFork.CumulativeWeight(),
validatorAccountsAtFork: lo.PanicOnErr(mainEngine.Ledger.PastAccounts(accountsAtForkingPoint.IDs(), lastCommonCommitmentBeforeFork.Slot())),
// TODO: what happens if the committee rotated after the fork?
engine: mainEngine,
cumulativeWeight: lastCommonCommitmentBeforeFork.CumulativeWeight(),
lastCommonSlotBeforeFork: lastCommonCommitmentBeforeFork.Slot(),
epoch: epoch,
validatorAccountsData: validatorAccountsDataAtForkingPoint,
}, nil
}

@@ -58,27 +75,49 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *model.Commitment, atte
return nil, 0, ierrors.Errorf("invalid merkle proof for attestations for commitment %s", commitment.ID())
}

// 2. Verify attestations.
// 2. Update validatorAccountsData if the fork happened across an epoch boundary.
// We try to use the latest accounts data (at lastCommonSlotBeforeFork) for the current epoch.
// This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it.
// If anything goes wrong, we keep using the previously known accounts data (initially set to the accounts data
// of the validators for the epoch of the last common commitment before the fork).
apiForSlot := c.engine.APIForSlot(commitment.Slot())
commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot())
if commitmentEpoch > c.epoch {
c.epoch = commitmentEpoch

committee, exists := c.engine.SybilProtection.SeatManager().CommitteeInEpoch(commitmentEpoch)
if exists {
validatorAccounts, err := committee.Accounts()
if err == nil {
validatorAccountsData, err := c.engine.Ledger.PastAccounts(validatorAccounts.IDs(), c.lastCommonSlotBeforeFork)
if err == nil {
c.validatorAccountsData = validatorAccountsData
}
}
}
}

// 3. Verify attestations.
blockIDs, seatCount, err := c.verifyAttestations(attestations)
if err != nil {
return nil, 0, ierrors.Wrapf(err, "error validating attestations for commitment %s", commitment.ID())
}

// 3. Verify that calculated cumulative weight from attestations is lower or equal to cumulative weight of commitment.
// 4. Verify that calculated cumulative weight from attestations is lower or equal to cumulative weight of commitment.
// This is necessary due to public key changes of validators in the window of forking point and the current state of
// the other chain (as validators could have added/removed public keys that we don't know about yet).
//
// 1. The weight should be equal if all public keys are known and unchanged.
// a) The weight should be equal if all public keys are known and unchanged.
//
// 2. A public key is added to an account.
// b) A public key is added to an account.
// We do not count a seat for the issuer for this slot and the computed CW will be lower than the CW in
// the commitment. This is fine, since this is a rare occasion and a heavier chain will become heavier anyway, eventually.
// It will simply take a bit longer to accumulate enough CW so that the chain-switch rule kicks in.
// Note: In an extreme scenario where all validators add and use a new public key, the chain will never become heavier.
// This can only be prevented by adding such key changes provably to the commitments so that these changes
// can be reconstructed and verified by nodes that do not have the latest ledger state.
//
// 3. A public key is removed from an account.
// c) A public key is removed from an account.
// We count the seat for the issuer for this slot even though we shouldn't have. According to the protocol, a valid
// chain with such a block can never exist because the block itself (here provided as an attestation) would be invalid.
// However, we do not know about this yet since we do not have the latest ledger state.
@@ -105,11 +144,13 @@ func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestati
// 1. The attestation might be fake.
// 2. The issuer might have added a new public key in the meantime, but we don't know about it yet
// since we only have the ledger state at the forking point.
accountData, exists := c.validatorAccountsAtFork[att.Header.IssuerID]
accountData, exists := c.validatorAccountsData[att.Header.IssuerID]

// We always need to have the accountData for a validator.
// We don't know the issuer's account data: ignore the attestation.
// This can happen due to committee rotation at an epoch boundary, where a new validator (unknown at the forking point)
// was selected into the committee.
if !exists {
return nil, 0, ierrors.Errorf("accountData for issuerID %s does not exist", att.Header.IssuerID)
continue
}

switch signature := att.Signature.(type) {
Expand Down Expand Up @@ -137,12 +178,14 @@ func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestati
return nil, 0, ierrors.Errorf("issuerID %s contained in multiple attestations", att.Header.IssuerID)
}

// TODO: this might differ if we have a Accounts with changing weights depending on the Slot/epoch
attestationBlockID, err := att.BlockID()
if err != nil {
return nil, 0, ierrors.Wrap(err, "error calculating blockID from attestation")
}

// We need to make sure that the issuer is actually part of the committee for the slot of the attestation (issuance of the block).
// Note: here we're explicitly not using the slot of the commitment we're verifying, but the slot of the attestation.
// This is because at the time the attestation was created, the committee might have been different from the one at commitment time (due to rotation at epoch boundary).
committee, exists := c.engine.SybilProtection.SeatManager().CommitteeInSlot(attestationBlockID.Slot())
if !exists {
return nil, 0, ierrors.Errorf("committee for slot %d does not exist", attestationBlockID.Slot())
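
The comment block above (cases a, b, c) argues that the cumulative weight recomputed from attestations may legitimately fall short of, but must never exceed, the weight claimed by the forked commitment. Below is a minimal sketch of that acceptance rule, simplified to one unit of weight per seat; the function and the numbers are illustrative and are not the verifier's actual API.

package main

import (
	"errors"
	"fmt"
)

// checkCumulativeWeight mirrors the rule described above: the weight derived from the
// attestations we could verify (seatCount) must not push the cumulative weight beyond
// what the commitment claims, but it may fall short (case b: unknown new public keys).
func checkCumulativeWeight(previousCW uint64, seatCount int, claimedCW uint64) (uint64, error) {
	computedCW := previousCW + uint64(seatCount)
	if computedCW > claimedCW {
		// More verified weight than the commitment admits: the commitment cannot be valid.
		return 0, errors.New("computed cumulative weight exceeds cumulative weight of commitment")
	}
	// Equal (case a) or lower (case b) is accepted; a lower value only means the forked
	// chain accumulates weight more slowly before the chain-switch rule can kick in.
	return computedCW, nil
}

func main() {
	fmt.Println(checkCumulativeWeight(100, 4, 104)) // case a: all keys known      -> 104 <nil>
	fmt.Println(checkCumulativeWeight(100, 3, 104)) // case b: one key unknown     -> 103 <nil>
	fmt.Println(checkCumulativeWeight(100, 5, 104)) // inflated attestation weight -> error
}
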
20 changes: 13 additions & 7 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
@@ -259,14 +259,20 @@ func (m *Manager) Commit(slot iotago.SlotIndex) (newCW uint64, attestationsRoot
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to get attestation storage when committing slot %d", slot)
}

// Add all attestations to the tree and calculate the new cumulative weight.
committee, exists := m.committeeFunc(slot)
if !exists {
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to get committee when committing slot %d", slot)
}

for _, a := range attestations {
// TODO: which weight are we using here? The current one? Or the one of the slot of the attestation/commitmentID?
blockID, err := a.BlockID()
if err != nil {
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to get blockID of attestation %s", a.Header.IssuerID)
}

// We use the committee of the attestation's slot rather than the slot that we're committing right now.
// This is because at the time the attestation was created, the committee might have been different from the current one (due to rotation at an epoch boundary).
committee, exists := m.committeeFunc(blockID.Slot())
if !exists {
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to get committee when committing slot %d", slot)
}

// Add all attestations to the tree and calculate the new cumulative weight.
if _, exists := committee.GetSeat(a.Header.IssuerID); exists {
if err := tree.Set(a.Header.IssuerID, a); err != nil {
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to set attestation %s in tree", a.Header.IssuerID)
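
The hunk above looks the committee up per attestation, at the slot the attested block was issued in, instead of once at the slot being committed. The toy below shows why this matters when the committee rotates at an epoch boundary; the committee data and helper are invented for the example.

package main

import "fmt"

// committeeInSlot stands in for m.committeeFunc: which validators hold a seat in a
// given slot. In this toy the committee rotates between slot 9 and slot 10.
func committeeInSlot(slot int) map[string]bool {
	if slot < 10 {
		return map[string]bool{"validatorA": true, "validatorB": true}
	}
	return map[string]bool{"validatorB": true, "validatorC": true}
}

type attestation struct {
	issuer string
	slot   int // slot of the attested block, not of the commitment being built
}

func main() {
	committingSlot := 10
	attestations := []attestation{{"validatorA", 9}, {"validatorC", 10}}

	for _, a := range attestations {
		// Using the committee of the slot being committed would wrongly drop
		// validatorA's attestation from slot 9, even though it was valid when issued.
		_, inCommittingSlot := committeeInSlot(committingSlot)[a.issuer]
		// Using the committee of the attestation's own slot (as in the hunk above) keeps it.
		_, inOwnSlot := committeeInSlot(a.slot)[a.issuer]
		fmt.Printf("%s: committing-slot committee=%v, attestation-slot committee=%v\n",
			a.issuer, inCommittingSlot, inOwnSlot)
	}
}
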
32 changes: 25 additions & 7 deletions pkg/protocol/engine/blocks/block.go
@@ -40,6 +40,7 @@ type Block struct {
preConfirmed bool
confirmationRatifiers ds.Set[account.SeatIndex]
confirmed bool
weightPropagated reactive.Variable[bool]

// Scheduler block
scheduled bool
@@ -86,6 +87,7 @@ func NewBlock(data *model.Block) *Block {
invalid: reactive.NewVariable[bool](),
booked: reactive.NewVariable[bool](),
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
workScore: data.WorkScore(),
}
@@ -104,18 +106,20 @@ func NewRootBlock(blockID iotago.BlockID, commitmentID iotago.CommitmentID, issu
commitmentID: commitmentID,
issuingTime: issuingTime,
},
solid: reactive.NewVariable[bool](),
invalid: reactive.NewVariable[bool](),
booked: reactive.NewVariable[bool](),
preAccepted: true,
accepted: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
scheduled: true,
solid: reactive.NewVariable[bool](),
invalid: reactive.NewVariable[bool](),
booked: reactive.NewVariable[bool](),
preAccepted: true,
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
scheduled: true,
}

// This should be true since we commit and evict on acceptance.
b.solid.Set(true)
b.booked.Set(true)
b.weightPropagated.Set(true)
b.notarized.Set(true)
b.accepted.Set(true)

Expand All @@ -135,6 +139,7 @@ func NewMissingBlock(blockID iotago.BlockID) *Block {
invalid: reactive.NewVariable[bool](),
booked: reactive.NewVariable[bool](),
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
}
}
@@ -605,6 +610,18 @@ func (b *Block) SetPreConfirmed() (wasUpdated bool) {
return wasUpdated
}

func (b *Block) WeightPropagated() reactive.Variable[bool] {
return b.weightPropagated
}

func (b *Block) IsWeightPropagated() bool {
return b.weightPropagated.Get()
}

func (b *Block) SetWeightPropagated() (wasUpdated bool) {
return !b.weightPropagated.Set(true)
}

func (b *Block) Notarized() reactive.Variable[bool] {
return b.notarized
}
@@ -633,6 +650,7 @@ func (b *Block) String() string {
builder.AddField(stringify.NewStructField("PreConfirmed", b.preConfirmed))
builder.AddField(stringify.NewStructField("ConfirmationRatifiers", b.confirmationRatifiers.String()))
builder.AddField(stringify.NewStructField("Confirmed", b.confirmed))
builder.AddField(stringify.NewStructField("WeightPropagated", b.weightPropagated.Get()))
builder.AddField(stringify.NewStructField("Scheduled", b.scheduled))
builder.AddField(stringify.NewStructField("Dropped", b.dropped))
builder.AddField(stringify.NewStructField("Skipped", b.skipped))
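
The new weightPropagated field is another reactive boolean latch next to booked and accepted: callbacks registered via OnUpdate fire when it flips, and SetWeightPropagated reports whether the call actually changed anything (the diff computes wasUpdated as !Set(true), which suggests Set returns the previous value). Below is a self-contained sketch of that pattern with a toy variable type; it is not hive.go's reactive package, only an illustration of the assumed semantics.

package main

import "fmt"

// toyVariable mimics the reactive boolean latch used for weightPropagated:
// Set returns the previous value, and OnUpdate callbacks run when the value changes.
type toyVariable struct {
	value     bool
	callbacks []func(oldValue, newValue bool)
}

func (v *toyVariable) Set(newValue bool) (previousValue bool) {
	previousValue = v.value
	if previousValue != newValue {
		v.value = newValue
		for _, cb := range v.callbacks {
			cb(previousValue, newValue)
		}
	}
	return previousValue
}

func (v *toyVariable) OnUpdate(cb func(oldValue, newValue bool)) {
	v.callbacks = append(v.callbacks, cb)
}

func main() {
	weightPropagated := &toyVariable{}

	// The block dispatcher waits for both Booked and WeightPropagated before
	// finalizing a warp-synced slot; here we only model the second flag.
	weightPropagated.OnUpdate(func(_, _ bool) { fmt.Println("weight propagated -> check slot finalization") })

	// SetWeightPropagated() (wasUpdated bool) is written as !Set(true) in the diff above:
	// true only on the first call, false on repeats.
	fmt.Println(!weightPropagated.Set(true)) // callback fires, then prints true
	fmt.Println(!weightPropagated.Set(true)) // prints false, callback does not fire again
}
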
@@ -24,12 +24,12 @@ func (g *Gadget) trackConfirmationRatifierWeight(votingBlock *blocks.Block) {
return false
}

// Skip propagation if the block is already accepted.
// Skip propagation if the block is already confirmed.
if block.IsConfirmed() {
return false
}

// Skip further propagation if the witness is not new.
// Skip further propagation if the ratifier is not new.
propagateFurther := block.AddConfirmationRatifier(seat)

if g.shouldConfirm(block) {
@@ -8,6 +8,8 @@ import (
)

func (g *Gadget) TrackWitnessWeight(votingBlock *blocks.Block) {
defer votingBlock.SetWeightPropagated()

// Only track witness weight for issuers that are part of the committee.
seat, isValid := g.isCommitteeValidationBlock(votingBlock)
if !isValid {
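
The deferred SetWeightPropagated above guarantees the flag is set on every return path of TrackWitnessWeight, including the early return for issuers outside the committee; components that wait on WeightPropagated for every block (such as the warp-sync finalization in block_dispatcher.go) would otherwise stall. The sketch below only illustrates that defer pattern and is not the gadget's real code.

package main

import "fmt"

// trackWitnessWeight sketches why the defer matters: weightPropagated must be set on
// every return path (including "issuer not in committee"), because other components
// wait on that flag for every block.
func trackWitnessWeight(issuerInCommittee bool, setWeightPropagated func()) {
	defer setWeightPropagated() // runs on every return path

	if !issuerInCommittee {
		return // early return: without the defer, the flag would never be set here
	}
	// ... propagate witness weight through the past cone ...
}

func main() {
	trackWitnessWeight(false, func() { fmt.Println("weight propagated (non-committee issuer)") })
	trackWitnessWeight(true, func() { fmt.Println("weight propagated (committee issuer)") })
}
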
@@ -4,11 +4,9 @@ import (
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/module"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/hive.go/runtime/workerpool"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/slotgadget"
@@ -19,8 +17,7 @@ import (
)

type Gadget struct {
events *slotgadget.Events
workers *workerpool.Group
events *slotgadget.Events

// Keep track of votes on slots (from commitments) per slot of blocks. I.e. a slot can only be finalized if
// optsSlotFinalizationThreshold is reached within a slot.
@@ -49,13 +46,12 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
g.slotTrackers = shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker]()

e.Events.SlotGadget.LinkTo(g.events)
g.workers = e.Workers.CreateGroup("SlotGadget")

e.HookConstructed(func() {
g.seatManager = e.SybilProtection.SeatManager()
g.TriggerConstructed()

e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes, event.WithWorkerPool(g.workers.CreatePool("TrackAndRefresh", workerpool.WithWorkerCount(1)))) // Using just 1 worker to avoid contention
e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes)
})

g.storeLastFinalizedSlotFunc = func(slot iotago.SlotIndex) {
@@ -87,7 +83,6 @@ func (g *Gadget) Reset() {

func (g *Gadget) Shutdown() {
g.TriggerStopped()
g.workers.Shutdown()
}

func (g *Gadget) setLastFinalizedSlot(i iotago.SlotIndex) {
@@ -120,7 +115,6 @@ func (g *Gadget) trackVotes(block *blocks.Block) {
}

func (g *Gadget) refreshSlotFinalization(tracker *slottracker.SlotTracker, previousLatestSlotIndex iotago.SlotIndex, newLatestSlotIndex iotago.SlotIndex) (finalizedSlots []iotago.SlotIndex) {

for i := lo.Max(g.lastFinalizedSlot, previousLatestSlotIndex) + 1; i <= newLatestSlotIndex; i++ {
committeeTotalSeats := g.seatManager.SeatCountInSlot(i)
attestorsTotalSeats := len(tracker.Voters(i))