From 2952130b961d610920d72bb23c1b254edf4ec3e5 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 7 Nov 2023 18:17:31 +0100 Subject: [PATCH] Added new INX rpc requested by chronicle --- components/inx/server_blocks.go | 28 ++++++ components/inx/server_commitments.go | 130 +++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- tools/gendoc/go.mod | 2 +- tools/gendoc/go.sum | 4 +- 6 files changed, 164 insertions(+), 6 deletions(-) diff --git a/components/inx/server_blocks.go b/components/inx/server_blocks.go index 0565e94b8..07a0a8ed3 100644 --- a/components/inx/server_blocks.go +++ b/components/inx/server_blocks.go @@ -12,6 +12,7 @@ import ( "github.com/iotaledger/hive.go/runtime/workerpool" inx "github.com/iotaledger/inx/go" "github.com/iotaledger/iota-core/pkg/blockhandler" + "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" iotago "github.com/iotaledger/iota.go/v4" ) @@ -123,6 +124,33 @@ func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToCo return ctx.Err() } +func (s *Server) ReadAcceptedBlocks(slot *inx.SlotIndex, srv inx.INX_ReadAcceptedBlocksServer) error { + blocksStore, err := deps.Protocol.MainEngineInstance().Storage.Blocks(slot.Unwrap()) + if err != nil { + return status.Errorf(codes.InvalidArgument, "failed to get blocks: %s", err.Error()) + } + + if err := blocksStore.ForEachBlockInSlot(func(block *model.Block) error { + metadata, err := getINXBlockMetadata(block.ID()) + if err != nil { + return err + } + + payload := &inx.BlockWithMetadata{ + Metadata: metadata, + Block: &inx.RawBlock{ + Data: block.Data(), + }, + } + + return srv.Send(payload) + }); err != nil { + return status.Errorf(codes.Internal, "failed to iterate blocks: %s", err.Error()) + } + + return nil +} + func (s *Server) SubmitBlock(ctx context.Context, rawBlock *inx.RawBlock) (*inx.BlockId, error) { block, err := rawBlock.UnwrapBlock(deps.Protocol) if err != nil { diff --git a/components/inx/server_commitments.go b/components/inx/server_commitments.go index b16713f48..a88448dd8 100644 --- a/components/inx/server_commitments.go +++ b/components/inx/server_commitments.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/runtime/workerpool" inx "github.com/iotaledger/inx/go" "github.com/iotaledger/iota-core/pkg/model" iotago "github.com/iotaledger/iota.go/v4" @@ -22,6 +23,135 @@ func inxCommitment(commitment *model.Commitment) *inx.Commitment { } } +func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_ListenToCommitmentsServer) error { + createCommitmentPayloadForSlotAndSend := func(slot iotago.SlotIndex) error { + commitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(slot) + if err != nil { + if ierrors.Is(err, kvstore.ErrKeyNotFound) { + return status.Errorf(codes.NotFound, "commitment slot %d not found", slot) + } + + return err + } + + if err := srv.Send(inxCommitment(commitment)); err != nil { + return ierrors.Errorf("send error: %w", err) + } + + return nil + } + + sendSlotsRange := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) error { + for currentSlot := startSlot; currentSlot <= endSlot; currentSlot++ { + if err := createCommitmentPayloadForSlotAndSend(currentSlot); err != nil { + return err + } + } + + return nil + } + + // if a startSlot is given, we send all available commitments including the start slot. + // if an endSlot is given, we send all available commitments up to and including min(latestCommitmentSlot, endSlot). + // if no startSlot is given, but an endSlot, we don't send previous commitments. + sendPreviousSlots := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) (iotago.SlotIndex, error) { + if startSlot == 0 { + // no need to send previous commitments + return 0, nil + } + + latestCommitment := deps.Protocol.MainEngineInstance().SyncManager.LatestCommitment() + + if startSlot > latestCommitment.Slot() { + // no need to send previous commitments + return 0, nil + } + + // Stream all available commitments first + prunedEpoch, hasPruned := deps.Protocol.MainEngineInstance().SyncManager.LastPrunedEpoch() + if hasPruned && startSlot <= deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch) { + return 0, status.Errorf(codes.InvalidArgument, "given startSlot %d is older than the current pruningSlot %d", startSlot, deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch)) + } + + if endSlot == 0 || endSlot > latestCommitment.Slot() { + endSlot = latestCommitment.Slot() + } + + if err := sendSlotsRange(startSlot, endSlot); err != nil { + return 0, err + } + + return endSlot, nil + } + + stream := &streamRange{ + start: iotago.SlotIndex(req.GetStartSlot()), + end: iotago.SlotIndex(req.GetEndSlot()), + } + + var err error + stream.lastSent, err = sendPreviousSlots(stream.start, stream.end) + if err != nil { + return err + } + + if stream.isBounded() && stream.lastSent >= stream.end { + // We are done sending, so close the stream + return nil + } + + catchUpFunc := func(start iotago.SlotIndex, end iotago.SlotIndex) error { + err := sendSlotsRange(start, end) + if err != nil { + err := ierrors.Errorf("sendSlotsRange error: %w", err) + Component.LogError(err.Error()) + + return err + } + + return nil + } + + sendFunc := func(_ iotago.SlotIndex, payload *inx.Commitment) error { + if err := srv.Send(payload); err != nil { + err := ierrors.Errorf("send error: %w", err) + Component.LogError(err.Error()) + + return err + } + + return nil + } + + var innerErr error + ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped()) + + wp := workerpool.New("ListenToCommitments", workerpool.WithWorkerCount(workerCount)).Start() + + unhook := deps.Protocol.Events.Engine.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { + done, err := handleRangedSend1(commitment.Slot(), inxCommitment(commitment), stream, catchUpFunc, sendFunc) + switch { + case err != nil: + innerErr = err + cancel() + + case done: + cancel() + } + }).Unhook + + <-ctx.Done() + unhook() + + // We need to wait until all tasks are done, otherwise we might call + // "SendMsg" and "CloseSend" in parallel on the grpc stream, which is + // not safe according to the grpc docs. + wp.Shutdown() + wp.ShutdownComplete.Wait() + + return innerErr +} + func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotIndex) (*inx.NoParams, error) { err := deps.Protocol.MainEngineInstance().Notarization.ForceCommitUntil(slot.Unwrap()) if err != nil { diff --git a/go.mod b/go.mod index 1ef176edb..9c2010337 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231027195901-620bd7470e42 github.com/iotaledger/hive.go/stringify v0.0.0-20231027195901-620bd7470e42 github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5 - github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac + github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231107164901-f1c55755b932 github.com/iotaledger/iota.go/v4 v4.0.0-20231107162630-dad6a05074e1 github.com/labstack/echo/v4 v4.11.2 github.com/labstack/gommon v0.4.0 diff --git a/go.sum b/go.sum index 4a8c61aaf..19b8ca02e 100644 --- a/go.sum +++ b/go.sum @@ -305,8 +305,8 @@ github.com/iotaledger/hive.go/stringify v0.0.0-20231027195901-620bd7470e42 h1:Ol github.com/iotaledger/hive.go/stringify v0.0.0-20231027195901-620bd7470e42/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5 h1:17JDzMKTMXKF3xys6gPURRddkZhg1LY+xwfhbr/sVqg= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5/go.mod h1:LsJvoBUVVnY7tkwwByCVtAwmp5bFXdyJNGU/+KVQJVM= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac h1:c7R33+TQGMYP6pvLUQQaqpdDFl+GZbhAcfGMI0285fo= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac/go.mod h1:qPuMUvCTaghsnYRDnRoRuztTyEKFlmi2S7gb44rH7WM= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231107164901-f1c55755b932 h1:aHM5rhrRbECb4R2l88QYJAR+x5CuXu+MidWGyoYY1Z4= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231107164901-f1c55755b932/go.mod h1:qPuMUvCTaghsnYRDnRoRuztTyEKFlmi2S7gb44rH7WM= github.com/iotaledger/iota.go/v4 v4.0.0-20231107162630-dad6a05074e1 h1:qtcFaBZnrUNMOsmgbVxF/VSxOO+U0NMsMH64PDjlklU= github.com/iotaledger/iota.go/v4 v4.0.0-20231107162630-dad6a05074e1/go.mod h1:jqbLYq4a/FwuiPBqFfkAwwxU8vs3+kReRq2/tyX5qRA= github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI= diff --git a/tools/gendoc/go.mod b/tools/gendoc/go.mod index e9230e0aa..6cf163241 100644 --- a/tools/gendoc/go.mod +++ b/tools/gendoc/go.mod @@ -71,7 +71,7 @@ require ( github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231027195901-620bd7470e42 // indirect github.com/iotaledger/hive.go/stringify v0.0.0-20231027195901-620bd7470e42 // indirect github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5 // indirect - github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac // indirect + github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231107164901-f1c55755b932 // indirect github.com/iotaledger/iota.go/v4 v4.0.0-20231107162630-dad6a05074e1 // indirect github.com/ipfs/boxo v0.13.1 // indirect github.com/ipfs/go-cid v0.4.1 // indirect diff --git a/tools/gendoc/go.sum b/tools/gendoc/go.sum index 7c946a914..cd7be5e9b 100644 --- a/tools/gendoc/go.sum +++ b/tools/gendoc/go.sum @@ -309,8 +309,8 @@ github.com/iotaledger/hive.go/stringify v0.0.0-20231027195901-620bd7470e42 h1:Ol github.com/iotaledger/hive.go/stringify v0.0.0-20231027195901-620bd7470e42/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5 h1:17JDzMKTMXKF3xys6gPURRddkZhg1LY+xwfhbr/sVqg= github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5/go.mod h1:LsJvoBUVVnY7tkwwByCVtAwmp5bFXdyJNGU/+KVQJVM= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac h1:c7R33+TQGMYP6pvLUQQaqpdDFl+GZbhAcfGMI0285fo= -github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac/go.mod h1:qPuMUvCTaghsnYRDnRoRuztTyEKFlmi2S7gb44rH7WM= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231107164901-f1c55755b932 h1:aHM5rhrRbECb4R2l88QYJAR+x5CuXu+MidWGyoYY1Z4= +github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231107164901-f1c55755b932/go.mod h1:qPuMUvCTaghsnYRDnRoRuztTyEKFlmi2S7gb44rH7WM= github.com/iotaledger/iota.go/v4 v4.0.0-20231107162630-dad6a05074e1 h1:qtcFaBZnrUNMOsmgbVxF/VSxOO+U0NMsMH64PDjlklU= github.com/iotaledger/iota.go/v4 v4.0.0-20231107162630-dad6a05074e1/go.mod h1:jqbLYq4a/FwuiPBqFfkAwwxU8vs3+kReRq2/tyX5qRA= github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI=