From 8b75433284dbfc6549c6f6e741b1b25922ee4376 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Tue, 7 Nov 2023 17:35:26 +0100 Subject: [PATCH] Send all warpsync blockIDs grouped by their SlotCommitmentID --- pkg/model/block.go | 4 ++ pkg/network/protocols/core/events.go | 4 +- .../protocols/core/models/message.pb.go | 67 +++++-------------- .../protocols/core/models/message.proto | 5 +- pkg/network/protocols/core/protocol.go | 2 +- pkg/network/protocols/core/warp_sync.go | 54 ++++++--------- pkg/protocol/block_dispatcher.go | 40 ++++++----- pkg/protocol/engine/blocks/block.go | 2 +- pkg/protocol/engine/committed_slot_api.go | 11 +-- pkg/testsuite/mock/node.go | 2 +- pkg/testsuite/storage_commitments.go | 16 +++-- 11 files changed, 91 insertions(+), 116 deletions(-) diff --git a/pkg/model/block.go b/pkg/model/block.go index a41c491c0..3ada4793f 100644 --- a/pkg/model/block.go +++ b/pkg/model/block.go @@ -77,6 +77,10 @@ func (blk *Block) ID() iotago.BlockID { return blk.blockID } +func (blk *Block) SlotCommitmentID() iotago.CommitmentID { + return blk.block.Header.SlotCommitmentID +} + func (blk *Block) Data() []byte { return blk.data } diff --git a/pkg/network/protocols/core/events.go b/pkg/network/protocols/core/events.go index 8577388f6..4aa157fc1 100644 --- a/pkg/network/protocols/core/events.go +++ b/pkg/network/protocols/core/events.go @@ -17,7 +17,7 @@ type Events struct { AttestationsReceived *event.Event4[*model.Commitment, []*iotago.Attestation, *merklehasher.Proof[iotago.Identifier], peer.ID] AttestationsRequestReceived *event.Event2[iotago.CommitmentID, peer.ID] WarpSyncRequestReceived *event.Event2[iotago.CommitmentID, peer.ID] - WarpSyncResponseReceived *event.Event6[iotago.CommitmentID, iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID] + WarpSyncResponseReceived *event.Event6[iotago.CommitmentID, map[iotago.CommitmentID]iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID] Error *event.Event2[error, peer.ID] event.Group[Events, *Events] @@ -33,7 +33,7 @@ var NewEvents = event.CreateGroupConstructor(func() (newEvents *Events) { AttestationsReceived: event.New4[*model.Commitment, []*iotago.Attestation, *merklehasher.Proof[iotago.Identifier], peer.ID](), AttestationsRequestReceived: event.New2[iotago.CommitmentID, peer.ID](), WarpSyncRequestReceived: event.New2[iotago.CommitmentID, peer.ID](), - WarpSyncResponseReceived: event.New6[iotago.CommitmentID, iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID](), + WarpSyncResponseReceived: event.New6[iotago.CommitmentID, map[iotago.CommitmentID]iotago.BlockIDs, *merklehasher.Proof[iotago.Identifier], iotago.TransactionIDs, *merklehasher.Proof[iotago.Identifier], peer.ID](), Error: event.New2[error, peer.ID](), } }) diff --git a/pkg/network/protocols/core/models/message.pb.go b/pkg/network/protocols/core/models/message.pb.go index 66dbe6a16..11faf96f1 100644 --- a/pkg/network/protocols/core/models/message.pb.go +++ b/pkg/network/protocols/core/models/message.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.4 // source: pkg/network/protocols/core/models/message.proto package models @@ -535,11 +535,8 @@ type WarpSyncResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - CommitmentId []byte `protobuf:"bytes,1,opt,name=commitment_id,json=commitmentId,proto3" json:"commitment_id,omitempty"` - BlockIds []byte `protobuf:"bytes,2,opt,name=block_ids,json=blockIds,proto3" json:"block_ids,omitempty"` - TangleMerkleProof []byte `protobuf:"bytes,3,opt,name=tangle_merkle_proof,json=tangleMerkleProof,proto3" json:"tangle_merkle_proof,omitempty"` - TransactionIds []byte `protobuf:"bytes,4,opt,name=transaction_ids,json=transactionIds,proto3" json:"transaction_ids,omitempty"` - MutationsMerkleProof []byte `protobuf:"bytes,5,opt,name=mutations_merkle_proof,json=mutationsMerkleProof,proto3" json:"mutations_merkle_proof,omitempty"` + CommitmentId []byte `protobuf:"bytes,1,opt,name=commitment_id,json=commitmentId,proto3" json:"commitment_id,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *WarpSyncResponse) Reset() { @@ -581,30 +578,9 @@ func (x *WarpSyncResponse) GetCommitmentId() []byte { return nil } -func (x *WarpSyncResponse) GetBlockIds() []byte { - if x != nil { - return x.BlockIds - } - return nil -} - -func (x *WarpSyncResponse) GetTangleMerkleProof() []byte { - if x != nil { - return x.TangleMerkleProof - } - return nil -} - -func (x *WarpSyncResponse) GetTransactionIds() []byte { - if x != nil { - return x.TransactionIds - } - return nil -} - -func (x *WarpSyncResponse) GetMutationsMerkleProof() []byte { +func (x *WarpSyncResponse) GetPayload() []byte { if x != nil { - return x.MutationsMerkleProof + return x.Payload } return nil } @@ -676,26 +652,17 @@ var file_pkg_network_protocols_core_models_message_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x57, 0x61, 0x72, 0x70, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, - 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0xe3, 0x01, 0x0a, 0x10, 0x57, 0x61, 0x72, 0x70, 0x53, - 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x63, - 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0c, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x64, - 0x12, 0x1b, 0x0a, 0x09, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x08, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x64, 0x73, 0x12, 0x2e, 0x0a, - 0x13, 0x74, 0x61, 0x6e, 0x67, 0x6c, 0x65, 0x5f, 0x6d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x5f, 0x70, - 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x74, 0x61, 0x6e, 0x67, - 0x6c, 0x65, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x27, 0x0a, - 0x0f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x73, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x73, 0x12, 0x34, 0x0a, 0x16, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x5f, 0x6d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x14, 0x6d, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x4d, 0x65, 0x72, 0x6b, 0x6c, 0x65, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x42, 0x43, 0x5a, 0x41, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x6f, 0x74, 0x61, 0x6c, - 0x65, 0x64, 0x67, 0x65, 0x72, 0x2f, 0x69, 0x6f, 0x74, 0x61, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, - 0x70, 0x6b, 0x67, 0x2f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x63, 0x6f, 0x6c, 0x73, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, - 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0x51, 0x0a, 0x10, 0x57, 0x61, 0x72, 0x70, 0x53, 0x79, + 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, + 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0c, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x6d, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, + 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x69, 0x6f, 0x74, 0x61, 0x6c, 0x65, 0x64, 0x67, + 0x65, 0x72, 0x2f, 0x69, 0x6f, 0x74, 0x61, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, + 0x6c, 0x73, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x73, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/network/protocols/core/models/message.proto b/pkg/network/protocols/core/models/message.proto index 11cfa0860..88b588189 100644 --- a/pkg/network/protocols/core/models/message.proto +++ b/pkg/network/protocols/core/models/message.proto @@ -49,8 +49,5 @@ message WarpSyncRequest { message WarpSyncResponse { bytes commitment_id = 1; - bytes block_ids = 2; - bytes tangle_merkle_proof = 3; - bytes transaction_ids = 4; - bytes mutations_merkle_proof = 5; + bytes payload = 2; } diff --git a/pkg/network/protocols/core/protocol.go b/pkg/network/protocols/core/protocol.go index 0ced4c663..08aa50890 100644 --- a/pkg/network/protocols/core/protocol.go +++ b/pkg/network/protocols/core/protocol.go @@ -127,7 +127,7 @@ func (p *Protocol) handlePacket(nbr peer.ID, packet proto.Message) (err error) { case *nwmodels.Packet_WarpSyncRequest: p.handleWarpSyncRequest(packetBody.WarpSyncRequest.GetCommitmentId(), nbr) case *nwmodels.Packet_WarpSyncResponse: - p.handleWarpSyncResponse(packetBody.WarpSyncResponse.GetCommitmentId(), packetBody.WarpSyncResponse.GetBlockIds(), packetBody.WarpSyncResponse.GetTangleMerkleProof(), packetBody.WarpSyncResponse.GetTransactionIds(), packetBody.WarpSyncResponse.GetMutationsMerkleProof(), nbr) + p.handleWarpSyncResponse(packetBody.WarpSyncResponse.GetCommitmentId(), packetBody.WarpSyncResponse.GetPayload(), nbr) default: return ierrors.Errorf("unsupported packet; packet=%+v, packetBody=%T-%+v", packet, packetBody, packetBody) } diff --git a/pkg/network/protocols/core/warp_sync.go b/pkg/network/protocols/core/warp_sync.go index ae9206edc..ace474241 100644 --- a/pkg/network/protocols/core/warp_sync.go +++ b/pkg/network/protocols/core/warp_sync.go @@ -11,6 +11,13 @@ import ( "github.com/iotaledger/iota.go/v4/merklehasher" ) +type WarpSyncPayload struct { + BlockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs `serix:"0,lengthPrefixType=uint32"` + TangleMerkleProof *merklehasher.Proof[iotago.Identifier] `serix:"1"` + TransactionIDs iotago.TransactionIDs `serix:"2"` + MutationsMerkleProof *merklehasher.Proof[iotago.Identifier] `serix:"3"` +} + func (p *Protocol) SendWarpSyncRequest(id iotago.CommitmentID, to ...peer.ID) { p.network.Send(&nwmodels.Packet{Body: &nwmodels.Packet_WarpSyncRequest{ WarpSyncRequest: &nwmodels.WarpSyncRequest{ @@ -19,16 +26,20 @@ func (p *Protocol) SendWarpSyncRequest(id iotago.CommitmentID, to ...peer.ID) { }}, to...) } -func (p *Protocol) SendWarpSyncResponse(id iotago.CommitmentID, blockIDs iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationsMerkleProof *merklehasher.Proof[iotago.Identifier], to ...peer.ID) { +func (p *Protocol) SendWarpSyncResponse(id iotago.CommitmentID, blockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationsMerkleProof *merklehasher.Proof[iotago.Identifier], to ...peer.ID) { serializer := p.apiProvider.APIForSlot(id.Slot()) + payload := &WarpSyncPayload{ + BlockIDsBySlotCommitmentID: blockIDsBySlotCommitmentID, + TangleMerkleProof: tangleMerkleProof, + TransactionIDs: transactionIDs, + MutationsMerkleProof: mutationsMerkleProof, + } + p.network.Send(&nwmodels.Packet{Body: &nwmodels.Packet_WarpSyncResponse{ WarpSyncResponse: &nwmodels.WarpSyncResponse{ - CommitmentId: lo.PanicOnErr(id.Bytes()), - BlockIds: lo.PanicOnErr(serializer.Encode(blockIDs)), - TangleMerkleProof: lo.PanicOnErr(tangleMerkleProof.Bytes()), - TransactionIds: lo.PanicOnErr(serializer.Encode(transactionIDs)), - MutationsMerkleProof: lo.PanicOnErr(mutationsMerkleProof.Bytes()), + CommitmentId: lo.PanicOnErr(id.Bytes()), + Payload: lo.PanicOnErr(serializer.Encode(payload)), }, }}, to...) } @@ -46,7 +57,7 @@ func (p *Protocol) handleWarpSyncRequest(commitmentIDBytes []byte, id peer.ID) { }) } -func (p *Protocol) handleWarpSyncResponse(commitmentIDBytes []byte, blockIDsBytes []byte, tangleMerkleProofBytes []byte, transactionIDsBytes []byte, mutationProofBytes []byte, id peer.ID) { +func (p *Protocol) handleWarpSyncResponse(commitmentIDBytes []byte, payloadBytes []byte, id peer.ID) { p.workerPool.Submit(func() { commitmentID, _, err := iotago.CommitmentIDFromBytes(commitmentIDBytes) if err != nil { @@ -55,34 +66,13 @@ func (p *Protocol) handleWarpSyncResponse(commitmentIDBytes []byte, blockIDsByte return } - var blockIDs iotago.BlockIDs - if _, err = p.apiProvider.APIForSlot(commitmentID.Slot()).Decode(blockIDsBytes, &blockIDs, serix.WithValidation()); err != nil { - p.Events.Error.Trigger(ierrors.Wrap(err, "failed to deserialize block ids"), id) - - return - } - - tangleMerkleProof, _, err := merklehasher.ProofFromBytes[iotago.Identifier](tangleMerkleProofBytes) - if err != nil { - p.Events.Error.Trigger(ierrors.Wrapf(err, "failed to deserialize merkle proof when receiving waprsync response for commitment %s", commitmentID), id) - - return - } - - var transactionIDs iotago.TransactionIDs - if _, err = p.apiProvider.APIForSlot(commitmentID.Slot()).Decode(transactionIDsBytes, &transactionIDs, serix.WithValidation()); err != nil { - p.Events.Error.Trigger(ierrors.Wrap(err, "failed to deserialize transaction ids"), id) - - return - } - - mutationProof, _, err := merklehasher.ProofFromBytes[iotago.Identifier](mutationProofBytes) - if err != nil { - p.Events.Error.Trigger(ierrors.Wrapf(err, "failed to deserialize merkle proof when receiving waprsync response for commitment %s", commitmentID), id) + payload := new(WarpSyncPayload) + if _, err = p.apiProvider.APIForSlot(commitmentID.Slot()).Decode(payloadBytes, payload, serix.WithValidation()); err != nil { + p.Events.Error.Trigger(ierrors.Wrap(err, "failed to deserialize payload"), id) return } - p.Events.WarpSyncResponseReceived.Trigger(commitmentID, blockIDs, tangleMerkleProof, transactionIDs, mutationProof, id) + p.Events.WarpSyncResponseReceived.Trigger(commitmentID, payload.BlockIDsBySlotCommitmentID, payload.TangleMerkleProof, payload.TransactionIDs, payload.MutationsMerkleProof, id) }) } diff --git a/pkg/protocol/block_dispatcher.go b/pkg/protocol/block_dispatcher.go index 15b2fcfa3..445211d45 100644 --- a/pkg/protocol/block_dispatcher.go +++ b/pkg/protocol/block_dispatcher.go @@ -154,9 +154,9 @@ func (b *BlockDispatcher) initNetworkConnection() { }, b.warpSyncWorkers) }) - b.protocol.Events.Network.WarpSyncResponseReceived.Hook(func(commitmentID iotago.CommitmentID, blockIDs iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], src peer.ID) { + b.protocol.Events.Network.WarpSyncResponseReceived.Hook(func(commitmentID iotago.CommitmentID, blockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], src peer.ID) { b.runTask(func() { - b.protocol.HandleError(b.processWarpSyncResponse(commitmentID, blockIDs, tangleMerkleProof, transactionIDs, mutationMerkleProof, src)) + b.protocol.HandleError(b.processWarpSyncResponse(commitmentID, blockIDsBySlotCommitmentID, tangleMerkleProof, transactionIDs, mutationMerkleProof, src)) }, b.warpSyncWorkers) }) } @@ -177,7 +177,7 @@ func (b *BlockDispatcher) processWarpSyncRequest(commitmentID iotago.CommitmentI return ierrors.Wrapf(err, "commitment ID mismatch: %s != %s", commitment.ID(), commitmentID) } - blockIDs, err := committedSlot.BlockIDs() + blocksIDsByCommitmentID, err := committedSlot.BlocksIDsBySlotCommitmentID() if err != nil { return ierrors.Wrapf(err, "failed to get block IDs from slot %d", commitmentID.Slot()) } @@ -192,13 +192,13 @@ func (b *BlockDispatcher) processWarpSyncRequest(commitmentID iotago.CommitmentI return ierrors.Wrapf(err, "failed to get roots from slot %d", commitmentID.Slot()) } - b.protocol.networkProtocol.SendWarpSyncResponse(commitmentID, blockIDs, roots.TangleProof(), transactionIDs, roots.MutationProof(), src) + b.protocol.networkProtocol.SendWarpSyncResponse(commitmentID, blocksIDsByCommitmentID, roots.TangleProof(), transactionIDs, roots.MutationProof(), src) return nil } // processWarpSyncResponse processes a WarpSync response. -func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.CommitmentID, blockIDs iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], _ peer.ID) error { +func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.CommitmentID, blockIDsBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, tangleMerkleProof *merklehasher.Proof[iotago.Identifier], transactionIDs iotago.TransactionIDs, mutationMerkleProof *merklehasher.Proof[iotago.Identifier], _ peer.ID) error { if b.processedWarpSyncRequests.Has(commitmentID) { return nil } @@ -220,6 +220,12 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment return nil } + // Flatten all blockIDs into a single slice. + var blockIDs iotago.BlockIDs + for _, ids := range blockIDsBySlotCommitmentID { + blockIDs = append(blockIDs, ids...) + } + acceptedBlocks := ads.NewSet[iotago.Identifier, iotago.BlockID](mapdb.NewMapDB(), iotago.BlockID.Bytes, iotago.BlockIDFromBytes) for _, blockID := range blockIDs { _ = acceptedBlocks.Add(blockID) // a mapdb can newer return an error @@ -326,19 +332,21 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment return nil } - for _, blockID := range blockIDs { - block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID) - if block == nil { // this should never happen as we're requesting the blocks for this slot so it can't be evicted. - b.protocol.HandleError(ierrors.Errorf("failed to get block %s", blockID)) - continue - } + for slotCommitmentID, blockIDsForCommitment := range blockIDsBySlotCommitmentID { + for _, blockID := range blockIDsForCommitment { + block, _ := targetEngine.BlockDAG.GetOrRequestBlock(blockID) + if block == nil { // this should never happen as we're requesting the blocks for this slot so it can't be evicted. + b.protocol.HandleError(ierrors.Errorf("failed to get block %s", blockID)) + continue + } - // We need to make sure that we add all blocks as root blocks because we don't know which blocks are root blocks without - // blocks from future slots. We're committing the current slot which then leads to the eviction of the blocks from the - // block cache and thus if not root blocks no block in the next slot can become solid. - targetEngine.EvictionState.AddRootBlock(block.ID(), block.SlotCommitmentID()) + // We need to make sure that we add all blocks as root blocks because we don't know which blocks are root blocks without + // blocks from future slots. We're committing the current slot which then leads to the eviction of the blocks from the + // block cache and thus if not root blocks no block in the next slot can become solid. + targetEngine.EvictionState.AddRootBlock(blockID, slotCommitmentID) - block.Booked().OnUpdate(blockBookedFunc) + block.Booked().OnUpdate(blockBookedFunc) + } } return nil diff --git a/pkg/protocol/engine/blocks/block.go b/pkg/protocol/engine/blocks/block.go index 10df6cbb5..88ce9192e 100644 --- a/pkg/protocol/engine/blocks/block.go +++ b/pkg/protocol/engine/blocks/block.go @@ -247,7 +247,7 @@ func (b *Block) SlotCommitmentID() iotago.CommitmentID { return b.rootBlock.commitmentID } - return b.modelBlock.ProtocolBlock().Header.SlotCommitmentID + return b.modelBlock.SlotCommitmentID() } // IsMissing returns a flag that indicates if the underlying Block data hasn't been stored, yet. diff --git a/pkg/protocol/engine/committed_slot_api.go b/pkg/protocol/engine/committed_slot_api.go index 6b9c22c2c..5a0cb5aa1 100644 --- a/pkg/protocol/engine/committed_slot_api.go +++ b/pkg/protocol/engine/committed_slot_api.go @@ -52,10 +52,10 @@ func (c *CommittedSlotAPI) Roots() (committedRoots *iotago.Roots, err error) { return roots, nil } -// BlockIDs returns the accepted block IDs of the slot. -func (c *CommittedSlotAPI) BlockIDs() (blockIDs iotago.BlockIDs, err error) { +// BlocksIDsBySlotCommitmentID returns the accepted block IDs of the slot grouped by their SlotCommitmentID. +func (c *CommittedSlotAPI) BlocksIDsBySlotCommitmentID() (map[iotago.CommitmentID]iotago.BlockIDs, error) { if c.engine.Storage.Settings().LatestCommitment().Slot() < c.CommitmentID.Slot() { - return blockIDs, ierrors.Errorf("slot %d is not committed yet", c.CommitmentID) + return nil, ierrors.Errorf("slot %d is not committed yet", c.CommitmentID) } store, err := c.engine.Storage.Blocks(c.CommitmentID.Slot()) @@ -63,14 +63,15 @@ func (c *CommittedSlotAPI) BlockIDs() (blockIDs iotago.BlockIDs, err error) { return nil, ierrors.Errorf("failed to get block store of slot index %d", c.CommitmentID.Slot()) } + blockIDsBySlotCommitmentID := make(map[iotago.CommitmentID]iotago.BlockIDs) if err := store.ForEachBlockInSlot(func(block *model.Block) error { - blockIDs = append(blockIDs, block.ID()) + blockIDsBySlotCommitmentID[block.SlotCommitmentID()] = append(blockIDsBySlotCommitmentID[block.SlotCommitmentID()], block.ID()) return nil }); err != nil { return nil, ierrors.Wrapf(err, "failed to iterate over blocks of slot %d", c.CommitmentID.Slot()) } - return blockIDs, nil + return blockIDsBySlotCommitmentID, nil } func (c *CommittedSlotAPI) TransactionIDs() (iotago.TransactionIDs, error) { diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index e2a8e2ed2..0c6efdbe7 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -182,7 +182,7 @@ func (n *Node) hookLogging(failOnBlockFiltered bool) { fmt.Printf("%s > Network.AttestationsRequestReceived: from %s %s\n", n.Name, source, id) }) - events.Network.WarpSyncResponseReceived.Hook(func(id iotago.CommitmentID, ds iotago.BlockIDs, m *merklehasher.Proof[iotago.Identifier], ds2 iotago.TransactionIDs, m2 *merklehasher.Proof[iotago.Identifier], id2 peer.ID) { + events.Network.WarpSyncResponseReceived.Hook(func(id iotago.CommitmentID, ds map[iotago.CommitmentID]iotago.BlockIDs, m *merklehasher.Proof[iotago.Identifier], ds2 iotago.TransactionIDs, m2 *merklehasher.Proof[iotago.Identifier], id2 peer.ID) { fmt.Printf("%s > Network.WarpSyncResponseReceived: from %s %s\n", n.Name, id2, id) }) diff --git a/pkg/testsuite/storage_commitments.go b/pkg/testsuite/storage_commitments.go index 407682a9c..970b14a7a 100644 --- a/pkg/testsuite/storage_commitments.go +++ b/pkg/testsuite/storage_commitments.go @@ -58,7 +58,7 @@ func (t *TestSuite) AssertEqualStoredCommitmentAtIndex(index iotago.SlotIndex, n }) } -func (t *TestSuite) AssertStorageCommitmentBlocks(slot iotago.SlotIndex, expectedBlocks iotago.BlockIDs, nodes ...*mock.Node) { +func (t *TestSuite) AssertStorageCommitmentBlocks(slot iotago.SlotIndex, expectedBlocksBySlotCommitmentID map[iotago.CommitmentID]iotago.BlockIDs, nodes ...*mock.Node) { mustNodes(nodes) t.Eventually(func() error { @@ -73,13 +73,21 @@ func (t *TestSuite) AssertStorageCommitmentBlocks(slot iotago.SlotIndex, expecte return ierrors.Wrapf(err, "AssertStorageCommitmentBlocks: %s: error getting committed slot for commitment: %s", node.Name, storedCommitment.ID()) } - committedBlocks, err := committedSlot.BlockIDs() + committedBlocksBySlotCommitmentID, err := committedSlot.BlocksIDsBySlotCommitmentID() if err != nil { return ierrors.Wrapf(err, "AssertStorageCommitmentBlocks: %s: error getting committed blocks for slot: %d", node.Name, slot) } - if !assert.Equal(t.fakeTesting, committedBlocks, expectedBlocks) { - return ierrors.Errorf("AssertStorageCommitmentBlocks: %s: expected %s, got %s", node.Name, expectedBlocks, committedBlocks) + if len(committedBlocksBySlotCommitmentID) == 0 { + committedBlocksBySlotCommitmentID = nil + } + + if len(expectedBlocksBySlotCommitmentID) == 0 { + expectedBlocksBySlotCommitmentID = nil + } + + if !assert.Equal(t.fakeTesting, committedBlocksBySlotCommitmentID, expectedBlocksBySlotCommitmentID) { + return ierrors.Errorf("AssertStorageCommitmentBlocks: %s: expected %s, got %s", node.Name, expectedBlocksBySlotCommitmentID, committedBlocksBySlotCommitmentID) } }