Skip to content

Commit

Permalink
Merge pull request #503 from iotaledger/feat/chronicle-support
Browse files Browse the repository at this point in the history
Add INX rpc requested by Chronicle
  • Loading branch information
alexsporn authored Nov 8, 2023
2 parents f6d3e9d + 674b7b0 commit bc7ea4c
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 0 deletions.
28 changes: 28 additions & 0 deletions components/inx/server_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/iotaledger/hive.go/runtime/workerpool"
inx "github.com/iotaledger/inx/go"
"github.com/iotaledger/iota-core/pkg/blockhandler"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
iotago "github.com/iotaledger/iota.go/v4"
)
Expand Down Expand Up @@ -123,6 +124,33 @@ func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToCo
return ctx.Err()
}

func (s *Server) ReadAcceptedBlocks(slot *inx.SlotIndex, srv inx.INX_ReadAcceptedBlocksServer) error {
blocksStore, err := deps.Protocol.MainEngineInstance().Storage.Blocks(slot.Unwrap())
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to get blocks: %s", err.Error())
}

if err := blocksStore.ForEachBlockInSlot(func(block *model.Block) error {
metadata, err := getINXBlockMetadata(block.ID())
if err != nil {
return err
}

payload := &inx.BlockWithMetadata{
Metadata: metadata,
Block: &inx.RawBlock{
Data: block.Data(),
},
}

return srv.Send(payload)
}); err != nil {
return status.Errorf(codes.Internal, "failed to iterate blocks: %s", err.Error())
}

return nil
}

func (s *Server) SubmitBlock(ctx context.Context, rawBlock *inx.RawBlock) (*inx.BlockId, error) {
block, err := rawBlock.UnwrapBlock(deps.Protocol)
if err != nil {
Expand Down
130 changes: 130 additions & 0 deletions components/inx/server_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/runtime/workerpool"
inx "github.com/iotaledger/inx/go"
"github.com/iotaledger/iota-core/pkg/model"
iotago "github.com/iotaledger/iota.go/v4"
Expand All @@ -22,6 +23,135 @@ func inxCommitment(commitment *model.Commitment) *inx.Commitment {
}
}

func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_ListenToCommitmentsServer) error {
createCommitmentPayloadForSlotAndSend := func(slot iotago.SlotIndex) error {
commitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(slot)
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
return status.Errorf(codes.NotFound, "commitment slot %d not found", slot)
}

return err
}

if err := srv.Send(inxCommitment(commitment)); err != nil {
return ierrors.Errorf("send error: %w", err)
}

return nil
}

sendSlotsRange := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) error {
for currentSlot := startSlot; currentSlot <= endSlot; currentSlot++ {
if err := createCommitmentPayloadForSlotAndSend(currentSlot); err != nil {
return err
}
}

return nil
}

// if a startSlot is given, we send all available commitments including the start slot.
// if an endSlot is given, we send all available commitments up to and including min(latestCommitmentSlot, endSlot).
// if no startSlot is given, but an endSlot, we don't send previous commitments.
sendPreviousSlots := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) (iotago.SlotIndex, error) {
if startSlot == 0 {
// no need to send previous commitments
return 0, nil
}

latestCommitment := deps.Protocol.MainEngineInstance().SyncManager.LatestCommitment()

if startSlot > latestCommitment.Slot() {
// no need to send previous commitments
return 0, nil
}

// Stream all available commitments first
prunedEpoch, hasPruned := deps.Protocol.MainEngineInstance().SyncManager.LastPrunedEpoch()
if hasPruned && startSlot <= deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch) {
return 0, status.Errorf(codes.InvalidArgument, "given startSlot %d is older than the current pruningSlot %d", startSlot, deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch))
}

if endSlot == 0 || endSlot > latestCommitment.Slot() {
endSlot = latestCommitment.Slot()
}

if err := sendSlotsRange(startSlot, endSlot); err != nil {
return 0, err
}

return endSlot, nil
}

stream := &streamRange{
start: iotago.SlotIndex(req.GetStartSlot()),
end: iotago.SlotIndex(req.GetEndSlot()),
}

var err error
stream.lastSent, err = sendPreviousSlots(stream.start, stream.end)
if err != nil {
return err
}

if stream.isBounded() && stream.lastSent >= stream.end {
// We are done sending, so close the stream
return nil
}

catchUpFunc := func(start iotago.SlotIndex, end iotago.SlotIndex) error {
err := sendSlotsRange(start, end)
if err != nil {
err := ierrors.Errorf("sendSlotsRange error: %w", err)
Component.LogError(err.Error())

return err
}

return nil
}

sendFunc := func(_ iotago.SlotIndex, payload *inx.Commitment) error {
if err := srv.Send(payload); err != nil {
err := ierrors.Errorf("send error: %w", err)
Component.LogError(err.Error())

return err
}

return nil
}

var innerErr error
ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped())

wp := workerpool.New("ListenToCommitments", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
done, err := handleRangedSend1(commitment.Slot(), inxCommitment(commitment), stream, catchUpFunc, sendFunc)
switch {
case err != nil:
innerErr = err
cancel()

case done:
cancel()
}
}).Unhook

<-ctx.Done()
unhook()

// We need to wait until all tasks are done, otherwise we might call
// "SendMsg" and "CloseSend" in parallel on the grpc stream, which is
// not safe according to the grpc docs.
wp.Shutdown()
wp.ShutdownComplete.Wait()

return innerErr
}

func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotIndex) (*inx.NoParams, error) {
err := deps.Protocol.MainEngineInstance().Notarization.ForceCommitUntil(slot.Unwrap())
if err != nil {
Expand Down

0 comments on commit bc7ea4c

Please sign in to comment.