Skip to content

Commit

Permalink
Merge branch 'feat/cache-reset' of github.com:iotaledger/iota-core in…
Browse files Browse the repository at this point in the history
…to feat/reactive-chainmanager
  • Loading branch information
hmoog committed Oct 20, 2023
2 parents 496cc81 + 7e5b1e4 commit a7e3642
Show file tree
Hide file tree
Showing 50 changed files with 370 additions and 39 deletions.
6 changes: 6 additions & 0 deletions pkg/core/buffer/unsolid_commitment_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,9 @@ func (u *UnsolidCommitmentBuffer[V]) GetValuesAndEvict(commitmentID iotago.Commi

return values
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (u *UnsolidCommitmentBuffer[V]) Reset() {
u.blockBuffers.Clear()
u.commitmentBuffer.Each(func(key iotago.CommitmentID, _ types.Empty) { u.commitmentBuffer.Remove(key) })
}
27 changes: 27 additions & 0 deletions pkg/protocol/engine/accounts/accountsledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,33 @@ func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iot
return nil
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (m *Manager) Reset() {
blockBurnsToDelete := make([]iotago.SlotIndex, 0)
m.blockBurns.ForEachKey(func(slot iotago.SlotIndex) bool {
if slot > m.latestCommittedSlot {
blockBurnsToDelete = append(blockBurnsToDelete, slot)
}

return true
})

versionSignalsToDelete := make([]iotago.SlotIndex, 0)
m.latestSupportedVersionSignals.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *model.SignaledBlock]) {
if slot > m.latestCommittedSlot {
versionSignalsToDelete = append(versionSignalsToDelete, slot)
}
})

for _, slot := range blockBurnsToDelete {
m.blockBurns.Delete(slot)
}

for _, slot := range versionSignalsToDelete {
m.latestSupportedVersionSignals.Evict(slot)
}
}

func (m *Manager) rollbackAccountTo(accountData *accounts.AccountData, targetSlot iotago.SlotIndex) (wasDestroyed bool, err error) {
// to reach targetSlot, we need to rollback diffs from the current latestCommittedSlot down to targetSlot + 1
for diffSlot := m.latestCommittedSlot; diffSlot > targetSlot; diffSlot-- {
Expand Down
3 changes: 3 additions & 0 deletions pkg/protocol/engine/attestation/attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Attestations interface {
Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) (err error)
Rollback(index iotago.SlotIndex) (err error)

// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()

RestoreFromDisk() (err error)

module.Interface
Expand Down
26 changes: 26 additions & 0 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package slotattestation
import (
"github.com/iotaledger/hive.go/ads"
"github.com/iotaledger/hive.go/core/memstorage"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/runtime/module"
Expand Down Expand Up @@ -308,6 +309,31 @@ func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error {
return nil
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (m *Manager) Reset() {
futureAttestationsToClear := make([]iotago.SlotIndex, 0)
m.futureAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) {
if slot > m.lastCommittedSlot {
futureAttestationsToClear = append(futureAttestationsToClear, slot)
}
})

pendingAttestationsToClear := make([]iotago.SlotIndex, 0)
m.pendingAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) {
if slot > m.lastCommittedSlot {
pendingAttestationsToClear = append(pendingAttestationsToClear, slot)
}
})

for _, slot := range futureAttestationsToClear {
m.futureAttestations.Evict(slot)
}

for _, slot := range pendingAttestationsToClear {
m.pendingAttestations.Evict(slot)
}
}

func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffSlot iotago.SlotIndex, isValid bool) {
if slot < m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge() {
return 0, false
Expand Down
3 changes: 3 additions & 0 deletions pkg/protocol/engine/blockdag/blockdag.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ type BlockDAG interface {
// without requesting it.
GetOrRequestBlock(blockID iotago.BlockID) (block *blocks.Block, requested bool)

// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()

module.Interface
}
5 changes: 5 additions & 0 deletions pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ func (b *BlockDAG) GetOrRequestBlock(blockID iotago.BlockID) (block *blocks.Bloc
})
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (b *BlockDAG) Reset() {
b.uncommittedSlotBlocks.Reset()
}

func (b *BlockDAG) Shutdown() {
b.TriggerStopped()
b.workers.Shutdown()
Expand Down
5 changes: 5 additions & 0 deletions pkg/protocol/engine/blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,8 @@ func (b *Blocks) StoreBlock(block *Block) (stored bool) {

return storage.Set(block.ID(), block)
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (b *Blocks) Reset() {
b.blocks.Clear()
}
3 changes: 3 additions & 0 deletions pkg/protocol/engine/booker/booker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ import (
)

type Booker interface {
// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()

module.Interface
}
3 changes: 3 additions & 0 deletions pkg/protocol/engine/booker/inmemorybooker/booker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ func (b *Booker) Queue(block *blocks.Block) error {
return nil
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (b *Booker) Reset() {}

func (b *Booker) Shutdown() {
b.TriggerStopped()
}
Expand Down
35 changes: 10 additions & 25 deletions pkg/protocol/engine/clock/blocktime/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,19 @@ func NewProvider(opts ...options.Option[Clock]) module.Provider[*engine.Engine,
asyncOpt := event.WithWorkerPool(c.workerPool)
c.HookStopped(lo.Batch(
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
c.advanceAccepted(block.IssuingTime())
c.acceptedTime.Advance(block.IssuingTime())
}, asyncOpt).Unhook,

e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
c.advanceConfirmed(block.IssuingTime())
c.confirmedTime.Advance(block.IssuingTime())
}, asyncOpt).Unhook,

e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) {
timeProvider := e.APIForSlot(slot).TimeProvider()
slotEndTime := timeProvider.SlotEndTime(slot)

c.onSlotFinalized(slotEndTime)
c.acceptedTime.Advance(slotEndTime)
c.confirmedTime.Advance(slotEndTime)
}, asyncOpt).Unhook,
))
})
Expand Down Expand Up @@ -98,29 +99,13 @@ func (c *Clock) Snapshot() *clock.Snapshot {
}
}

// Reset resets the time values tracked in the clock to the given time.
func (c *Clock) Reset(newTime time.Time) {
c.acceptedTime.Reset(newTime)
c.confirmedTime.Reset(newTime)
}

func (c *Clock) Shutdown() {
c.workerPool.Shutdown()
c.TriggerStopped()
}

func (c *Clock) advanceAccepted(time time.Time) {
c.Lock()
defer c.Unlock()

c.acceptedTime.Advance(time)
}

func (c *Clock) advanceConfirmed(time time.Time) {
c.Lock()
defer c.Unlock()

c.confirmedTime.Advance(time)
}

func (c *Clock) onSlotFinalized(slotEndTime time.Time) {
c.Lock()
defer c.Unlock()

c.acceptedTime.Advance(slotEndTime)
c.confirmedTime.Advance(slotEndTime)
}
11 changes: 11 additions & 0 deletions pkg/protocol/engine/clock/blocktime/relativetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (c *RelativeTime) Advance(newTime time.Time) (updated bool) {
return true
}

// Reset resets the time value to the given time (resetting monotonicity of the relative time).
func (c *RelativeTime) Reset(newTime time.Time) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.timeUpdateOffset = time.Now()
c.time = newTime

c.OnUpdated.Trigger(c.time)
}

