From 1b7960156378b7f9a732a1d2f3d869df2fcaddb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daria=20Dziuba=C5=82towska?= Date: Mon, 25 Mar 2024 15:02:47 +0100 Subject: [PATCH] Move wallclock check to ProcessResponse, trigger protocol event and hook to it --- pkg/protocol/blocks.go | 34 ++++++++++++++++++++++++ pkg/protocol/events.go | 20 +++++++++++--- pkg/protocol/network.go | 12 --------- pkg/requesthandler/blockissuance.go | 19 ++++++++++++- pkg/tests/blocktime_monotonicity_test.go | 6 ++++- 5 files changed, 74 insertions(+), 17 deletions(-) diff --git a/pkg/protocol/blocks.go b/pkg/protocol/blocks.go index dcdbe0b9a..394c8f140 100644 --- a/pkg/protocol/blocks.go +++ b/pkg/protocol/blocks.go @@ -1,6 +1,8 @@ package protocol import ( + "time" + "github.com/libp2p/go-libp2p/core/peer" "github.com/iotaledger/hive.go/ds/types" @@ -15,6 +17,12 @@ import ( iotago "github.com/iotaledger/iota.go/v4" ) +var ( + ErrBlockTimeTooFarAheadInFuture = ierrors.New("a block cannot be too far ahead in the future") + ErrUnsolidifiableCommitment = ierrors.New("block referencing unsolidifiable commitment is not allowed") + ErrFailToUpdateDropBuffer = ierrors.New("failed to update dropped blocks buffer") +) + // Blocks is a subcomponent of the protocol that is responsible for handling block requests and responses. type Blocks struct { // protocol contains a reference to the Protocol instance that this component belongs to. @@ -87,11 +95,31 @@ func (b *Blocks) SendResponse(block *model.Block) { // ProcessResponse processes the given block response. func (b *Blocks) ProcessResponse(block *model.Block, from peer.ID) { b.workerPool.Submit(func() { + // this check must happen before the block reaches the Engine. The Protocol needs a perception of the current time, + // otherwise a malicous actor might trigger a chain switch by sending a block with a commitment in the future. + if timeDelta := time.Since(block.ProtocolBlock().Header.IssuingTime); timeDelta < -b.protocol.Options.MaxAllowedWallClockDrift { + b.LogWarn("filtered block, issuing time ahead", "block", block.ID(), "issuingTime", block.ProtocolBlock().Header.IssuingTime, "timeDelta", timeDelta, "deltaAllowed", b.protocol.Options.MaxAllowedWallClockDrift, "from", from, "err", ErrBlockTimeTooFarAheadInFuture) + + b.protocol.Events.ProtocolFilter.Trigger(&BlockFilteredEvent{ + Block: block, + Reason: ierrors.WithMessagef(ErrBlockTimeTooFarAheadInFuture, "block issuing time ahead by %v, time delta allowed: %d", -timeDelta, b.protocol.Options.MaxAllowedWallClockDrift), + Source: from, + }) + + return + } + // abort if the commitment belongs to an evicted slot commitment, err := b.protocol.Commitments.Get(block.ProtocolBlock().Header.SlotCommitmentID, true) if err != nil && ierrors.Is(err, ErrorSlotEvicted) { b.LogError("dropped block referencing unsolidifiable commitment", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID(), "err", err) + b.protocol.Events.ProtocolFilter.Trigger(&BlockFilteredEvent{ + Block: block, + Reason: ierrors.WithMessagef(ErrUnsolidifiableCommitment, "commitment %s slot has been evicted", block.ProtocolBlock().Header.SlotCommitmentID.String()), + Source: from, + }) + return } @@ -99,6 +127,12 @@ func (b *Blocks) ProcessResponse(block *model.Block, from peer.ID) { if commitment == nil || !commitment.Chain.Get().DispatchBlock(block, from) { if !b.droppedBlocksBuffer.Add(block.ProtocolBlock().Header.SlotCommitmentID, types.NewTuple(block, from)) { b.LogError("failed to add dropped block referencing unsolid commitment to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) + + b.protocol.Events.ProtocolFilter.Trigger(&BlockFilteredEvent{ + Block: block, + Reason: ierrors.WithMessagef(ErrFailToUpdateDropBuffer, "failed to add block %s to dropped blocks buffer", block.ID().String()), + Source: from, + }) } else { b.LogTrace("dropped block referencing unsolid commitment added to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID()) } diff --git a/pkg/protocol/events.go b/pkg/protocol/events.go index 098c143a7..84b1239ad 100644 --- a/pkg/protocol/events.go +++ b/pkg/protocol/events.go @@ -1,18 +1,32 @@ package protocol -import "github.com/iotaledger/iota-core/pkg/protocol/engine" +import ( + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/iotaledger/hive.go/runtime/event" + "github.com/iotaledger/iota-core/pkg/model" + "github.com/iotaledger/iota-core/pkg/protocol/engine" +) // Events exposes the Events of the main engine of the protocol at a single endpoint. // // TODO: It should be replaced with reactive calls to the corresponding events and be deleted but we can do this in a // later PR (to minimize the code changes to review). type Events struct { - Engine *engine.Events + Engine *engine.Events + ProtocolFilter *event.Event1[*BlockFilteredEvent] } // NewEvents creates a new Events instance. func NewEvents() *Events { return &Events{ - Engine: engine.NewEvents(), + Engine: engine.NewEvents(), + ProtocolFilter: event.New1[*BlockFilteredEvent](), } } + +type BlockFilteredEvent struct { + Block *model.Block + Reason error + Source peer.ID +} diff --git a/pkg/protocol/network.go b/pkg/protocol/network.go index ba419ab18..802d5ab38 100644 --- a/pkg/protocol/network.go +++ b/pkg/protocol/network.go @@ -1,19 +1,14 @@ package protocol import ( - "time" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/log" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/network" "github.com/iotaledger/iota-core/pkg/network/protocols/core" ) -var ErrBlockTimeTooFarAheadInFuture = ierrors.New("a block cannot be too far ahead in the future") - // Network is a subcomponent of the protocol that is responsible for handling the network communication. type Network struct { // Protocol contains the network endpoint of the protocol. @@ -42,13 +37,6 @@ func newNetwork(protocol *Protocol, networkEndpoint network.Endpoint) *Network { // OnBlockReceived overwrites the OnBlockReceived method of the core protocol to filter out invalid blocks. func (n *Network) OnBlockReceived(callback func(block *model.Block, src peer.ID)) (unsubscribe func()) { return n.Protocol.OnBlockReceived(func(block *model.Block, src peer.ID) { - // filter blocks from the future - if timeDelta := time.Since(block.ProtocolBlock().Header.IssuingTime); timeDelta < -n.protocol.Options.MaxAllowedWallClockDrift { - n.LogWarn("filtered block, issuing time ahead", "block", block.ID(), "issuingTime", block.ProtocolBlock().Header.IssuingTime, "timeDelta", timeDelta, "deltaAllowed", n.protocol.Options.MaxAllowedWallClockDrift, "from", src, "err", ErrBlockTimeTooFarAheadInFuture) - - return - } - callback(block, src) }) } diff --git a/pkg/requesthandler/blockissuance.go b/pkg/requesthandler/blockissuance.go index c7b65aa76..10a037da4 100644 --- a/pkg/requesthandler/blockissuance.go +++ b/pkg/requesthandler/blockissuance.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/iota-core/pkg/model" + "github.com/iotaledger/iota-core/pkg/protocol" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter" @@ -34,21 +35,36 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo // it will never trigger one of the below events. processingCtx, processingCtxCancel := context.WithTimeout(ctx, 5*time.Second) defer processingCtxCancel() + // Calculate the blockID so that we don't capture the block pointer in the event handlers. blockID := block.ID() evtUnhook := evt.Hook(func(eventBlock *blocks.Block) { if blockID != eventBlock.ID() { return } + select { case filtered <- nil: case <-exit: } }, event.WithWorkerPool(r.workerPool)).Unhook + + protocolFilteredUnhook := r.protocol.Events.ProtocolFilter.Hook(func(event *protocol.BlockFilteredEvent) { + if blockID != event.Block.ID() { + return + } + + select { + case filtered <- event.Reason: + case <-exit: + } + }, event.WithWorkerPool(r.workerPool)).Unhook + prefilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { if blockID != event.Block.ID() { return } + select { case filtered <- event.Reason: case <-exit: @@ -65,11 +81,12 @@ func (r *RequestHandler) submitBlockAndAwaitEvent(ctx context.Context, block *mo } }, event.WithWorkerPool(r.workerPool)).Unhook - defer lo.BatchReverse(evtUnhook, prefilteredUnhook, postfilteredUnhook)() + defer lo.BatchReverse(evtUnhook, protocolFilteredUnhook, prefilteredUnhook, postfilteredUnhook)() if err := r.submitBlock(block); err != nil { return ierrors.Wrapf(err, "failed to issue block %s", blockID) } + select { case <-processingCtx.Done(): return ierrors.Errorf("context canceled whilst waiting for event on block %s", blockID) diff --git a/pkg/tests/blocktime_monotonicity_test.go b/pkg/tests/blocktime_monotonicity_test.go index 44aa26f64..703824adc 100644 --- a/pkg/tests/blocktime_monotonicity_test.go +++ b/pkg/tests/blocktime_monotonicity_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/iota-core/pkg/protocol" @@ -49,11 +50,14 @@ func Test_MaxAllowedWallClockDrift(t *testing.T) { tooFarAheadFutureBlock := lo.PanicOnErr(node0.Validator.CreateBasicBlock(context.Background(), "tooFarAheadFuture", mock.WithBasicBlockHeader(mock.WithIssuingTime(time.Now().Add(allowedDrift).Add(1*time.Second))))) ts.RegisterBlock("tooFarAheadFuture", tooFarAheadFutureBlock) - require.NoError(t, node0.Validator.SubmitBlockWithoutAwaitingBooking(tooFarAheadFutureBlock.ModelBlock(), node0)) + err := node0.Validator.SubmitBlock(context.Background(), tooFarAheadFutureBlock.ModelBlock()) + require.Error(t, err) + require.True(t, ierrors.Is(err, protocol.ErrBlockTimeTooFarAheadInFuture)) ts.AssertBlocksExist(ts.Blocks("past", "present", "acceptedFuture"), true, node0.Client) ts.AssertBlocksExist(ts.Blocks("tooFarAheadFuture"), false, node0.Client) } + func Test_BlockTimeMonotonicity(t *testing.T) { ts := testsuite.NewTestSuite(t, testsuite.WithProtocolParametersOptions(