Skip to content

Commit

Permalink
Send all warpsync blockIDs grouped by their SlotCommitmentID
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsporn committed Nov 7, 2023
1 parent 891cf84 commit 8b75433
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 116 deletions.
4 changes: 4 additions & 0 deletions pkg/model/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (blk *Block) ID() iotago.BlockID {
return blk.blockID
}

func (blk *Block) SlotCommitmentID() iotago.CommitmentID {
return blk.block.Header.SlotCommitmentID
}

func (blk *Block) Data() []byte {
return blk.data
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/protocols/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Events struct {
AttestationsReceived *event.Event4[*model.Commitment, []*iotago.Attestation, *merklehasher.Proof[iotago.Identifier], peer.ID]
AttestationsRequestReceived *event.Event2[iotago.CommitmentID, peer.ID]
WarpSyncRequestReceived *event.Event2[iotago.CommitmentID, peer.ID]
WarpSyncResponseReceived *event.Event6[iotago.CommitmentID, iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID]
WarpSyncResponseReceived *event.Event6[iotago.CommitmentID, map[iotago.CommitmentID]iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID]
Error *event.Event2[error, peer.ID]

event.Group[Events, *Events]
Expand All @@ -33,7 +33,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) {
AttestationsReceived: event.New4[*model.Commitment, []*iotago.Attestation, *merklehasher.Proof[iotago.Identifier], peer.ID](),
AttestationsRequestReceived: event.New2[iotago.CommitmentID, peer.ID](),
WarpSyncRequestReceived: event.New2[iotago.CommitmentID, peer.ID](),
WarpSyncResponseReceived: event.New6[iotago.CommitmentID, iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID](),
WarpSyncResponseReceived: event.New6[iotago.CommitmentID, map[iotago.CommitmentID]iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID](),
Error: event.New2[error, peer.ID](),
}
})
67 changes: 17 additions & 50 deletions pkg/network/protocols/core/models/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pkg/network/protocols/core/models/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,5 @@ message WarpSyncRequest {

message WarpSyncResponse {
bytes commitment_id = 1;
bytes block_ids = 2;
bytes tangle_merkle_proof = 3;
bytes transaction_ids = 4;
bytes mutations_merkle_proof = 5;
bytes payload = 2;
}
2 changes: 1 addition & 1 deletion pkg/network/protocols/core/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (p *Protocol) handlePacket(nbr peer.ID, packet proto.Message) (err error) {
case *nwmodels.Packet_WarpSyncRequest:
p.handleWarpSyncRequest(packetBody.WarpSyncRequest.GetCommitmentId(), nbr)
case *nwmodels.Packet_WarpSyncResponse:
p.handleWarpSyncResponse(packetBody.WarpSyncResponse.GetCommitmentId(), packetBody.WarpSyncResponse.GetBlockIds(), packetBody.WarpSyncResponse.GetTangleMerkleProof(), packetBody.WarpSyncResponse.GetTransactionIds(), packetBody.WarpSyncResponse.GetMutationsMerkleProof(), nbr)
p.handleWarpSyncResponse(packetBody.WarpSyncResponse.GetCommitmentId(), packetBody.WarpSyncResponse.GetPayload(), nbr)
default:
return ierrors.Errorf("unsupported packet; packet=%+v, packetBody=%T-%+v", packet, packetBody, packetBody)
}
Expand Down
54 changes: 22 additions & 32 deletions pkg/network/protocols/core/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/iotaledger/iota.go/v4/merklehasher"
)

type WarpSyncPayload struct {
BlockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs `serix:"0,lengthPrefixType=uint32"`
TangleMerkleProof *merklehasher.Proof[iotago.Identifier] `serix:"1"`
TransactionIDs iotago.TransactionIDs `serix:"2"`
MutationsMerkleProof *merklehasher.Proof[iotago.Identifier] `serix:"3"`
}

