Skip to content

Commit

Permalink
Merge pull request #408 from iotaledger/feat/top-stakers-seatmanager
Browse files Browse the repository at this point in the history
Implement candidate registration and TopStakers seat manager.
  • Loading branch information
piotrm50 authored Oct 25, 2023
2 parents cdd2e14 + 4591f73 commit 0329a5e
Show file tree
Hide file tree
Showing 46 changed files with 1,605 additions and 359 deletions.
9 changes: 7 additions & 2 deletions components/debugapi/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package debugapi

import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/core/account"
iotago "github.com/iotaledger/iota.go/v4"
Expand All @@ -10,8 +11,12 @@ import (
func validatorsSummary() (*ValidatorsSummaryResponse, error) {
seatManager := deps.Protocol.MainEngineInstance().SybilProtection.SeatManager()
latestSlotIndex := deps.Protocol.MainEngineInstance().Storage.Settings().LatestCommitment().Slot()
latestCommittee := seatManager.Committee(latestSlotIndex)
validatorSeats := []*Validator{}
latestCommittee, exists := seatManager.CommitteeInSlot(latestSlotIndex)
if !exists {
return nil, ierrors.Errorf("committee for slot %d was not selected", latestSlotIndex)
}

var validatorSeats []*Validator
latestCommittee.Accounts().ForEach(func(id iotago.AccountID, pool *account.Pool) bool {
validatorSeats = append(validatorSeats, &Validator{
AccountID: id,
Expand Down
14 changes: 12 additions & 2 deletions components/restapi/core/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ func validatorByAccountID(c echo.Context) (*apimodels.ValidatorResponse, error)
return nil, ierrors.Errorf("account not found: %s for latest committedSlot %d", accountID.ToHex(), latestCommittedSlot)
}
nextEpoch := deps.Protocol.APIForSlot(latestCommittedSlot).TimeProvider().EpochFromSlot(latestCommittedSlot) + 1
active := deps.Protocol.MainEngineInstance().SybilProtection.IsCandidateActive(accountID, nextEpoch)

active, err := deps.Protocol.MainEngineInstance().SybilProtection.IsCandidateActive(accountID, nextEpoch)
if err != nil {
return nil, ierrors.Wrapf(err, "failed to check if account %s is an active candidate", accountID.ToHex())
}

return &apimodels.ValidatorResponse{
AccountID: accountID,
Expand Down Expand Up @@ -198,7 +202,13 @@ func selectedCommittee(c echo.Context) *apimodels.CommitteeResponse {
slot = timeProvider.EpochEnd(epoch)
}

seatedAccounts := deps.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(slot)
seatedAccounts, exists := deps.Protocol.MainEngineInstance().SybilProtection.SeatManager().CommitteeInSlot(slot)
if !exists {
return &apimodels.CommitteeResponse{
Epoch: epoch,
}
}

committee := make([]*apimodels.CommitteeMemberResponse, 0, seatedAccounts.Accounts().Size())
seatedAccounts.Accounts().ForEach(func(accountID iotago.AccountID, seat *account.Pool) bool {
committee = append(committee, &apimodels.CommitteeMemberResponse{
Expand Down
10 changes: 9 additions & 1 deletion components/validator/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ func issueValidatorBlock(ctx context.Context) {
return
}

if !engineInstance.SybilProtection.SeatManager().Committee(deps.Protocol.CommittedAPI().TimeProvider().SlotFromTime(blockIssuingTime)).HasAccount(validatorAccount.ID()) {
blockSlot := deps.Protocol.CommittedAPI().TimeProvider().SlotFromTime(blockIssuingTime)
committee, exists := engineInstance.SybilProtection.SeatManager().CommitteeInSlot(blockSlot)
if !exists {
Component.LogWarnf("committee for slot %d not selected: %s", blockSlot, err.Error())

return
}

if !committee.HasAccount(validatorAccount.ID()) {
// update nextBroadcast value here, so that this updated value is used in the `defer`
// callback to schedule issuing of the next block at a different interval than for committee members
nextBroadcast = blockIssuingTime.Add(ParamsValidator.CandidateBroadcastInterval)
Expand Down
19 changes: 14 additions & 5 deletions pkg/protocol/commitment_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ type CommitmentVerifier struct {
validatorAccountsAtFork map[iotago.AccountID]*accounts.AccountData
}

func NewCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) *CommitmentVerifier {
committeeAtForkingPoint := mainEngine.SybilProtection.SeatManager().Committee(lastCommonCommitmentBeforeFork.Slot()).Accounts().IDs()
func NewCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
committeeAtForkingPoint, exists := mainEngine.SybilProtection.SeatManager().CommitteeInSlot(lastCommonCommitmentBeforeFork.Slot())
if !exists {
return nil, ierrors.Errorf("committee in slot %d does not exist", lastCommonCommitmentBeforeFork.Slot())
}

return &CommitmentVerifier{
engine: mainEngine,
cumulativeWeight: lastCommonCommitmentBeforeFork.CumulativeWeight(),
validatorAccountsAtFork: lo.PanicOnErr(mainEngine.Ledger.PastAccounts(committeeAtForkingPoint, lastCommonCommitmentBeforeFork.Slot())),
validatorAccountsAtFork: lo.PanicOnErr(mainEngine.Ledger.PastAccounts(committeeAtForkingPoint.Accounts().IDs(), lastCommonCommitmentBeforeFork.Slot())),
// TODO: what happens if the committee rotated after the fork?
}
}, nil
}

func (c *CommitmentVerifier) verifyCommitment(commitment *model.Commitment, attestations []*iotago.Attestation, merkleProof *merklehasher.Proof[iotago.Identifier]) (blockIDsFromAttestations iotago.BlockIDs, cumulativeWeight uint64, err error) {
Expand Down Expand Up @@ -153,7 +156,13 @@ func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestati
if err != nil {
return nil, 0, ierrors.Wrap(err, "error calculating blockID from attestation")
}
if _, seatExists := c.engine.SybilProtection.SeatManager().Committee(attestationBlockID.Slot()).GetSeat(att.IssuerID); seatExists {

committee, exists := c.engine.SybilProtection.SeatManager().CommitteeInSlot(attestationBlockID.Slot())
if !exists {
return nil, 0, ierrors.Errorf("committee for slot %d does not exist", attestationBlockID.Slot())
}

if _, seatExists := committee.GetSeat(att.IssuerID); seatExists {
seatCount++
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/attestation/attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Attestations interface {
// GetMap returns the attestations that are included in the commitment of the given slot as ads.Map.
// If attestationCommitmentOffset=3 and commitment is 10, then the returned attestations are blocks from 7 to 10 that commit to at least 7.
GetMap(index iotago.SlotIndex) (attestations ads.Map[iotago.Identifier, iotago.AccountID, *iotago.Attestation], err error)
AddAttestationFromValidationBlock(block *blocks.Block)
AddAttestationFromValidationBlock(block *blocks.Block) error
Commit(index iotago.SlotIndex) (newCW uint64, attestationsRoot iotago.Identifier, err error)

Import(reader io.ReadSeeker) (err error)
Expand Down
29 changes: 20 additions & 9 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
// - obtain and evict from it attestations that *commit to* lastCommittedSlot-attestationCommitmentOffset
// - committed attestations: retrieved at slot that we are committing, stored at slot lastCommittedSlot-attestationCommitmentOffset
type Manager struct {
committeeFunc func(slot iotago.SlotIndex) *account.SeatedAccounts
committeeFunc func(slot iotago.SlotIndex) (*account.SeatedAccounts, bool)

futureAttestations *memstorage.IndexedStorage[iotago.SlotIndex, iotago.AccountID, *iotago.Attestation]
pendingAttestations *memstorage.IndexedStorage[iotago.SlotIndex, iotago.AccountID, *iotago.Attestation]
Expand All @@ -73,7 +73,7 @@ func NewProvider() module.Provider[*engine.Engine, attestation.Attestations] {
latestCommitment.Slot(),
latestCommitment.CumulativeWeight(),
e.Storage.Attestations,
e.SybilProtection.SeatManager().Committee,
e.SybilProtection.SeatManager().CommitteeInSlot,
e,
)
})
Expand All @@ -83,7 +83,7 @@ func NewManager(
lastCommittedSlot iotago.SlotIndex,
lastCumulativeWeight uint64,
bucketedStorage func(slot iotago.SlotIndex) (kvstore.KVStore, error),
committeeFunc func(slot iotago.SlotIndex) *account.SeatedAccounts,
committeeFunc func(slot iotago.SlotIndex) (*account.SeatedAccounts, bool),
apiProvider iotago.APIProvider,
) *Manager {
m := &Manager{
Expand Down Expand Up @@ -145,23 +145,27 @@ func (m *Manager) GetMap(slot iotago.SlotIndex) (ads.Map[iotago.Identifier, iota
}

// AddAttestationFromValidationBlock adds an attestation from a block to the future attestations (beyond the attestation window).
func (m *Manager) AddAttestationFromValidationBlock(block *blocks.Block) {
func (m *Manager) AddAttestationFromValidationBlock(block *blocks.Block) error {
// Only track validator blocks.
if _, isValidationBlock := block.ValidationBlock(); !isValidationBlock {
return
return nil
}

committee, exists := m.committeeFunc(block.ID().Slot())
if !exists {
return ierrors.Errorf("committee for slot %d does not exist", block.ID().Slot())
}
// Only track attestations of active committee members.
if _, exists := m.committeeFunc(block.ID().Slot()).GetSeat(block.ProtocolBlock().IssuerID); !exists {
return
if _, exists := committee.GetSeat(block.ProtocolBlock().IssuerID); !exists {
return nil
}

m.commitmentMutex.RLock()
defer m.commitmentMutex.RUnlock()

// We only care about attestations that are newer than the last committed slot.
if block.ID().Slot() <= m.lastCommittedSlot {
return
return nil
}

newAttestation := iotago.NewAttestation(m.apiProvider.APIForSlot(block.ID().Slot()), block.ProtocolBlock())
Expand All @@ -179,6 +183,8 @@ func (m *Manager) AddAttestationFromValidationBlock(block *blocks.Block) {

return currentValue
})

return nil
}

func (m *Manager) applyToPendingAttestations(attestation *iotago.Attestation, cutoffSlot iotago.SlotIndex) {
Expand Down Expand Up @@ -254,9 +260,14 @@ func (m *Manager) Commit(slot iotago.SlotIndex) (newCW uint64, attestationsRoot
}

// Add all attestations to the tree and calculate the new cumulative weight.
committee, exists := m.committeeFunc(slot)
if !exists {
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to get committee when committing slot %d", slot)
}

for _, a := range attestations {
// TODO: which weight are we using here? The current one? Or the one of the slot of the attestation/commitmentID?
if _, exists := m.committeeFunc(slot).GetSeat(a.IssuerID); exists {
if _, exists := committee.GetSeat(a.IssuerID); exists {
if err := tree.Set(a.IssuerID, a); err != nil {
return 0, iotago.Identifier{}, ierrors.Wrapf(err, "failed to set attestation %s in tree", a.IssuerID)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func NewTestFramework(test *testing.T) *TestFramework {
})), nil
}

committeeFunc := func(index iotago.SlotIndex) *account.SeatedAccounts {
committeeFunc := func(index iotago.SlotIndex) (*account.SeatedAccounts, bool) {
accounts := account.NewAccounts()
var members []iotago.AccountID
t.issuerByAlias.ForEach(func(alias string, issuer *issuer) bool {
accounts.Set(issuer.accountID, &account.Pool{}) // we don't care about pools with PoA
members = append(members, issuer.accountID)
return true
})
return accounts.SelectCommittee(members...)
return accounts.SelectCommittee(members...), true
}

t.testAPI = iotago.V3API(
Expand Down
12 changes: 9 additions & 3 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
})
e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
// when the last slot of an epoch is committed, remove the queues of validators that are no longer in the committee.
if s.apiProvider.CommittedAPI().TimeProvider().SlotsBeforeNextEpoch(commitment.Slot()) == 0 {
if s.apiProvider.APIForSlot(commitment.Slot()).TimeProvider().SlotsBeforeNextEpoch(commitment.Slot()) == 0 {
s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()
committee, exists := s.seatManager.CommitteeInSlot(commitment.Slot() + 1)
if !exists {
s.errorHandler(ierrors.Errorf("committee does not exist in committed slot %d", commitment.Slot()+1))

return
}

s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool {
if !s.seatManager.Committee(commitment.Slot() + 1).HasAccount(accountID) {
if !committee.HasAccount(accountID) {
s.shutdownValidatorQueue(validatorQueue)
s.validatorBuffer.Delete(accountID)
}
Expand Down Expand Up @@ -615,7 +621,7 @@ func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) err
func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) error {
quantum, err := s.quantumFunc(issuerID, slot)
if err != nil {
return err
return ierrors.Wrap(err, "failed to retrieve quantum")
}

delta, err := safemath.SafeMul(quantum, rounds)
Expand Down
10 changes: 8 additions & 2 deletions pkg/protocol/engine/consensus/blockgadget/testframework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"github.com/stretchr/testify/require"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/iota-core/pkg/core/account"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/blockgadget"
"github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/blockgadget/thresholdblockgadget"
"github.com/iotaledger/iota-core/pkg/protocol/engine/eviction"
"github.com/iotaledger/iota-core/pkg/protocol/sybilprotection/seatmanager/mock"
"github.com/iotaledger/iota-core/pkg/storage/prunable/epochstore"
"github.com/iotaledger/iota-core/pkg/storage/prunable/slotstore"
iotago "github.com/iotaledger/iota.go/v4"
"github.com/iotaledger/iota.go/v4/api"
Expand All @@ -40,7 +43,7 @@ func NewTestFramework(test *testing.T) *TestFramework {
T: test,
blocks: shrinkingmap.New[string, *blocks.Block](),

SeatManager: mock.NewManualPOA(),
SeatManager: mock.NewManualPOA(api.SingleVersionProvider(tpkg.TestAPI), epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)),
}

evictionState := eviction.NewState(mapdb.NewMapDB(), func(slot iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error) {
Expand All @@ -53,7 +56,10 @@ func NewTestFramework(test *testing.T) *TestFramework {
})

t.blockCache = blocks.New(evictionState, api.SingleVersionProvider(tpkg.TestAPI))
instance := thresholdblockgadget.New(t.blockCache, t.SeatManager)
instance := thresholdblockgadget.New(t.blockCache, t.SeatManager, func(err error) {
fmt.Printf(">> Gadget.Error: %s\n", err)
})

t.Events = instance.Events()
t.Instance = instance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ds/walker"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/module"
"github.com/iotaledger/hive.go/runtime/options"
Expand All @@ -22,8 +23,9 @@ import (
type Gadget struct {
events *blockgadget.Events

seatManager seatmanager.SeatManager
blockCache *blocks.Blocks
seatManager seatmanager.SeatManager
blockCache *blocks.Blocks
errorHandler func(error)

optsAcceptanceThreshold float64
optsConfirmationThreshold float64
Expand All @@ -34,7 +36,7 @@ type Gadget struct {

func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, blockgadget.Gadget] {
return module.Provide(func(e *engine.Engine) blockgadget.Gadget {
g := New(e.BlockCache, e.SybilProtection.SeatManager(), opts...)
g := New(e.BlockCache, e.SybilProtection.SeatManager(), e.ErrorHandler("gadget"), opts...)

wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1))
e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp))
Expand All @@ -45,11 +47,12 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
})
}

func New(blockCache *blocks.Blocks, seatManager seatmanager.SeatManager, opts ...options.Option[Gadget]) *Gadget {
func New(blockCache *blocks.Blocks, seatManager seatmanager.SeatManager, errorHandler func(error), opts ...options.Option[Gadget]) *Gadget {
return options.Apply(&Gadget{
events: blockgadget.NewEvents(),
seatManager: seatManager,
blockCache: blockCache,
events: blockgadget.NewEvents(),
seatManager: seatManager,
blockCache: blockCache,
errorHandler: errorHandler,

optsAcceptanceThreshold: 0.67,
optsConfirmationThreshold: 0.67,
Expand Down Expand Up @@ -98,8 +101,15 @@ func (g *Gadget) isCommitteeValidationBlock(block *blocks.Block) (seat account.S
return 0, false
}

committee, exists := g.seatManager.CommitteeInSlot(block.ID().Slot())
if !exists {
g.errorHandler(ierrors.Errorf("committee for slot %d does not exist", block.ID().Slot()))

return 0, false
}

// Only accept blocks for issuers that are part of the committee.
return g.seatManager.Committee(block.ID().Slot()).GetSeat(block.ProtocolBlock().IssuerID)
return committee.GetSeat(block.ProtocolBlock().IssuerID)
}

func anyChildInSet(block *blocks.Block, set ds.Set[iotago.BlockID]) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (e *Engine) acceptanceHandler() {

e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
e.Ledger.TrackBlock(block)
e.SybilProtection.TrackValidationBlock(block)
e.SybilProtection.TrackBlock(block)
e.UpgradeOrchestrator.TrackValidationBlock(block)

e.Events.AcceptedBlockProcessed.Trigger(block)
Expand Down
17 changes: 14 additions & 3 deletions pkg/protocol/engine/filter/blockfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Filter struct {

optsMaxAllowedWallClockDrift time.Duration

committeeFunc func(iotago.SlotIndex) *account.SeatedAccounts
committeeFunc func(iotago.SlotIndex) (*account.SeatedAccounts, bool)

module.Module
}
Expand All @@ -42,7 +42,7 @@ func NewProvider(opts ...options.Option[Filter]) module.Provider[*engine.Engine,
e.HookConstructed(func() {
e.Events.Filter.LinkTo(f.events)
e.SybilProtection.HookInitialized(func() {
f.committeeFunc = e.SybilProtection.SeatManager().Committee
f.committeeFunc = e.SybilProtection.SeatManager().CommitteeInSlot
})
f.TriggerInitialized()
})
Expand Down Expand Up @@ -92,7 +92,18 @@ func (f *Filter) ProcessReceivedBlock(block *model.Block, source peer.ID) {

if _, isValidation := block.ValidationBlock(); isValidation {
blockSlot := block.ProtocolBlock().API.TimeProvider().SlotFromTime(block.ProtocolBlock().IssuingTime)
if !f.committeeFunc(blockSlot).HasAccount(block.ProtocolBlock().IssuerID) {
committee, exists := f.committeeFunc(blockSlot)
if !exists {
f.events.BlockPreFiltered.Trigger(&filter.BlockPreFilteredEvent{
Block: block,
Reason: ierrors.Wrapf(ErrValidatorNotInCommittee, "no committee for slot %d", blockSlot),
Source: source,
})

return
}

if !committee.HasAccount(block.ProtocolBlock().IssuerID) {
f.events.BlockPreFiltered.Trigger(&filter.BlockPreFilteredEvent{
Block: block,
Reason: ierrors.Wrapf(ErrValidatorNotInCommittee, "validation block issuer %s is not part of the committee for slot %d", block.ProtocolBlock().IssuerID, blockSlot),
Expand Down
Loading

0 comments on commit 0329a5e

Please sign in to comment.