Skip to content

Commit

Permalink
Remove intermediate eviction state cache
Browse files Browse the repository at this point in the history
  • Loading branch information
karimodm committed Nov 6, 2023
1 parent 2394969 commit eb75aff
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 69 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.17.1
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)
Expand Down Expand Up @@ -169,7 +170,6 @@ require (
go.uber.org/mock v0.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/image v0.13.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion pkg/protocol/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func New(
} else {
// Restore from Disk
e.Storage.RestoreFromDisk()
e.EvictionState.PopulateFromStorage(e.Storage.Settings().LatestCommitment().Slot())

if err := e.Attestations.RestoreFromDisk(); err != nil {
panic(ierrors.Wrap(err, "failed to restore attestations from disk"))
Expand Down
114 changes: 48 additions & 66 deletions pkg/protocol/engine/eviction/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package eviction
import (
"io"

"github.com/iotaledger/hive.go/core/memstorage"
"github.com/iotaledger/hive.go/ds/ringbuffer"
"github.com/iotaledger/hive.go/core/safemath"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
Expand All @@ -21,8 +20,6 @@ const latestNonEmptySlotKey = 1
type State struct {
Events *Events

rootBlocks *memstorage.IndexedStorage[iotago.SlotIndex, iotago.BlockID, iotago.CommitmentID]
latestRootBlocks *ringbuffer.RingBuffer[iotago.BlockID]
rootBlockStorageFunc func(iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error)
lastEvictedSlot iotago.SlotIndex
latestNonEmptyStore kvstore.KVStore
Expand All @@ -37,8 +34,6 @@ type State struct {
func NewState(latestNonEmptyStore kvstore.KVStore, rootBlockStorageFunc func(iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error), genesisRootBlockFunc func() iotago.BlockID, opts ...options.Option[State]) (state *State) {
return options.Apply(&State{
Events: NewEvents(),
rootBlocks: memstorage.NewIndexedStorage[iotago.SlotIndex, iotago.BlockID, iotago.CommitmentID](),
latestRootBlocks: ringbuffer.NewRingBuffer[iotago.BlockID](8),
rootBlockStorageFunc: rootBlockStorageFunc,
latestNonEmptyStore: latestNonEmptyStore,
genesisRootBlockFunc: genesisRootBlockFunc,
Expand All @@ -57,8 +52,12 @@ func (s *State) AdvanceActiveWindowToIndex(slot iotago.SlotIndex) {

if delayedIndex, shouldEvictRootBlocks := s.delayedBlockEvictionThreshold(slot); shouldEvictRootBlocks {
// Remember the last slot outside our cache window that has root blocks.
if evictedSlot := s.rootBlocks.Evict(delayedIndex); evictedSlot != nil && evictedSlot.Size() > 0 {
s.setLatestNonEmptySlot(delayedIndex)
if storage, err := s.rootBlockStorageFunc(delayedIndex); err != nil {
storage.StreamKeys(func(_ iotago.BlockID) error {

Check failure on line 56 in pkg/protocol/engine/eviction/state.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/protocol/engine/eviction/state.go#L56

Error return value of `storage.StreamKeys` is not checked (errcheck)
Raw output
pkg/protocol/engine/eviction/state.go:56:22: Error return value of `storage.StreamKeys` is not checked (errcheck)
			storage.StreamKeys(func(_ iotago.BlockID) error {
			                  ^
s.setLatestNonEmptySlot(delayedIndex)

return ierrors.New("stop iteration")
})
}
}

Expand Down Expand Up @@ -89,15 +88,17 @@ func (s *State) ActiveRootBlocks() map[iotago.BlockID]iotago.CommitmentID {
activeRootBlocks := make(map[iotago.BlockID]iotago.CommitmentID)
startSlot, endSlot := s.activeIndexRange()
for slot := startSlot; slot <= endSlot; slot++ {
storage := s.rootBlocks.Get(slot)
if storage == nil {
// We assume the cache is always populated for the latest slots.
storage, err := s.rootBlockStorageFunc(slot)
// Slot too old, it was pruned.
if err != nil {
continue
}

storage.ForEach(func(id iotago.BlockID, commitmentID iotago.CommitmentID) bool {
storage.Stream(func(id iotago.BlockID, commitmentID iotago.CommitmentID) error {

Check failure on line 98 in pkg/protocol/engine/eviction/state.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/protocol/engine/eviction/state.go#L98

Error return value of `storage.Stream` is not checked (errcheck)
Raw output
pkg/protocol/engine/eviction/state.go:98:17: Error return value of `storage.Stream` is not checked (errcheck)
		storage.Stream(func(id iotago.BlockID, commitmentID iotago.CommitmentID) error {
		              ^
activeRootBlocks[id] = commitmentID

return true
return nil
})
}

Expand All @@ -114,24 +115,18 @@ func (s *State) AddRootBlock(id iotago.BlockID, commitmentID iotago.CommitmentID
return
}

if s.rootBlocks.Get(id.Slot(), true).Set(id, commitmentID) {
if err := lo.PanicOnErr(s.rootBlockStorageFunc(id.Slot())).Store(id, commitmentID); err != nil {
panic(ierrors.Wrapf(err, "failed to store root block %s", id))
}
if err := lo.PanicOnErr(s.rootBlockStorageFunc(id.Slot())).Store(id, commitmentID); err != nil {
panic(ierrors.Wrapf(err, "failed to store root block %s", id))
}

s.latestRootBlocks.Add(id)
}

// RemoveRootBlock removes a solid entry points from the map.
func (s *State) RemoveRootBlock(id iotago.BlockID) {
s.evictionMutex.RLock()
defer s.evictionMutex.RUnlock()

if rootBlocks := s.rootBlocks.Get(id.Slot()); rootBlocks != nil && rootBlocks.Delete(id) {
if err := lo.PanicOnErr(s.rootBlockStorageFunc(id.Slot())).Delete(id); err != nil {
panic(err)
}
if err := lo.PanicOnErr(s.rootBlockStorageFunc(id.Slot())).Delete(id); err != nil {
panic(err)
}
}

Expand All @@ -144,9 +139,12 @@ func (s *State) IsRootBlock(id iotago.BlockID) (has bool) {
return false
}

slotBlocks := s.rootBlocks.Get(id.Slot(), false)
storage, err := s.rootBlockStorageFunc(id.Slot())
if err != nil {
return false
}

return slotBlocks != nil && slotBlocks.Has(id)
return lo.PanicOnErr(storage.Has(id))
}

// RootBlockCommitmentID returns the commitmentID if it is a known root block.
Expand All @@ -158,22 +156,15 @@ func (s *State) RootBlockCommitmentID(id iotago.BlockID) (commitmentID iotago.Co
return iotago.CommitmentID{}, false
}

slotBlocks := s.rootBlocks.Get(id.Slot(), false)
if slotBlocks == nil {
return iotago.CommitmentID{}, false
storage, err := s.rootBlockStorageFunc(id.Slot())
if err != nil {
return iotago.EmptyCommitmentID, false
}

return slotBlocks.Get(id)
}

// LatestRootBlocks returns the latest root blocks.
func (s *State) LatestRootBlocks() iotago.BlockIDs {
rootBlocks := s.latestRootBlocks.ToSlice()
if len(rootBlocks) == 0 {
return iotago.BlockIDs{s.genesisRootBlockFunc()}
}
// This return empty value for commitmentID in the case the key is not found.
commitmentID = lo.PanicOnErr(storage.Load(id))

return rootBlocks
return commitmentID, commitmentID != iotago.EmptyCommitmentID
}

// Export exports the root blocks to the given writer.
Expand Down Expand Up @@ -235,7 +226,6 @@ func (s *State) Export(writer io.WriteSeeker, lowerTarget iotago.SlotIndex, targ
// Import imports the root blocks from the given reader.
func (s *State) Import(reader io.ReadSeeker) error {
if err := stream.ReadCollection(reader, func(i int) error {

blockIDBytes, err := stream.ReadBlob(reader)
if err != nil {
return ierrors.Wrapf(err, "failed to read root block id %d", i)
Expand All @@ -256,10 +246,8 @@ func (s *State) Import(reader io.ReadSeeker) error {
return ierrors.Wrapf(err, "failed to parse root block's %s commitment id", rootBlockID)
}

if s.rootBlocks.Get(rootBlockID.Slot(), true).Set(rootBlockID, commitmentID) {
if err := lo.PanicOnErr(s.rootBlockStorageFunc(rootBlockID.Slot())).Store(rootBlockID, commitmentID); err != nil {
panic(ierrors.Wrapf(err, "failed to store root block %s", rootBlockID))
}
if err := lo.PanicOnErr(s.rootBlockStorageFunc(rootBlockID.Slot())).Store(rootBlockID, commitmentID); err != nil {
panic(ierrors.Wrapf(err, "failed to store root block %s", rootBlockID))
}

return nil
Expand Down Expand Up @@ -312,22 +300,6 @@ func (s *State) Rollback(lowerTarget, targetIndex iotago.SlotIndex) error {
return nil
}

// PopulateFromStorage populates the root blocks from the storage.
func (s *State) PopulateFromStorage(latestCommitmentSlot iotago.SlotIndex) {
for slot := lo.Return1(s.delayedBlockEvictionThreshold(latestCommitmentSlot)); slot <= latestCommitmentSlot; slot++ {
storedRootBlocks, err := s.rootBlockStorageFunc(slot)
if err != nil {
continue
}

_ = storedRootBlocks.Stream(func(id iotago.BlockID, commitmentID iotago.CommitmentID) error {
s.AddRootBlock(id, commitmentID)

return nil
})
}
}

// latestNonEmptySlot returns the latest slot that contains a rootblock.
func (s *State) latestNonEmptySlot() iotago.SlotIndex {
latestNonEmptySlotBytes, err := s.latestNonEmptyStore.Get([]byte{latestNonEmptySlotKey})
Expand Down Expand Up @@ -381,17 +353,27 @@ func (s *State) delayedBlockEvictionThreshold(slot iotago.SlotIndex) (thresholdS
return genesisSlot, false
}

var rootBlockInWindow bool
// Check if we have root blocks up to the eviction point.
for ; slot >= s.lastEvictedSlot; slot-- {
if rb := s.rootBlocks.Get(slot); rb != nil {
if rb.Size() > 0 {
if slot >= s.optsRootBlocksEvictionDelay {
return slot - s.optsRootBlocksEvictionDelay, true
}

return genesisSlot, false
storage := lo.PanicOnErr(s.rootBlockStorageFunc(slot))

storage.StreamKeys(func(_ iotago.BlockID) error {

Check failure on line 361 in pkg/protocol/engine/eviction/state.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/protocol/engine/eviction/state.go#L361

Error return value of `storage.StreamKeys` is not checked (errcheck)
Raw output
pkg/protocol/engine/eviction/state.go:361:21: Error return value of `storage.StreamKeys` is not checked (errcheck)
		storage.StreamKeys(func(_ iotago.BlockID) error {
		                  ^
if thresholdSlot, _ = safemath.SafeSub(slot, s.optsRootBlocksEvictionDelay); thresholdSlot <= genesisSlot {
thresholdSlot = genesisSlot
shouldEvict = false
} else {
shouldEvict = true
}
}

rootBlockInWindow = true

return ierrors.New("stop iteration")
})
}

if rootBlockInWindow {
return thresholdSlot, shouldEvict
}

// If we didn't find any root blocks, we have to fallback to the latestNonEmptySlot before the eviction point.
Expand Down
4 changes: 3 additions & 1 deletion pkg/protocol/engine/tipselection/v1/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"math"
"time"

"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/module"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/tipmanager"
"github.com/iotaledger/iota-core/pkg/protocol/engine/tipselection"
iotago "github.com/iotaledger/iota.go/v4"
)

// NewProvider creates a new TipSelection provider, that can be used to inject the component into an engine.
Expand All @@ -20,7 +22,7 @@ func NewProvider(opts ...options.Option[TipSelection]) module.Provider[*engine.E
e.HookConstructed(func() {
// wait for submodules to be constructed (so all of their properties are available)
module.OnAllConstructed(func() {
t.Construct(e.TipManager, e.Ledger.ConflictDAG(), e.Ledger.MemPool().TransactionMetadata, e.EvictionState.LatestRootBlocks, DynamicLivenessThreshold(e.SybilProtection.SeatManager().OnlineCommittee().Size))
t.Construct(e.TipManager, e.Ledger.ConflictDAG(), e.Ledger.MemPool().TransactionMetadata, func() iotago.BlockIDs { return lo.Keys(e.EvictionState.ActiveRootBlocks()) }, DynamicLivenessThreshold(e.SybilProtection.SeatManager().OnlineCommittee().Size))

e.Events.AcceptedBlockProcessed.Hook(func(block *blocks.Block) {
t.SetAcceptanceTime(block.IssuingTime())
Expand Down
12 changes: 12 additions & 0 deletions pkg/protocol/engine/tipselection/v1/tip_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package tipselectionv1
import (
"time"

"golang.org/x/exp/slices"

"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/ierrors"
Expand Down Expand Up @@ -120,6 +122,16 @@ func (t *TipSelection) SelectTips(amount int) (references model.ParentReferences
}, amount); len(references[iotago.StrongParentType]) == 0 {
rootBlocks := t.rootBlocks()

// Sort the rootBlocks in descending order according to their slot.
slices.SortFunc(rootBlocks, func(i, j iotago.BlockID) int {
if i.Slot() == j.Slot() {
return 0
} else if i.Slot() < j.Slot() {
return 1
}
return -1

Check failure on line 132 in pkg/protocol/engine/tipselection/v1/tip_selection.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] pkg/protocol/engine/tipselection/v1/tip_selection.go#L132

return with no blank line before (nlreturn)
Raw output
pkg/protocol/engine/tipselection/v1/tip_selection.go:132:5: return with no blank line before (nlreturn)
				return -1
				^
})

references[iotago.StrongParentType] = rootBlocks[:lo.Min(len(rootBlocks), t.optMaxStrongParents)]
}

Expand Down

0 comments on commit eb75aff

Please sign in to comment.