func (p *Protocol) SendWarpSyncRequest(id iotago.CommitmentID, to ...peer.ID) {
p.network.Send(&nwmodels.Packet{Body: &nwmodels.Packet_WarpSyncRequest{
WarpSyncRequest: &nwmodels.WarpSyncRequest{
Expand All @@ -19,16 +26,20 @@ func (p *Protocol) SendWarpSyncRequest(id iotago.CommitmentID, to ...peer.ID) {
}}, to...)
}

func (p *Protocol) SendWarpSyncResponse(id iotago.CommitmentID, blockIDs iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationsMerkleProof *merklehasher.Proof[iotago.Identifier], to ...peer.ID) {
func (p *Protocol) SendWarpSyncResponse(id iotago.CommitmentID, blockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationsMerkleProof *merklehasher.Proof[iotago.Identifier], to ...peer.ID) {
serializer := p.apiProvider.APIForSlot(id.Slot())

payload := &WarpSyncPayload{
BlockIDsBySlotCommitmentID: blockIDsBySlotCommitmentID,
TangleMerkleProof: tangleMerkleProof,
TransactionIDs: transactionIDs,
MutationsMerkleProof: mutationsMerkleProof,
}

p.network.Send(&nwmodels.Packet{Body: &nwmodels.Packet_WarpSyncResponse{
WarpSyncResponse: &nwmodels.WarpSyncResponse{
CommitmentId: lo.PanicOnErr(id.Bytes()),
BlockIds: lo.PanicOnErr(serializer.Encode(blockIDs)),
TangleMerkleProof: lo.PanicOnErr(tangleMerkleProof.Bytes()),
TransactionIds: lo.PanicOnErr(serializer.Encode(transactionIDs)),
MutationsMerkleProof: lo.PanicOnErr(mutationsMerkleProof.Bytes()),
CommitmentId: lo.PanicOnErr(id.Bytes()),
Payload: lo.PanicOnErr(serializer.Encode(payload)),
},
}}, to...)
}
Expand All @@ -46,7 +57,7 @@ func (p *Protocol) handleWarpSyncRequest(commitmentIDBytes []byte, id peer.ID) {
})
}

func (p *Protocol) handleWarpSyncResponse(commitmentIDBytes []byte, blockIDsBytes []byte, tangleMerkleProofBytes []byte, transactionIDsBytes []byte, mutationProofBytes []byte, id peer.ID) {
func (p *Protocol) handleWarpSyncResponse(commitmentIDBytes []byte, payloadBytes []byte, id peer.ID) {
p.workerPool.Submit(func() {
commitmentID, _, err := iotago.CommitmentIDFromBytes(commitmentIDBytes)
if err != nil {
Expand All @@ -55,34 +66,13 @@ func (p *Protocol) handleWarpSyncResponse(commitmentIDBytes []byte, blockIDsByte
return
}

var blockIDs iotago.BlockIDs
if _, err = p.apiProvider.APIForSlot(commitmentID.Slot()).Decode(blockIDsBytes, &blockIDs, serix.WithValidation()); err != nil {
p.Events.Error.Trigger(ierrors.Wrap(err, "failed to deserialize block ids"), id)

return
}

tangleMerkleProof, _, err := merklehasher.ProofFromBytes[iotago.Identifier](tangleMerkleProofBytes)
if err != nil {
p.Events.Error.Trigger(ierrors.Wrapf(err, "failed to deserialize merkle proof when receiving waprsync response for commitment %s", commitmentID), id)

return
}

var transactionIDs iotago.TransactionIDs
if _, err = p.apiProvider.APIForSlot(commitmentID.Slot()).Decode(transactionIDsBytes, &transactionIDs, serix.WithValidation()); err != nil {
p.Events.Error.Trigger(ierrors.Wrap(err, "failed to deserialize transaction ids"), id)

return
}

mutationProof, _, err := merklehasher.ProofFromBytes[iotago.Identifier](mutationProofBytes)
if err != nil {
p.Events.Error.Trigger(ierrors.Wrapf(err, "failed to deserialize merkle proof when receiving waprsync response for commitment %s", commitmentID), id)
payload := new(WarpSyncPayload)
if _, err = p.apiProvider.APIForSlot(commitmentID.Slot()).Decode(payloadBytes, payload, serix.WithValidation()); err != nil {
p.Events.Error.Trigger(ierrors.Wrap(err, "failed to deserialize payload"), id)

return
}

p.Events.WarpSyncResponseReceived.Trigger(commitmentID, blockIDs, tangleMerkleProof, transactionIDs, mutationProof, id)
p.Events.WarpSyncResponseReceived.Trigger(commitmentID, payload.BlockIDsBySlotCommitmentID, payload.TangleMerkleProof, payload.TransactionIDs, payload.MutationsMerkleProof, id)
})
}
40 changes: 24 additions & 16 deletions pkg/protocol/block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ func (b *BlockDispatcher) initNetworkConnection() {
}, b.warpSyncWorkers)
})

b.protocol.Events.Network.WarpSyncResponseReceived.Hook(func(commitmentID iotago.CommitmentID, blockIDs iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], src peer.ID) {
b.protocol.Events.Network.WarpSyncResponseReceived.Hook(func(commitmentID iotago.CommitmentID, blockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], src peer.ID) {
b.runTask(func() {
b.protocol.HandleError(b.processWarpSyncResponse(commitmentID, blockIDs, tangleMerkleProof, transactionIDs, mutationMerkleProof, src))
b.protocol.HandleError(b.processWarpSyncResponse(commitmentID, blockIDsBySlotCommitmentID, tangleMerkleProof, transactionIDs, mutationMerkleProof, src))
}, b.warpSyncWorkers)
})
}
Expand All @@ -177,7 +177,7 @@ func (b *BlockDispatcher) processWarpSyncRequest(commitmentID iotago.CommitmentI
return ierrors.Wrapf(err, "commitment ID mismatch: %s != %s", commitment.ID(), commitmentID)
}

blockIDs, err := committedSlot.BlockIDs()
blocksIDsByCommitmentID, err := committedSlot.BlocksIDsBySlotCommitmentID()
if err != nil {
return ierrors.Wrapf(err, "failed to get block IDs from slot %d", commitmentID.Slot())
}
Expand All @@ -192,13 +192,13 @@ func (b *BlockDispatcher) processWarpSyncRequest(commitmentID iotago.CommitmentI
return ierrors.Wrapf(err, "failed to get roots from slot %d", commitmentID.Slot())
}