// determineTimeUpdateOffset determines the new timeUpdateOffset that is in sync with the monotonic clock.
func (c *RelativeTime) determineTimeUpdateOffset(newTime time.Time) time.Time {
diff := time.Since(c.timeUpdateOffset)
Expand Down
3 changes: 3 additions & 0 deletions pkg/protocol/engine/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type Clock interface {
// Snapshot returns a snapshot of all time values tracked in the clock read atomically.
Snapshot() *Snapshot

// Reset resets the time values tracked in the clock to the given time.
Reset(newTime time.Time)

// Interface embeds the required methods of the module.Interface.
module.Interface
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func (c *CommitmentFilter) evaluateBlock(block *blocks.Block) {
c.events.BlockAllowed.Trigger(block)
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (c *CommitmentFilter) Reset() {}

func (c *CommitmentFilter) Shutdown() {
c.TriggerStopped()
}
3 changes: 3 additions & 0 deletions pkg/protocol/engine/commitmentfilter/commitmentfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@ type CommitmentFilter interface {
// ProcessPreFilteredBlock processes block from the given source.
ProcessPreFilteredBlock(block *blocks.Block)

// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()

module.Interface
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (s *Scheduler) AddBlock(block *blocks.Block) {
}
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (s *Scheduler) Reset() {
// TODO: clear all buffers? currently irrelevant because the scheduler is not relevant for commitments.
}

func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@ func (s *Scheduler) AddBlock(block *blocks.Block) {
s.events.BlockScheduled.Trigger(block)
}
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (s *Scheduler) Reset() {}
4 changes: 3 additions & 1 deletion pkg/protocol/engine/congestioncontrol/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Scheduler interface {
AddBlock(*blocks.Block)
// IsBlockIssuerReady returns true if the block issuer is ready to issuer a block, i.e., if the block issuer were to add a block to the scheduler, would it be scheduled.
IsBlockIssuerReady(iotago.AccountID, ...*blocks.Block) bool
// BufferSize returns the current buffer size of the Scheduler as block count.
// BasicBufferSize returns the current buffer size of the Scheduler as block count.
BasicBufferSize() int
// ValidatorBufferSize returns the current buffer size of the Scheduler as block count.
ValidatorBufferSize() int
Expand All @@ -23,6 +23,8 @@ type Scheduler interface {
IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore
// ValidatorQueueBlockCount returns the queue size of the given validator as block count.
ValidatorQueueBlockCount(validatorID iotago.AccountID) int
// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()

module.Interface
}
3 changes: 3 additions & 0 deletions pkg/protocol/engine/consensus/blockgadget/gadget.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ type Gadget interface {

TrackWitnessWeight(votingBlock *blocks.Block)
SetAccepted(block *blocks.Block) bool

// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (g *Gadget) Shutdown() {
g.TriggerStopped()
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (g *Gadget) Reset() {}

// propagate performs a breadth-first past cone walk starting at initialBlockIDs. evaluateFunc is called for every block visited
// and needs to return whether to continue the walk further.
func (g *Gadget) propagate(initialBlockIDs iotago.BlockIDs, evaluateFunc func(block *blocks.Block) bool) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/protocol/engine/consensus/slotgadget/slotgadget.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ import (
)

type Gadget interface {
// Reset resets the component to a clean state as if it was created at the last commitment.
Reset()

module.Interface
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
})
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (g *Gadget) Reset() {
// TODO: reset slotTrackers
}

func (g *Gadget) Shutdown() {
g.TriggerStopped()
g.workers.Shutdown()
Expand Down
40 changes: 35 additions & 5 deletions pkg/protocol/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,41 @@ func New(
)
}

func (e *Engine) ProcessBlockFromPeer(block *model.Block, source peer.ID) {
e.Filter.ProcessReceivedBlock(block, source)
e.Events.BlockProcessed.Trigger(block.ID())
}

// Reset resets the component to a clean state as if it was created at the last commitment.
func (e *Engine) Reset() {
e.Workers.WaitChildren()

e.BlockRequester.Clear()
e.Storage.Reset()
e.EvictionState.Reset()
e.Filter.Reset()
e.CommitmentFilter.Reset()
e.BlockCache.Reset()
e.BlockDAG.Reset()
e.Booker.Reset()
e.Ledger.Reset()
e.BlockGadget.Reset()
e.SlotGadget.Reset()
e.Notarization.Reset()
e.Attestations.Reset()
e.SybilProtection.Reset()
e.Scheduler.Reset()
e.TipManager.Reset()
e.TipSelection.Reset()
e.Retainer.Reset()
e.SyncManager.Reset()
e.UpgradeOrchestrator.Reset()

latestCommittedSlot := e.Storage.Settings().LatestCommitment().Slot()
latestCommittedTime := e.APIForSlot(latestCommittedSlot).TimeProvider().SlotEndTime(latestCommittedSlot)
e.Clock.Reset(latestCommittedTime)
}

func (e *Engine) Shutdown() {
if !e.WasShutdown() {
e.TriggerShutdown()
Expand Down Expand Up @@ -248,11 +283,6 @@ func (e *Engine) Shutdown() {
}
}

func (e *Engine) ProcessBlockFromPeer(block *model.Block, source peer.ID) {
e.Filter.ProcessReceivedBlock(block, source)
e.Events.BlockProcessed.Trigger(block.ID())
}

func (e *Engine) BlockFromCache(id iotago.BlockID) (*blocks.Block, bool) {
return e.BlockCache.Block(id)
}
Expand Down
Loading

0 comments on commit a7e3642

Please sign in to comment.