Skip to content
This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

Commit

Permalink
Merge branch 'develop' into fix/conflicting-tx-reason
Browse files Browse the repository at this point in the history
  • Loading branch information
jkrvivian committed Nov 8, 2023
2 parents 9280eb4 + bc7ea4c commit e01553a
Show file tree
Hide file tree
Showing 51 changed files with 478 additions and 271 deletions.
28 changes: 28 additions & 0 deletions components/inx/server_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/iotaledger/hive.go/runtime/workerpool"
inx "github.com/iotaledger/inx/go"
"github.com/iotaledger/iota-core/pkg/blockhandler"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
iotago "github.com/iotaledger/iota.go/v4"
)
Expand Down Expand Up @@ -123,6 +124,33 @@ func (s *Server) ListenToConfirmedBlocks(_ *inx.NoParams, srv inx.INX_ListenToCo
return ctx.Err()
}

func (s *Server) ReadAcceptedBlocks(slot *inx.SlotIndex, srv inx.INX_ReadAcceptedBlocksServer) error {
blocksStore, err := deps.Protocol.MainEngineInstance().Storage.Blocks(slot.Unwrap())
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to get blocks: %s", err.Error())
}

if err := blocksStore.ForEachBlockInSlot(func(block *model.Block) error {
metadata, err := getINXBlockMetadata(block.ID())
if err != nil {
return err
}

payload := &inx.BlockWithMetadata{
Metadata: metadata,
Block: &inx.RawBlock{
Data: block.Data(),
},
}

return srv.Send(payload)
}); err != nil {
return status.Errorf(codes.Internal, "failed to iterate blocks: %s", err.Error())
}

return nil
}

func (s *Server) SubmitBlock(ctx context.Context, rawBlock *inx.RawBlock) (*inx.BlockId, error) {
block, err := rawBlock.UnwrapBlock(deps.Protocol)
if err != nil {
Expand Down
130 changes: 130 additions & 0 deletions components/inx/server_commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/runtime/workerpool"
inx "github.com/iotaledger/inx/go"
"github.com/iotaledger/iota-core/pkg/model"
iotago "github.com/iotaledger/iota.go/v4"
Expand All @@ -22,6 +23,135 @@ func inxCommitment(commitment *model.Commitment) *inx.Commitment {
}
}

func (s *Server) ListenToCommitments(req *inx.SlotRangeRequest, srv inx.INX_ListenToCommitmentsServer) error {
createCommitmentPayloadForSlotAndSend := func(slot iotago.SlotIndex) error {
commitment, err := deps.Protocol.MainEngineInstance().Storage.Commitments().Load(slot)
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
return status.Errorf(codes.NotFound, "commitment slot %d not found", slot)
}

return err
}

if err := srv.Send(inxCommitment(commitment)); err != nil {
return ierrors.Errorf("send error: %w", err)
}

return nil
}

sendSlotsRange := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) error {
for currentSlot := startSlot; currentSlot <= endSlot; currentSlot++ {
if err := createCommitmentPayloadForSlotAndSend(currentSlot); err != nil {
return err
}
}

return nil
}

// if a startSlot is given, we send all available commitments including the start slot.
// if an endSlot is given, we send all available commitments up to and including min(latestCommitmentSlot, endSlot).
// if no startSlot is given, but an endSlot, we don't send previous commitments.
sendPreviousSlots := func(startSlot iotago.SlotIndex, endSlot iotago.SlotIndex) (iotago.SlotIndex, error) {
if startSlot == 0 {
// no need to send previous commitments
return 0, nil
}

latestCommitment := deps.Protocol.MainEngineInstance().SyncManager.LatestCommitment()

if startSlot > latestCommitment.Slot() {
// no need to send previous commitments
return 0, nil
}

// Stream all available commitments first
prunedEpoch, hasPruned := deps.Protocol.MainEngineInstance().SyncManager.LastPrunedEpoch()
if hasPruned && startSlot <= deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch) {
return 0, status.Errorf(codes.InvalidArgument, "given startSlot %d is older than the current pruningSlot %d", startSlot, deps.Protocol.CommittedAPI().TimeProvider().EpochEnd(prunedEpoch))
}

if endSlot == 0 || endSlot > latestCommitment.Slot() {
endSlot = latestCommitment.Slot()
}

if err := sendSlotsRange(startSlot, endSlot); err != nil {
return 0, err
}

return endSlot, nil
}

stream := &streamRange{
start: iotago.SlotIndex(req.GetStartSlot()),
end: iotago.SlotIndex(req.GetEndSlot()),
}