b.protocol.networkProtocol.SendWarpSyncResponse(commitmentID, blockIDs, roots.TangleProof(), transactionIDs, roots.MutationProof(), src)
b.protocol.networkProtocol.SendWarpSyncResponse(commitmentID, blocksIDsByCommitmentID, roots.TangleProof(), transactionIDs, roots.MutationProof(), src)

return nil
}

// processWarpSyncResponse processes a WarpSync response.
func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.CommitmentID, blockIDs iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], _ peer.ID) error {
func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.CommitmentID, blockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], _ peer.ID) error {
if b.processedWarpSyncRequests.Has(commitmentID) {
return nil
}
Expand All @@ -220,6 +220,12 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment
return nil
}

// Flatten all blockIDs into a single slice.
var blockIDs iotago.BlockIDs
for _, ids := range blockIDsBySlotCommitmentID {
blockIDs = append(blockIDs, ids...)
}

acceptedBlocks := ads.NewSet[iotago.Identifier, iotago.BlockID](mapdb.NewMapDB(), iotago.BlockID.Bytes, iotago.BlockIDFromBytes)
for _, blockID := range blockIDs {
_ = acceptedBlocks.Add(blockID) // a mapdb can newer return an error
Expand Down Expand Up @@ -326,19 +332,21 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment
return nil
}

for _, blockID := range blockIDs {
block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID)
if block == nil { // this should never happen as we're requesting the blocks for this slot so it can't be evicted.
b.protocol.HandleError(ierrors.Errorf("failed to get block %s", blockID))
continue
}
for slotCommitmentID, blockIDsForCommitment := range blockIDsBySlotCommitmentID {
for _, blockID := range blockIDsForCommitment {
block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID)
if block == nil { // this should never happen as we're requesting the blocks for this slot so it can't be evicted.
b.protocol.HandleError(ierrors.Errorf("failed to get block %s", blockID))
continue
}

// We need to make sure that we add all blocks as root blocks because we don't know which blocks are root blocks without
// blocks from future slots. We're committing the current slot which then leads to the eviction of the blocks from the
// block cache and thus if not root blocks no block in the next slot can become solid.
targetEngine.EvictionState.AddRootBlock(block.ID(), block.SlotCommitmentID())
// We need to make sure that we add all blocks as root blocks because we don't know which blocks are root blocks without
// blocks from future slots. We're committing the current slot which then leads to the eviction of the blocks from the
// block cache and thus if not root blocks no block in the next slot can become solid.
targetEngine.EvictionState.AddRootBlock(blockID, slotCommitmentID)

block.Booked().OnUpdate(blockBookedFunc)
block.Booked().OnUpdate(blockBookedFunc)
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/engine/blocks/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (b *Block) SlotCommitmentID() iotago.CommitmentID {
return b.rootBlock.commitmentID
}

return b.modelBlock.ProtocolBlock().Header.SlotCommitmentID
return b.modelBlock.SlotCommitmentID()
}

// IsMissing returns a flag that indicates if the underlying Block data hasn't been stored, yet.
Expand Down
11 changes: 6 additions & 5 deletions pkg/protocol/engine/committed_slot_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,26 @@ func (c *CommittedSlotAPI) Roots() (committedRoots *iotago.Roots, err error) {
return roots, nil
}

// BlockIDs returns the accepted block IDs of the slot.
func (c *CommittedSlotAPI) BlockIDs() (blockIDs iotago.BlockIDs, err error) {
// BlocksIDsBySlotCommitmentID returns the accepted block IDs of the slot grouped by their SlotCommitmentID.
func (c *CommittedSlotAPI) BlocksIDsBySlotCommitmentID() (map[iotago.CommitmentID]iotago.BlockIDs, error) {
if c.engine.Storage.Settings().LatestCommitment().Slot() < c.CommitmentID.Slot() {
return blockIDs, ierrors.Errorf("slot %d is not committed yet", c.CommitmentID)
return nil, ierrors.Errorf("slot %d is not committed yet", c.CommitmentID)
}

store, err := c.engine.Storage.Blocks(c.CommitmentID.Slot())
if err != nil {
return nil, ierrors.Errorf("failed to get block store of slot index %d", c.CommitmentID.Slot())
}

blockIDsBySlotCommitmentID := make(map[iotago.CommitmentID]iotago.BlockIDs)
if err := store.ForEachBlockInSlot(func(block *model.Block) error {
blockIDs = append(blockIDs, block.ID())
blockIDsBySlotCommitmentID[block.SlotCommitmentID()] = append(blockIDsBySlotCommitmentID[block.SlotCommitmentID()], block.ID())
return nil
}); err != nil {
return nil, ierrors.Wrapf(err, "failed to iterate over blocks of slot %d", c.CommitmentID.Slot())
}

return blockIDs, nil
return blockIDsBySlotCommitmentID, nil
}

func (c *CommittedSlotAPI) TransactionIDs() (iotago.TransactionIDs, error) {
Expand Down
Loading

0 comments on commit 8b75433

Please sign in to comment.