Skip to content

Commit

Permalink
Merge pull request #293 from iotaledger/feat/reactive-chainmanager
Browse files Browse the repository at this point in the history
Feat: Reactive protocol rework
  • Loading branch information
hmoog authored Dec 5, 2023
2 parents 923faec + 8829fea commit 6378533
Show file tree
Hide file tree
Showing 127 changed files with 3,428 additions and 3,904 deletions.
4 changes: 2 additions & 2 deletions components/dashboard/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func currentNodeStatus() *nodestatus {
LastPauseGC: m.PauseNs[(m.NumGC+255)%256],
}
// get TangleTime
cl := deps.Protocol.MainEngineInstance().Clock
syncStatus := deps.Protocol.MainEngineInstance().SyncManager.SyncStatus()
cl := deps.Protocol.Engines.Main.Get().Clock
syncStatus := deps.Protocol.Engines.Main.Get().SyncManager.SyncStatus()

status.TangleTime = tangleTime{
Synced: syncStatus.NodeSynced,
Expand Down
20 changes: 10 additions & 10 deletions components/dashboard/explorer_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ func setupExplorerRoutes(routeGroup *echo.Group) {
}

func findBlock(blockID iotago.BlockID) (explorerBlk *ExplorerBlock, err error) {
block, exists := deps.Protocol.MainEngineInstance().Block(blockID)
block, exists := deps.Protocol.Engines.Main.Get().Block(blockID)
if !exists {
return nil, ierrors.Errorf("block not found: %s", blockID.ToHex())
}

cachedBlock, _ := deps.Protocol.MainEngineInstance().BlockCache.Block(blockID)
cachedBlock, _ := deps.Protocol.Engines.Main.Get().BlockCache.Block(blockID)

blockMetadata, err := deps.Protocol.MainEngineInstance().Retainer.BlockMetadata(blockID)
blockMetadata, err := deps.Protocol.Engines.Main.Get().Retainer.BlockMetadata(blockID)
if err != nil {
return nil, ierrors.Wrapf(err, "block metadata %s", blockID.ToHex())
}
Expand Down Expand Up @@ -196,12 +196,12 @@ func getTransaction(c echo.Context) error {
outputID := iotago.OutputID{}
copy(outputID[:], txID[:])

output, err := deps.Protocol.MainEngineInstance().Ledger.Output(outputID)
output, err := deps.Protocol.Engines.Main.Get().Ledger.Output(outputID)
if err != nil {
return err
}

block, exists := deps.Protocol.MainEngineInstance().Block(output.BlockID())
block, exists := deps.Protocol.Engines.Main.Get().Block(output.BlockID())
if !exists {
return ierrors.Errorf("block not found: %s", output.BlockID().ToHex())
}
Expand All @@ -223,12 +223,12 @@ func getTransactionMetadata(c echo.Context) error {
// Get the first output of that transaction (using index 0)
outputID := iotago.OutputID{}
copy(outputID[:], txID[:])
txMetadata, exists := deps.Protocol.MainEngineInstance().Ledger.MemPool().TransactionMetadata(txID)
txMetadata, exists := deps.Protocol.Engines.Main.Get().Ledger.MemPool().TransactionMetadata(txID)
if !exists {
return ierrors.Errorf("tx metadata not found: %s", txID.ToHex())
}

conflicts, _ := deps.Protocol.MainEngineInstance().Ledger.SpendDAG().ConflictingSpenders(txID)
conflicts, _ := deps.Protocol.Engines.Main.Get().Ledger.SpendDAG().ConflictingSpenders(txID)

return httpserver.JSONResponse(c, http.StatusOK, NewTransactionMetadata(txMetadata, conflicts))
}
Expand All @@ -239,7 +239,7 @@ func getOutput(c echo.Context) error {
return err
}

output, err := deps.Protocol.MainEngineInstance().Ledger.Output(outputID)
output, err := deps.Protocol.Engines.Main.Get().Ledger.Output(outputID)
if err != nil {
return err
}
Expand All @@ -253,7 +253,7 @@ func getSlotDetailsByID(c echo.Context) error {
return err
}

commitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(commitmentID.Slot())
commitment, err := deps.Protocol.Engines.Main.Get().Storage.Commitments().Load(commitmentID.Slot())
if err != nil {
return err
}
Expand All @@ -262,7 +262,7 @@ func getSlotDetailsByID(c echo.Context) error {
return ierrors.Errorf("commitment in the store for slot %d does not match the given commitmentID (%s != %s)", commitmentID.Slot(), commitment.ID(), commitmentID)
}

diffs, err := deps.Protocol.MainEngineInstance().Ledger.SlotDiffs(commitmentID.Slot())
diffs, err := deps.Protocol.Engines.Main.Get().Ledger.SlotDiffs(commitmentID.Slot())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion components/dashboard/tip.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func setupTipsRoutes(routeGroup *echo.Group) {
}

func tips() *TipsResponse {
allTips := append(deps.Protocol.MainEngineInstance().TipManager.StrongTips(), deps.Protocol.MainEngineInstance().TipManager.WeakTips()...)
allTips := append(deps.Protocol.Engines.Main.Get().TipManager.StrongTips(), deps.Protocol.Engines.Main.Get().TipManager.WeakTips()...)
t := make([]string, len(allTips))

for i, tip := range allTips {
Expand Down
2 changes: 1 addition & 1 deletion components/dashboard/visualizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func sendVertex(blk *blocks.Block, confirmed bool) {
IsTx: isTx,
IsTxAccepted: func() bool {
if isTx {
txMetadata, exists := deps.Protocol.MainEngineInstance().Ledger.MemPool().TransactionMetadata(lo.PanicOnErr(signedTransaction.Transaction.ID()))
txMetadata, exists := deps.Protocol.Engines.Main.Get().Ledger.MemPool().TransactionMetadata(lo.PanicOnErr(signedTransaction.Transaction.ID()))
if exists {
return txMetadata.IsAccepted()
}
Expand Down
2 changes: 1 addition & 1 deletion components/dashboard/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func runWebSocketStreams(component *app.Component) {
broadcastWsBlock(&wsblk{MsgTypeNodeStatus, currentNodeStatus()})
broadcastWsBlock(&wsblk{MsgTypeNeighborMetric, neighborMetrics()})
broadcastWsBlock(&wsblk{MsgTypeTipsMetric, &tipsInfo{
TotalTips: len(deps.Protocol.MainEngineInstance().TipManager.StrongTips()) + len(deps.Protocol.MainEngineInstance().TipManager.WeakTips()),
TotalTips: len(deps.Protocol.Engines.Main.Get().TipManager.StrongTips()) + len(deps.Protocol.Engines.Main.Get().TipManager.WeakTips()),
}})
case *componentsmetric:
broadcastWsBlock(&wsblk{MsgTypeComponentCounterMetric, x})
Expand Down
2 changes: 1 addition & 1 deletion components/dashboard_metrics/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func run() error {
}

func configureComponentCountersEvents() {
deps.Protocol.Events.Network.BlockReceived.Hook(func(_ *model.Block, _ peer.ID) {
deps.Protocol.Network.OnBlockReceived(func(_ *model.Block, _ peer.ID) {
incComponentCounter(Received)
})

Expand Down
6 changes: 3 additions & 3 deletions components/dashboard_metrics/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func nodeInfoExtended() *NodeInfoExtended {

func databaseSizesMetrics() (*DatabaseSizesMetric, error) {
return &DatabaseSizesMetric{
Prunable: deps.Protocol.MainEngineInstance().Storage.PrunableDatabaseSize(),
Permanent: deps.Protocol.MainEngineInstance().Storage.PermanentDatabaseSize(),
Total: deps.Protocol.MainEngineInstance().Storage.Size(),
Prunable: deps.Protocol.Engines.Main.Get().Storage.PrunableDatabaseSize(),
Permanent: deps.Protocol.Engines.Main.Get().Storage.PermanentDatabaseSize(),
Total: deps.Protocol.Engines.Main.Get().Storage.Size(),
Time: time.Now().Unix(),
}, nil
}
2 changes: 1 addition & 1 deletion components/debugapi/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func getSlotBlockIDs(index iotago.SlotIndex) (*BlockChangesResponse, error) {
blocksForSlot, err := deps.Protocol.MainEngineInstance().Storage.Blocks(index)
blocksForSlot, err := deps.Protocol.Engines.Main.Get().Storage.Blocks(index)
if err != nil {
return nil, ierrors.Wrapf(err, "failed to get block storage bucket for slot %d", index)
}
Expand Down
23 changes: 13 additions & 10 deletions components/debugapi/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (

"github.com/iotaledger/hive.go/ds/walker"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/iota-core/pkg/protocol/chainmanager"
"github.com/iotaledger/iota-core/pkg/protocol"
)

func chainManagerAllChainsDot() (string, error) {
rootCommitment := deps.Protocol.ChainManager.RootCommitment()
rootCommitment := deps.Protocol.Chains.Main.Get().ForkingPoint.Get()
g := graphviz.New()
defer g.Close()

Expand All @@ -32,7 +32,7 @@ func chainManagerAllChainsDot() (string, error) {
}

func chainManagerAllChainsRendered() ([]byte, error) {
rootCommitment := deps.Protocol.ChainManager.RootCommitment()
rootCommitment := deps.Protocol.Chains.Main.Get().ForkingPoint.Get()
g := graphviz.New()
defer g.Close()

Expand All @@ -50,7 +50,7 @@ func chainManagerAllChainsRendered() ([]byte, error) {
return buf.Bytes(), nil
}

func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *chainmanager.ChainCommitment) (*cgraph.Graph, error) {
func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *protocol.Commitment) (*cgraph.Graph, error) {
graph, err := g.Graph()
if err != nil {
return nil, err
Expand All @@ -62,36 +62,39 @@ func prepareCommitmentGraph(g *graphviz.Graphviz, rootCommitment *chainmanager.C
}
root.SetColor("green")

for commitmentWalker := walker.New[*chainmanager.ChainCommitment](false).Push(rootCommitment); commitmentWalker.HasNext(); {
for commitmentWalker := walker.New[*protocol.Commitment](false).Push(rootCommitment); commitmentWalker.HasNext(); {
parentCommitment := commitmentWalker.Next()
parent, parentErr := createNode(graph, parentCommitment)
if parentErr != nil {
return nil, parentErr
}

for _, childCommitment := range parentCommitment.Children() {
if err = parentCommitment.Children.ForEach(func(childCommitment *protocol.Commitment) error {
child, childErr := createNode(graph, childCommitment)
if childErr != nil {
return nil, childErr
return childErr
}

if childCommitment.Chain().ForkingPoint.ID() == deps.Protocol.MainEngineInstance().ChainID() {
if childCommitment.Chain.Get() == deps.Protocol.Chains.Main.Get() {
child.SetColor("green")
}

if _, edgeErr := graph.CreateEdge(fmt.Sprintf("%s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8]), parent, child); edgeErr != nil {
return nil, ierrors.Wrapf(edgeErr, "could not create edge %s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8])
return ierrors.Wrapf(edgeErr, "could not create edge %s -> %s", parentCommitment.ID().String()[:8], childCommitment.ID().String()[:8])
}

commitmentWalker.Push(childCommitment)

return nil
}); err != nil {
return nil, err
}
}

return graph, nil
}

func createNode(graph *cgraph.Graph, commitment *chainmanager.ChainCommitment) (*cgraph.Node, error) {
func createNode(graph *cgraph.Graph, commitment *protocol.Commitment) (*cgraph.Node, error) {
node, err := graph.Node(fmt.Sprintf("%d: %s", commitment.ID().Slot(), commitment.ID().String()[:8]))
if err != nil {
return nil, ierrors.Wrapf(err, "could not create node %s", commitment.ID().String()[:8])
Expand Down
2 changes: 1 addition & 1 deletion components/debugapi/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func configure() error {
return err
}

if block, exists := deps.Protocol.MainEngineInstance().BlockCache.Block(blockID); exists && block.ProtocolBlock() != nil {
if block, exists := deps.Protocol.Engines.Main.Get().BlockCache.Block(blockID); exists && block.ProtocolBlock() != nil {
response := BlockMetadataResponseFromBlock(block)

return httpserver.JSONResponse(c, http.StatusOK, response)
Expand Down
4 changes: 2 additions & 2 deletions components/debugapi/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

//nolint:unparam // we have no error case right now
func validatorsSummary() (*ValidatorsSummaryResponse, error) {
seatManager := deps.Protocol.MainEngineInstance().SybilProtection.SeatManager()
latestSlotIndex := deps.Protocol.MainEngineInstance().Storage.Settings().LatestCommitment().Slot()
seatManager := deps.Protocol.Engines.Main.Get().SybilProtection.SeatManager()
latestSlotIndex := deps.Protocol.Engines.Main.Get().Storage.Settings().LatestCommitment().Slot()
latestCommittee, exists := seatManager.CommitteeInSlot(latestSlotIndex)
if !exists {
return nil, ierrors.Errorf("committee for slot %d was not selected", latestSlotIndex)
Expand Down
2 changes: 1 addition & 1 deletion components/debugapi/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func init() {

func storeTransactionsPerSlot(scd *notarization.SlotCommittedDetails) error {
slot := scd.Commitment.Slot()
stateDiff, err := deps.Protocol.MainEngineInstance().Ledger.MemPool().StateDiff(slot)
stateDiff, err := deps.Protocol.Engines.Main.Get().Ledger.MemPool().StateDiff(slot)
if err != nil {
return ierrors.Wrapf(err, "failed to retrieve state diff for slot %d", slot)
}
Expand Down
6 changes: 3 additions & 3 deletions components/inx/server_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (s *Server) ReadIsValidatorAccount(_ context.Context, accountInfoRequest *i
return nil, ierrors.Wrap(err, "error when parsing account id")
}

account, exists, err := deps.Protocol.MainEngineInstance().Ledger.Account(accountID, slot)
account, exists, err := deps.Protocol.Engines.Main.Get().Ledger.Account(accountID, slot)
if err != nil {
return nil, ierrors.Wrapf(err, "error when retrieving account data for %s", accountID)
}
Expand All @@ -29,7 +29,7 @@ func (s *Server) ReadIsCommitteeMember(_ context.Context, accountInfoRequest *in
if err != nil {
return nil, ierrors.Wrap(err, "error when parsing account id")
}
committee, exists := deps.Protocol.MainEngineInstance().SybilProtection.SeatManager().CommitteeInSlot(slot)
committee, exists := deps.Protocol.Engines.Main.Get().SybilProtection.SeatManager().CommitteeInSlot(slot)
if !exists {
return nil, ierrors.Errorf("committee does not exist for slot %d", slot)
}
Expand All @@ -44,7 +44,7 @@ func (s *Server) ReadIsCandidate(_ context.Context, accountInfoRequest *inx.Acco
return nil, ierrors.Wrap(err, "error when parsing account id")
}

isCandidateActive, err := deps.Protocol.MainEngineInstance().SybilProtection.IsCandidateActive(accountID, deps.Protocol.APIForSlot(slot).TimeProvider().EpochFromSlot(slot))
isCandidateActive, err := deps.Protocol.Engines.Main.Get().SybilProtection.IsCandidateActive(accountID, deps.Protocol.APIForSlot(slot).TimeProvider().EpochFromSlot(slot))
if err != nil {
return nil, ierrors.Wrap(err, "error when checking if candidate is active")
}
Expand Down
8 changes: 4 additions & 4 deletions components/inx/server_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
)

func (s *Server) ReadActiveRootBlocks(_ context.Context, _ *inx.NoParams) (*inx.RootBlocksResponse, error) {
activeRootBlocks := deps.Protocol.MainEngineInstance().EvictionState.ActiveRootBlocks()
activeRootBlocks := deps.Protocol.Engines.Main.Get().EvictionState.ActiveRootBlocks()

return inx.WrapRootBlocks(activeRootBlocks), nil
}

func (s *Server) ReadBlock(_ context.Context, blockID *inx.BlockId) (*inx.RawBlock, error) {
blkID := blockID.Unwrap()
block, exists := deps.Protocol.MainEngineInstance().Block(blkID) // block +1
block, exists := deps.Protocol.Engines.Main.Get().Block(blkID) // block +1
if !exists {
return nil, status.Errorf(codes.NotFound, "block %s not found", blkID.ToHex())
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToCo
}

func (s *Server) ReadAcceptedBlocks(slot *inx.SlotIndex, srv inx.INX_ReadAcceptedBlocksServer) error {
blocksStore, err := deps.Protocol.MainEngineInstance().Storage.Blocks(slot.Unwrap())
blocksStore, err := deps.Protocol.Engines.Main.Get().Storage.Blocks(slot.Unwrap())
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to get blocks: %s", err.Error())
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *Server) attachBlock(ctx context.Context, block *iotago.Block) (*inx.Blo
}

func getINXBlockMetadata(blockID iotago.BlockID) (*inx.BlockMetadata, error) {
retainerBlockMetadata, err := deps.Protocol.MainEngineInstance().Retainer.BlockMetadata(blockID)
retainerBlockMetadata, err := deps.Protocol.Engines.Main.Get().Retainer.BlockMetadata(blockID)
if err != nil {
return nil, ierrors.Errorf("failed to get BlockMetadata: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions components/inx/server_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func inxCommitment(commitment *model.Commitment) *inx.Commitment {

func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_ListenToCommitmentsServer) error {
createCommitmentPayloadForSlotAndSend := func(slot iotago.SlotIndex) error {
commitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(slot)
commitment, err := deps.Protocol.Engines.Main.Get().Storage.Commitments().Load(slot)
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
return status.Errorf(codes.NotFound, "commitment slot %d not found", slot)
Expand Down Expand Up @@ -64,15 +64,15 @@ func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_List
return 0, nil
}

latestCommitment := deps.Protocol.MainEngineInstance().SyncManager.LatestCommitment()
latestCommitment := deps.Protocol.Engines.Main.Get().SyncManager.LatestCommitment()

if startSlot > latestCommitment.Slot() {
// no need to send previous commitments
return 0, nil
}

// Stream all available commitments first
prunedEpoch, hasPruned := deps.Protocol.MainEngineInstance().SyncManager.LastPrunedEpoch()
prunedEpoch, hasPruned := deps.Protocol.Engines.Main.Get().SyncManager.LastPrunedEpoch()
if hasPruned && startSlot <= deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch) {
return 0, status.Errorf(codes.InvalidArgument, "given startSlot %d is older than the current pruningSlot %d", startSlot, deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch))
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_List
}

func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotIndex) (*inx.NoParams, error) {
err := deps.Protocol.MainEngineInstance().Notarization.ForceCommitUntil(slot.Unwrap())
err := deps.Protocol.Engines.Main.Get().Notarization.ForceCommitUntil(slot.Unwrap())
if err != nil {
return nil, ierrors.Wrapf(err, "error while performing force commit until %d", slot.Index)
}
Expand All @@ -171,7 +171,7 @@ func (s *Server) ReadCommitment(_ context.Context, req *inx.CommitmentRequest) (
commitmentSlot = req.GetCommitmentId().Unwrap().Slot()
}

commitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(commitmentSlot)
commitment, err := deps.Protocol.Engines.Main.Get().Storage.Commitments().Load(commitmentSlot)
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
return nil, status.Errorf(codes.NotFound, "commitment slot %d not found", req.GetCommitmentSlot())
Expand Down
4 changes: 2 additions & 2 deletions components/inx/server_issuance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func (s *Server) RequestTips(_ context.Context, req *inx.TipsRequest) (*inx.TipsResponse, error) {
references := deps.Protocol.MainEngineInstance().TipSelection.SelectTips(int(req.GetCount()))
references := deps.Protocol.Engines.Main.Get().TipSelection.SelectTips(int(req.GetCount()))

return &inx.TipsResponse{
StrongTips: inx.NewBlockIds(references[iotago.StrongParentType]),
Expand All @@ -30,7 +30,7 @@ func (s *Server) ValidatePayload(_ context.Context, payload *inx.RawPayload) (*i

switch typedPayload := blockPayload.(type) {
case *iotago.SignedTransaction:
memPool := deps.Protocol.MainEngineInstance().Ledger.MemPool()
memPool := deps.Protocol.Engines.Main.Get().Ledger.MemPool()

inputReferences, inputsErr := memPool.VM().Inputs(typedPayload.Transaction)
if inputsErr != nil {
Expand Down
Loading

0 comments on commit 6378533

Please sign in to comment.