Skip to content

Commit

Permalink
Merge branch 'feat/cache-reset' of github.com:iotaledger/iota-core in…
Browse files Browse the repository at this point in the history
…to feat/reactive-chainmanager
  • Loading branch information
hmoog committed Nov 7, 2023
2 parents c98bcda + f4d229a commit 38a0ac5
Show file tree
Hide file tree
Showing 16 changed files with 39 additions and 95 deletions.
3 changes: 3 additions & 0 deletions pkg/core/buffer/unsolid_commitment_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (u *UnsolidCommitmentBuffer[V]) GetValuesAndEvict(commitmentID iotago.Commi

// Reset resets the component to a clean state as if it was created at the last commitment.
func (u *UnsolidCommitmentBuffer[V]) Reset() {
u.mutex.Lock()
defer u.mutex.Unlock()

u.blockBuffers.Clear()
u.commitmentBuffer.Each(func(key iotago.CommitmentID, _ types.Empty) { u.commitmentBuffer.Remove(key) })
}
26 changes: 4 additions & 22 deletions pkg/protocol/engine/accounts/accountsledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,29 +357,11 @@ func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iot

// Reset resets the component to a clean state as if it was created at the last commitment.
func (m *Manager) Reset() {
blockBurnsToDelete := make([]iotago.SlotIndex, 0)
m.blockBurns.ForEachKey(func(slot iotago.SlotIndex) bool {
if slot > m.latestCommittedSlot {
blockBurnsToDelete = append(blockBurnsToDelete, slot)
}

return true
})

versionSignalsToDelete := make([]iotago.SlotIndex, 0)
m.latestSupportedVersionSignals.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *model.SignaledBlock]) {
if slot > m.latestCommittedSlot {
versionSignalsToDelete = append(versionSignalsToDelete, slot)
}
})

for _, slot := range blockBurnsToDelete {
m.blockBurns.Delete(slot)
}
m.mutex.Lock()
defer m.mutex.Unlock()

for _, slot := range versionSignalsToDelete {
m.latestSupportedVersionSignals.Evict(slot)
}
m.blockBurns.Clear()
m.latestSupportedVersionSignals.Clear()
}

func (m *Manager) rollbackAccountTo(accountData *accounts.AccountData, targetSlot iotago.SlotIndex) (wasDestroyed bool, err error) {
Expand Down
14 changes: 4 additions & 10 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,10 @@ func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error {

// Reset resets the component to a clean state as if it was created at the last commitment.
func (m *Manager) Reset() {
futureAttestationsToClear := make([]iotago.SlotIndex, 0)
m.futureAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) {
if slot > m.lastCommittedSlot {
futureAttestationsToClear = append(futureAttestationsToClear, slot)
}
})
m.commitmentMutex.Lock()
defer m.commitmentMutex.Unlock()

m.futureAttestations.Clear()

pendingAttestationsToClear := make([]iotago.SlotIndex, 0)
m.pendingAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) {
Expand All @@ -336,10 +334,6 @@ func (m *Manager) Reset() {
}
})

for _, slot := range futureAttestationsToClear {
m.futureAttestations.Evict(slot)
}

