Skip to content

Commit

Permalink
Merge pull request #729 from iotaledger/debug/mutation-warp-sync
Browse files Browse the repository at this point in the history
Fix eviction
  • Loading branch information
jonastheis authored Feb 15, 2024
2 parents d22845f + e9f561b commit 9993949
Show file tree
Hide file tree
Showing 27 changed files with 427 additions and 190 deletions.
39 changes: 30 additions & 9 deletions components/debugapi/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,22 @@ func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *protocol.Commi
return nil, parentErr
}

if err = parentCommitment.Children.ForEach(func(childCommitment *protocol.Commitment) error {
child, childErr := createNode(graph, childCommitment)
if childErr != nil {
return childErr
}
// TODO: this should be removed once eviction of commitments is properly implemented
if parentCommitment.Children.IsEmpty() {
if childCommitment, exists := parentCommitment.Chain.Get().Commitment(parentCommitment.Slot() + 1); exists {
if err = renderChild(childCommitment, graph, parentCommitment, parent); err != nil {
return nil, err
}

if childCommitment.Chain.Get() == deps.Protocol.Chains.Main.Get() {
child.SetColor("green")
commitmentWalker.Push(childCommitment)

continue
}
}

if _, edgeErr := graph.CreateEdge(fmt.Sprintf("%s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8]), parent, child); edgeErr != nil {
return ierrors.Wrapf(edgeErr, "could not create edge %s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8])
if err = parentCommitment.Children.ForEach(func(childCommitment *protocol.Commitment) error {
if err = renderChild(childCommitment, graph, parentCommitment, parent); err != nil {
return err
}

commitmentWalker.Push(childCommitment)
Expand All @@ -94,6 +98,23 @@ func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *protocol.Commi
return graph, nil
}

func renderChild(childCommitment *protocol.Commitment, graph *cgraph.Graph, parentCommitment *protocol.Commitment, parent *cgraph.Node) error {
child, err := createNode(graph, childCommitment)
if err != nil {
return err
}

if childCommitment.Chain.Get() == deps.Protocol.Chains.Main.Get() {
child.SetColor("green")
}

if _, edgeErr := graph.CreateEdge(fmt.Sprintf("%s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8]), parent, child); edgeErr != nil {
return ierrors.Wrapf(edgeErr, "could not create edge %s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8])
}

return nil
}

func createNode(graph *cgraph.Graph, commitment *protocol.Commitment) (*cgraph.Node, error) {
node, err := graph.CreateNode(fmt.Sprintf("%d-%s", commitment.ID().Slot(), commitment.ID().Identifier().String()[:8]))
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions components/debugapi/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ func configure() error {

routeGroup := deps.RestRouteManager.AddRoute("debug/v2")

debugAPIWorkerPool := workerpool.NewGroup("DebugAPI").CreatePool("DebugAPI", workerpool.WithWorkerCount(1))

deps.Protocol.Events.Engine.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) {
blocksPerSlot.Set(block.ID().Slot(), append(lo.Return1(blocksPerSlot.GetOrCreate(block.ID().Slot(), func() []*blocks.Block {
return make([]*blocks.Block, 0)
})), block))
})
}, event.WithWorkerPool(debugAPIWorkerPool))

deps.Protocol.Events.Engine.SlotGadget.SlotFinalized.Hook(func(index iotago.SlotIndex) {
epoch := deps.Protocol.APIForSlot(index).TimeProvider().EpochFromSlot(index)
Expand All @@ -113,23 +115,24 @@ func configure() error {
}
}

}, event.WithWorkerPool(workerpool.NewGroup("DebugAPI").CreatePool("PruneDebugAPI", workerpool.WithWorkerCount(1))))
}, event.WithWorkerPool(debugAPIWorkerPool))

deps.Protocol.Events.Engine.Notarization.SlotCommitted.Hook(func(scd *notarization.SlotCommittedDetails) {
if err := storeTransactionsPerSlot(scd); err != nil {
Component.LogWarnf(">> DebugAPI Error: %s\n", err)
}
})
}, event.WithWorkerPool(debugAPIWorkerPool))

deps.Protocol.Events.Engine.EvictionState.SlotEvicted.Hook(func(index iotago.SlotIndex) {
deps.Protocol.Events.Engine.Evict.Hook(func(index iotago.SlotIndex) {
blocksInSlot, exists := blocksPerSlot.Get(index)
if !exists {
return
}

for _, block := range blocksInSlot {
if block.ProtocolBlock() == nil {
Component.LogInfof("block is a root block", block.ID())
Component.LogInfof("block is a root block %s", block.ID())

continue
}

Expand All @@ -146,7 +149,7 @@ func configure() error {
}

blocksPerSlot.Delete(index)
})
}, event.WithWorkerPool(debugAPIWorkerPool))

routeGroup.GET(RouteBlockMetadata, func(c echo.Context) error {
blockID, err := httpserver.ParseBlockIDParam(c, api.ParameterBlockID)
Expand Down
25 changes: 11 additions & 14 deletions components/debugapi/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/iotaledger/hive.go/ads"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/iota-core/pkg/protocol/engine/mempool"
"github.com/iotaledger/iota-core/pkg/protocol/engine/notarization"
iotago "github.com/iotaledger/iota.go/v4"
)
Expand All @@ -17,10 +16,7 @@ func init() {

func storeTransactionsPerSlot(scd *notarization.SlotCommittedDetails) error {
slot := scd.Commitment.Slot()
stateDiff, err := deps.Protocol.Engines.Main.Get().Ledger.MemPool().StateDiff(slot)
if err != nil {
return ierrors.Wrapf(err, "failed to retrieve state diff for slot %d", slot)
}

mutationsTree := ads.NewSet[iotago.Identifier](
mapdb.NewMapDB(),
iotago.Identifier.Bytes,
Expand All @@ -33,23 +29,24 @@ func storeTransactionsPerSlot(scd *notarization.SlotCommittedDetails) error {
IncludedTransactions: make([]string, 0),
}

var innerErr error
stateDiff.ExecutedTransactions().ForEach(func(_ iotago.TransactionID, txMeta mempool.TransactionMetadata) bool {
tcs.IncludedTransactions = append(tcs.IncludedTransactions, txMeta.ID().String())
if err := mutationsTree.Add(txMeta.ID()); err != nil {
innerErr = ierrors.Wrapf(err, "failed to add transaction to mutations tree, txID: %s", txMeta.ID())
for _, transaction := range scd.Mutations {
txID, err := transaction.ID()
if err != nil {
return ierrors.Wrapf(err, "failed to calculate transactionID")
}

return false
tcs.IncludedTransactions = append(tcs.IncludedTransactions, txID.String())
if err = mutationsTree.Add(txID); err != nil {
return ierrors.Wrapf(err, "failed to add transaction to mutations tree, txID: %s", txID)
}

return true
})
}

tcs.MutationsRoot = mutationsTree.Root().String()

transactionsPerSlot[slot] = tcs

return innerErr
return nil
}

func getSlotTransactionIDs(slot iotago.SlotIndex) (*TransactionsChangesResponse, error) {
Expand Down
13 changes: 11 additions & 2 deletions components/prometheus/metrics_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,28 @@ var CommitmentsMetrics = collector.NewCollection(commitmentsNamespace,
collector.WithHelp("Number of accepted blocks by the node per slot."),
collector.WithLabels("slot"),
collector.WithPruningDelay(10*time.Minute),
collector.WithResetBeforeCollecting(true),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) {
deps.Collector.Update(commitmentsNamespace, acceptedBlocks, float64(details.AcceptedBlocks.Size()), strconv.Itoa(int(details.Commitment.Slot())))
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(transactions,
collector.WithType(collector.Gauge),
collector.WithHelp("Number of accepted transactions by the node per slot."),
collector.WithLabels("slot"),
collector.WithPruningDelay(10*time.Minute),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) {
deps.Collector.Update(commitmentsNamespace, transactions, float64(len(details.Mutations)), strconv.Itoa(int(details.Commitment.Slot())))
}, event.WithWorkerPool(Component.WorkerPool))
}),
)),
collector.WithMetric(collector.NewMetric(validators,
collector.WithType(collector.Gauge),
collector.WithHelp("Number of active validators per slot."),
collector.WithLabels("slot"),
collector.WithPruningDelay(10*time.Minute),
collector.WithResetBeforeCollecting(true),
collector.WithInitFunc(func() {
deps.Protocol.Events.Engine.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) {
deps.Collector.Update(commitmentsNamespace, validators, float64(details.ActiveValidatorsCount), strconv.Itoa(int(details.Commitment.Slot())))
Expand Down
7 changes: 1 addition & 6 deletions pkg/protocol/engine/blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package blocks

import (
"github.com/iotaledger/hive.go/core/memstorage"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine/eviction"
iotago "github.com/iotaledger/iota.go/v4"
)

type Blocks struct {
Evict *event.Event1[iotago.SlotIndex]
blocks *memstorage.IndexedStorage[iotago.SlotIndex, iotago.BlockID, *Block]
evictionState *eviction.State
apiProvider iotago.APIProvider
Expand All @@ -19,16 +17,13 @@ type Blocks struct {

func New(evictionState *eviction.State, apiProvider iotago.APIProvider) *Blocks {
return &Blocks{
Evict: event.New1[iotago.SlotIndex](),
blocks: memstorage.NewIndexedStorage[iotago.SlotIndex, iotago.BlockID, *Block](),
evictionState: evictionState,
apiProvider: apiProvider,
}
}

func (b *Blocks) EvictUntil(slot iotago.SlotIndex) {
b.Evict.Trigger(slot)

func (b *Blocks) Evict(slot iotago.SlotIndex) {
b.evictionMutex.Lock()
defer b.evictionMutex.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/booker/inmemorybooker/booker.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (b *Booker) setupBlock(block *blocks.Block) {

parentBlock.Invalid().OnUpdateOnce(func(_ bool, _ bool) {
if block.SetInvalid() {
b.events.BlockInvalid.Trigger(block, ierrors.New("block marked as invalid in Booker"))
b.events.BlockInvalid.Trigger(block, ierrors.Errorf("block marked as invalid in Booker because parent block is invalid %s", parentBlock.ID()))
}
})
})
Expand Down
16 changes: 10 additions & 6 deletions pkg/protocol/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,6 @@ func (e *Engine) setupBlockStorage() {
}

func (e *Engine) setupEvictionState() {
e.Events.EvictionState.LinkTo(e.EvictionState.Events)

wp := e.Workers.CreatePool("EvictionState", workerpool.WithWorkerCount(1)) // Using just 1 worker to avoid contention

e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
Expand All @@ -471,18 +469,24 @@ func (e *Engine) setupEvictionState() {

e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
e.EvictionState.AdvanceActiveWindowToIndex(commitment.Slot())
}, event.WithWorkerPool(wp))
e.BlockRequester.EvictUntil(commitment.Slot())
})

e.Events.EvictionState.SlotEvicted.Hook(e.BlockCache.EvictUntil)
// We evict the block cache and trigger the eviction event in a separate worker pool.
// The block cache can be evicted asynchronously, as its internal state is defined via the EvictionState, and it will
// be updated accordingly on LatestCommitmentUpdated (atomically).
evictionWP := e.Workers.CreatePool("Eviction", workerpool.WithWorkerCount(1)) // Using just 1 worker to avoid contention
e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
e.BlockCache.Evict(commitment.Slot())
e.Events.Evict.Trigger(commitment.Slot())
}, event.WithWorkerPool(evictionWP))

e.EvictionState.Initialize(e.Storage.Settings().LatestCommitment().Slot())
}

func (e *Engine) setupBlockRequester() {
e.Events.BlockRequester.LinkTo(e.BlockRequester.Events)

e.Events.EvictionState.SlotEvicted.Hook(e.BlockRequester.EvictUntil)

// We need to hook to make sure that the request is created before the block arrives to avoid a race condition
// where we try to delete the request again before it is created. Thus, continuing to request forever.
e.Events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/protocol/engine/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/iotaledger/iota-core/pkg/protocol/engine/congestioncontrol/scheduler"
"github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/blockgadget"
"github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/slotgadget"
"github.com/iotaledger/iota-core/pkg/protocol/engine/eviction"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter"
"github.com/iotaledger/iota-core/pkg/protocol/engine/ledger"
Expand All @@ -27,9 +26,9 @@ import (
type Events struct {
BlockProcessed *event.Event1[iotago.BlockID]
AcceptedBlockProcessed *event.Event1[*blocks.Block]
Evict *event.Event1[iotago.SlotIndex]
StoragePruned *event.Event1[iotago.EpochIndex]

EvictionState *eviction.Events
PreSolidFilter *presolidfilter.Events
PostSolidFilter *postsolidfilter.Events
BlockRequester *eventticker.Events[iotago.SlotIndex, iotago.BlockID]
Expand All @@ -55,8 +54,8 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
return &Events{
BlockProcessed: event.New1[iotago.BlockID](),
AcceptedBlockProcessed: event.New1[*blocks.Block](),
Evict: event.New1[iotago.SlotIndex](),
StoragePruned: event.New1[iotago.EpochIndex](),
EvictionState: eviction.NewEvents(),
PreSolidFilter: presolidfilter.NewEvents(),
PostSolidFilter: postsolidfilter.NewEvents(),
BlockRequester: eventticker.NewEvents[iotago.SlotIndex, iotago.BlockID](),
Expand Down
19 changes: 0 additions & 19 deletions pkg/protocol/engine/eviction/events.go

This file was deleted.

10 changes: 1 addition & 9 deletions pkg/protocol/engine/eviction/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (

// State represents the state of the eviction and keeps track of the root blocks.
type State struct {
Events *Events

settings *permanent.Settings
rootBlockStorageFunc func(iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error)
lastCommittedSlot iotago.SlotIndex
Expand All @@ -30,7 +28,6 @@ type State struct {
func NewState(settings *permanent.Settings, rootBlockStorageFunc func(iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error)) (state *State) {
return &State{
settings: settings,
Events: NewEvents(),
rootBlockStorageFunc: rootBlockStorageFunc,
}
}
Expand All @@ -42,18 +39,13 @@ func (s *State) Initialize(lastCommittedSlot iotago.SlotIndex) {

func (s *State) AdvanceActiveWindowToIndex(slot iotago.SlotIndex) {
s.evictionMutex.Lock()
defer s.evictionMutex.Unlock()

if slot <= s.lastCommittedSlot {
s.evictionMutex.Unlock()
return
}

s.lastCommittedSlot = slot

s.evictionMutex.Unlock()

// We only delay eviction in the Eviction State, but components evict on commitment, which in this context is slot.
s.Events.SlotEvicted.Trigger(slot)
}

func (s *State) LastEvictedSlot() iotago.SlotIndex {
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Ledger interface {
ManaManager() *mana.Manager
RMCManager() *rmc.Manager

CommitSlot(slot iotago.SlotIndex) (stateRoot, mutationRoot, accountRoot iotago.Identifier, created utxoledger.Outputs, consumed utxoledger.Spents, err error)
CommitSlot(slot iotago.SlotIndex) (stateRoot, mutationRoot, accountRoot iotago.Identifier, created utxoledger.Outputs, consumed utxoledger.Spents, mutations []*iotago.Transaction, err error)

Import(reader io.ReadSeeker) error
Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) error
Expand Down
Loading

0 comments on commit 9993949

Please sign in to comment.