var err error
stream.lastSent, err = sendPreviousSlots(stream.start, stream.end)
if err != nil {
return err
}

if stream.isBounded() && stream.lastSent >= stream.end {
// We are done sending, so close the stream
return nil
}

catchUpFunc := func(start iotago.SlotIndex, end iotago.SlotIndex) error {
err := sendSlotsRange(start, end)
if err != nil {
err := ierrors.Errorf("sendSlotsRange error: %w", err)
Component.LogError(err.Error())

return err
}

return nil
}

sendFunc := func(_ iotago.SlotIndex, payload *inx.Commitment) error {
if err := srv.Send(payload); err != nil {
err := ierrors.Errorf("send error: %w", err)
Component.LogError(err.Error())

return err
}

return nil
}

var innerErr error
ctx, cancel := context.WithCancel(Component.Daemon().ContextStopped())

wp := workerpool.New("ListenToCommitments", workerpool.WithWorkerCount(workerCount)).Start()

unhook := deps.Protocol.Events.Engine.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
done, err := handleRangedSend1(commitment.Slot(), inxCommitment(commitment), stream, catchUpFunc, sendFunc)
switch {
case err != nil:
innerErr = err
cancel()

case done:
cancel()
}
}).Unhook

<-ctx.Done()
unhook()

// We need to wait until all tasks are done, otherwise we might call
// "SendMsg" and "CloseSend" in parallel on the grpc stream, which is
// not safe according to the grpc docs.
wp.Shutdown()
wp.ShutdownComplete.Wait()

return innerErr
}