for _, slot := range pendingAttestationsToClear {
m.pendingAttestations.Evict(slot)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/booker/inmemorybooker/booker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (b *Booker) Queue(block *blocks.Block) error {
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (b *Booker) Reset() {}
func (b *Booker) Reset() { /* nothing to reset but comply with interface */ }

func (b *Booker) Shutdown() {
b.TriggerStopped()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *CommitmentFilter) evaluateBlock(block *blocks.Block) {
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (c *CommitmentFilter) Reset() {}
func (c *CommitmentFilter) Reset() { /* nothing to reset but comply with interface */ }

func (c *CommitmentFilter) Shutdown() {
c.TriggerStopped()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ func (s *Scheduler) AddBlock(block *blocks.Block) {
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (s *Scheduler) Reset() {}
func (s *Scheduler) Reset() { /* nothing to reset but comply with interface */ }
13 changes: 4 additions & 9 deletions pkg/protocol/engine/eviction/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,15 @@ func (s *State) PopulateFromStorage(latestCommitmentSlot iotago.SlotIndex) {
}

func (s *State) Reset() {
rootBlocksToClear := make([]iotago.BlockID, 0)
blocksToPrune := make([]iotago.BlockID, 0)
s.rootBlocks.ForEach(func(slot iotago.SlotIndex, storage *shrinkingmap.ShrinkingMap[iotago.BlockID, iotago.CommitmentID]) {
if slot > s.lastEvictedSlot {
storage.ForEach(func(blockID iotago.BlockID, commitmentID iotago.CommitmentID) bool {
rootBlocksToClear = append(rootBlocksToClear, blockID)

return true
})
blocksToPrune = append(blocksToPrune, storage.Keys()...)
}
})

for _, blockID := range rootBlocksToClear {
s.RemoveRootBlock(blockID)
}
// we need to prune delayed because s.rootBlocks.ForEach and s.RemoveRootBlock both lock
lo.ForEach(blocksToPrune, s.RemoveRootBlock)
}

// latestNonEmptySlot returns the latest slot that contains a rootblock.
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/filter/blockfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (f *Filter) ProcessReceivedBlock(block *model.Block, source peer.ID) {
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (f *Filter) Reset() {}
func (f *Filter) Reset() { /* nothing to reset but comply with interface */ }

func (f *Filter) Shutdown() {
f.TriggerStopped()
Expand Down
15 changes: 5 additions & 10 deletions pkg/protocol/engine/mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,13 @@ func (m *MemPool[VoteRank]) stateDiff(slot iotago.SlotIndex) (*StateDiff, error)

// Reset resets the component to a clean state as if it was created at the last commitment.
func (m *MemPool[VoteRank]) Reset() {
stateDiffsToDelete := make([]iotago.SlotIndex, 0)
m.stateDiffs.ForEachKey(func(slot iotago.SlotIndex) bool {
if slot > m.lastEvictedSlot {
stateDiffsToDelete = append(stateDiffsToDelete, slot)
if stateDiff, deleted := m.stateDiffs.DeleteAndReturn(slot); deleted {
if err := stateDiff.Reset(); err != nil {
m.errorHandler(ierrors.Wrapf(err, "failed to reset state diff for slot %d", slot))
}
}
}

return true
Expand All @@ -215,14 +218,6 @@ func (m *MemPool[VoteRank]) Reset() {
}
})

for _, slot := range stateDiffsToDelete {
if stateDiff, deleted := m.stateDiffs.DeleteAndReturn(slot); deleted {
if err := stateDiff.Reset(); err != nil {
m.errorHandler(ierrors.Wrapf(err, "failed to reset state diff for slot %d", slot))
}
}
}

for _, slot := range attachmentsToDelete {
if evictedAttachments := m.attachments.Evict(slot); evictedAttachments != nil {
evictedAttachments.ForEach(func(id iotago.BlockID, metadata *SignedTransactionMetadata) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,7 @@ func (m *SlotMutations) Evict(index iotago.SlotIndex) error {

// Reset resets the component to a clean state as if it was created at the last commitment.
func (m *SlotMutations) Reset() {
slotsToReset := make([]iotago.SlotIndex, 0)
m.acceptedBlocksBySlot.ForEachKey(func(slot iotago.SlotIndex) bool {
if slot > m.latestCommittedIndex {
slotsToReset = append(slotsToReset, slot)
}

return true
})

for _, slot := range slotsToReset {
m.acceptedBlocksBySlot.Delete(slot)
}
m.acceptedBlocksBySlot.Clear()
}

// AcceptedBlocks returns the set of accepted blocks for the given slot.
Expand Down
10 changes: 4 additions & 6 deletions pkg/protocol/protocol_warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,11 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo
return false
}

targetEngine.Reset()
// If the engine is "dirty" we need to restore the state of the engine to the state of the chain commitment.
// As we already decided to switch and sync to this chain we should make sure that processing the blocks from the commitment
// leads to the verified commitment.
if targetEngine.Notarization.AcceptedBlocksCount(commitmentID.Slot()) > 0 {
w.LogError("ENGINE IS DIRTY")
// make sure the engine is clean and requires a warp-sync before we start processing the blocks
if targetEngine.Workers.WaitChildren(); targetEngine.Storage.Settings().LatestCommitment().ID().Slot() > commitmentID.Slot() {
return true
}
targetEngine.Reset()

// Once all blocks are booked we
// 1. Mark all transactions as accepted
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/permanent/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,11 @@ func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) {

if _, err = s.storeLatestProcessedSlot.Compute(func(latestStoredSlot iotago.SlotIndex, _ bool) (newValue iotago.SlotIndex, err error) {
if latestStoredSlot >= slot {
return latestStoredSlot, errLatestStoredSlotUnchanged
return latestStoredSlot, kvstore.ErrTypedValueNotChanged
}

return slot, nil
}); err != nil && !ierrors.Is(err, errLatestStoredSlotUnchanged) {
}); err != nil {
return ierrors.Wrap(err, "failed to advance latest stored slot")
}

Expand Down Expand Up @@ -614,5 +614,3 @@ func read[T any](typedValue *kvstore.TypedValue[T]) (value T) {

return value
}

var errLatestStoredSlotUnchanged = ierrors.New("latest stored slot did not change")
7 changes: 7 additions & 0 deletions pkg/storage/prunable/epochstore/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package epochstore

const (
entriesKey byte = iota
lastAccessedEpochKey
lastPrunedEpochKey
)
6 changes: 0 additions & 6 deletions pkg/storage/prunable/epochstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ import (
iotago "github.com/iotaledger/iota.go/v4"
)

const (
entriesKey byte = iota
lastAccessedEpochKey
lastPrunedEpochKey
)

type Store[V any] struct {
realm kvstore.Realm
kv *kvstore.TypedStore[iotago.EpochIndex, V]
Expand Down
6 changes: 1 addition & 5 deletions pkg/storage/storage_prunable.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,7 @@ func (s *Storage) RestoreFromDisk() {
}

func (s *Storage) Rollback(targetSlot iotago.SlotIndex) error {
if err := s.prunable.Rollback(s.pruningRange(targetSlot)); err != nil {
return ierrors.Wrapf(err, "failed to rollback prunable storage to slot %d", targetSlot)
}

return nil
return s.prunable.Rollback(s.pruningRange(targetSlot))
}

func (s *Storage) pruningRange(targetSlot iotago.SlotIndex) (targetEpoch iotago.EpochIndex, pruneRange [2]iotago.SlotIndex) {
Expand Down
7 changes: 0 additions & 7 deletions pkg/tests/loss_of_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/iotaledger/hive.go/log"
"github.com/iotaledger/iota-core/pkg/protocol"
"github.com/iotaledger/iota-core/pkg/testsuite"
"github.com/iotaledger/iota-core/pkg/testsuite/mock"
Expand Down Expand Up @@ -40,8 +39,6 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {

ts.Run(true, nil)

node0.Protocol.SetLogLevel(log.LevelTrace)

// Create snapshot to use later.
snapshotPath := ts.Directory.Path(fmt.Sprintf("%d_snapshot", time.Now().Unix()))
require.NoError(t, ts.Node("node0").Protocol.Engines.Main.Get().WriteSnapshot(snapshotPath))
Expand All @@ -51,7 +48,6 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
block0 := ts.IssueValidationBlock("block0", node0,
mock.WithIssuingTime(ts.API.TimeProvider().SlotStartTime(50)),
)
fmt.Println(block0.SlotCommitmentID())
require.EqualValues(t, 48, ts.Block("block0").SlotCommitmentID().Slot())
// Reviving the chain should select one parent from the last committed slot.
require.Len(t, block0.Parents(), 1)
Expand Down Expand Up @@ -93,9 +89,6 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
{
ts.IssueBlocksAtSlots("", []iotago.SlotIndex{58, 59}, 3, "57.2", ts.Nodes("node0", "node1", "node2"), true, nil)

// wait for node to warp-sync before checking (we warp-sync one by one which takes a few seconds)
time.Sleep(10 * time.Second)

ts.AssertEqualStoredCommitmentAtIndex(57, ts.Nodes()...)
ts.AssertLatestCommitmentSlotIndex(57, ts.Nodes()...)
ts.AssertBlocksInCacheAccepted(ts.BlocksWithPrefix("59.0"), true, ts.Nodes()...)
Expand Down

0 comments on commit 38a0ac5

Please sign in to comment.