func (s *Server) ForceCommitUntil(_ context.Context, slot *inx.SlotIndex) (*inx.NoParams, error) {
err := deps.Protocol.MainEngineInstance().Notarization.ForceCommitUntil(slot.Unwrap())
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ require (
github.com/iotaledger/hive.go/runtime v0.0.0-20231108050255-98e0fa35e936
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231108050255-98e0fa35e936
github.com/iotaledger/hive.go/stringify v0.0.0-20231108050255-98e0fa35e936
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac
github.com/iotaledger/iota.go/v4 v4.0.0-20231108050608-afce96cfe8a6
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231108104504-1445f545de82
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231108104322-f301c3573998
github.com/iotaledger/iota.go/v4 v4.0.0-20231108103955-bf75d703d8aa
github.com/labstack/echo/v4 v4.11.2
github.com/labstack/gommon v0.4.0
github.com/libp2p/go-libp2p v0.32.0
Expand All @@ -42,6 +42,7 @@ require (
go.uber.org/atomic v1.11.0
go.uber.org/dig v1.17.1
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)
Expand Down Expand Up @@ -168,7 +169,6 @@ require (
go.uber.org/mock v0.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/image v0.13.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,12 @@ github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231108050255-98e0fa3
github.com/iotaledger/hive.go/serializer/v2 v2.0.0-rc.1.0.20231108050255-98e0fa35e936/go.mod h1:FoH3T6yKlZJp8xm8K+zsQiibSynp32v21CpWx8xkek8=
github.com/iotaledger/hive.go/stringify v0.0.0-20231108050255-98e0fa35e936 h1:Y4HgL5gm9S27usg5M2t6wi1BSdCxVorM62lwnpKuMd4=
github.com/iotaledger/hive.go/stringify v0.0.0-20231108050255-98e0fa35e936/go.mod h1:FTo/UWzNYgnQ082GI9QVM9HFDERqf9rw9RivNpqrnTs=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5 h1:17JDzMKTMXKF3xys6gPURRddkZhg1LY+xwfhbr/sVqg=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231031135002-4c79ea5193f5/go.mod h1:LsJvoBUVVnY7tkwwByCVtAwmp5bFXdyJNGU/+KVQJVM=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac h1:c7R33+TQGMYP6pvLUQQaqpdDFl+GZbhAcfGMI0285fo=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231031134131-b6ad918dc1ac/go.mod h1:qPuMUvCTaghsnYRDnRoRuztTyEKFlmi2S7gb44rH7WM=
github.com/iotaledger/iota.go/v4 v4.0.0-20231108050608-afce96cfe8a6 h1:4kvG+BB4GOBsNYPY/enPo3xeC65A133L9cD73Kf1p9Q=
github.com/iotaledger/iota.go/v4 v4.0.0-20231108050608-afce96cfe8a6/go.mod h1:8iDORW4/e4NztyAGqjW07uSMjbhs7snbxw+81IWOczY=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231108104504-1445f545de82 h1:FdM1lxUKgENO3oOlF5blVqmjER44mLIHGpavyUOY5JI=
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20231108104504-1445f545de82/go.mod h1:HVxkGPraMDTRudfG9AFN7Ga9gijp6skXB9TKNBc4KgI=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231108104322-f301c3573998 h1:KkC0SaWrjSMg897r2DDosJYALFfLadFST3Fvoaxg7hw=
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20231108104322-f301c3573998/go.mod h1:c+lBG3vgt2rgXHeOncK8hMllMwihTAtVbu790NslW2w=
github.com/iotaledger/iota.go/v4 v4.0.0-20231108103955-bf75d703d8aa h1:A2nadmSbmn62f6wtrqvv/TCCF2sDiiwyDnl6brbRo1E=
github.com/iotaledger/iota.go/v4 v4.0.0-20231108103955-bf75d703d8aa/go.mod h1:8iDORW4/e4NztyAGqjW07uSMjbhs7snbxw+81IWOczY=
github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI=
github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
Expand Down
4 changes: 4 additions & 0 deletions pkg/model/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (blk *Block) ID() iotago.BlockID {
return blk.blockID
}

func (blk *Block) SlotCommitmentID() iotago.CommitmentID {
return blk.block.Header.SlotCommitmentID
}

func (blk *Block) Data() []byte {
return blk.data
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/model/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Commitment struct {
}

func NewEmptyCommitment(api iotago.API) *Commitment {
emptyCommitment := iotago.NewEmptyCommitment(api.ProtocolParameters().Version())
emptyCommitment := iotago.NewEmptyCommitment(api)
emptyCommitment.ReferenceManaCost = api.ProtocolParameters().CongestionControlParameters().MinReferenceManaCost

return lo.PanicOnErr(CommitmentFromCommitment(emptyCommitment, api))
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/protocols/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Events struct {
AttestationsReceived *event.Event4[*model.Commitment, []*iotago.Attestation, *merklehasher.Proof[iotago.Identifier], peer.ID]
AttestationsRequestReceived *event.Event2[iotago.CommitmentID, peer.ID]
WarpSyncRequestReceived *event.Event2[iotago.CommitmentID, peer.ID]
WarpSyncResponseReceived *event.Event6[iotago.CommitmentID, iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID]
WarpSyncResponseReceived *event.Event6[iotago.CommitmentID, map[iotago.CommitmentID]iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID]
Error *event.Event2[error, peer.ID]

event.Group[Events, *Events]
Expand All @@ -33,7 +33,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
AttestationsReceived: event.New4[*model.Commitment, []*iotago.Attestation, *merklehasher.Proof[iotago.Identifier], peer.ID](),
AttestationsRequestReceived: event.New2[iotago.CommitmentID, peer.ID](),
WarpSyncRequestReceived: event.New2[iotago.CommitmentID, peer.ID](),
WarpSyncResponseReceived: event.New6[iotago.CommitmentID, iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID](),
WarpSyncResponseReceived: event.New6[iotago.CommitmentID, map[iotago.CommitmentID]iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID](),
Error: event.New2[error, peer.ID](),
}
})
67 changes: 17 additions & 50 deletions pkg/network/protocols/core/models/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pkg/network/protocols/core/models/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,5 @@ message WarpSyncRequest {

message WarpSyncResponse {
bytes commitment_id = 1;
bytes block_ids = 2;
bytes tangle_merkle_proof = 3;
bytes transaction_ids = 4;
bytes mutations_merkle_proof = 5;
bytes payload = 2;
}
2 changes: 1 addition & 1 deletion pkg/network/protocols/core/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *Protocol) handlePacket(nbr peer.ID, packet proto.Message) (err error) {
case *nwmodels.Packet_WarpSyncRequest:
p.handleWarpSyncRequest(packetBody.WarpSyncRequest.GetCommitmentId(), nbr)
case *nwmodels.Packet_WarpSyncResponse:
p.handleWarpSyncResponse(packetBody.WarpSyncResponse.GetCommitmentId(), packetBody.WarpSyncResponse.GetBlockIds(), packetBody.WarpSyncResponse.GetTangleMerkleProof(), packetBody.WarpSyncResponse.GetTransactionIds(), packetBody.WarpSyncResponse.GetMutationsMerkleProof(), nbr)
p.handleWarpSyncResponse(packetBody.WarpSyncResponse.GetCommitmentId(), packetBody.WarpSyncResponse.GetPayload(), nbr)
default:
return ierrors.Errorf("unsupported packet; packet=%+v, packetBody=%T-%+v", packet, packetBody, packetBody)
}
Expand Down
Loading

0 comments on commit e01553a

Please sign in to comment.