Skip to content

Commit

Permalink
Merge pull request #855 from iotaledger/feat/wallclock-drift
Browse files Browse the repository at this point in the history
Make sure user knows that block was filtered out due to wallclock drift
  • Loading branch information
alexsporn authored Mar 26, 2024
2 parents b18b728 + 141ca7b commit a12e99a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 17 deletions.
34 changes: 34 additions & 0 deletions pkg/protocol/blocks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package protocol

import (
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/iotaledger/hive.go/ds/types"
Expand All @@ -15,6 +17,12 @@ import (
iotago "github.com/iotaledger/iota.go/v4"
)

var (
ErrBlockTimeTooFarAheadInFuture = ierrors.New("a block cannot be too far ahead in the future")
ErrUnsolidifiableCommitment = ierrors.New("block referencing unsolidifiable commitment is not allowed")
ErrFailToUpdateDropBuffer = ierrors.New("failed to update dropped blocks buffer")
)

// Blocks is a subcomponent of the protocol that is responsible for handling block requests and responses.
type Blocks struct {
// protocol contains a reference to the Protocol instance that this component belongs to.
Expand Down Expand Up @@ -87,18 +95,44 @@ func (b *Blocks) SendResponse(block *model.Block) {
// ProcessResponse processes the given block response.
func (b *Blocks) ProcessResponse(block *model.Block, from peer.ID) {
b.workerPool.Submit(func() {
// this check must happen before the block reaches the Engine. The Protocol needs a perception of the current time,
// otherwise a malicous actor might trigger a chain switch by sending a block with a commitment in the future.
if timeDelta := time.Since(block.ProtocolBlock().Header.IssuingTime); timeDelta < -b.protocol.Options.MaxAllowedWallClockDrift {
b.LogWarn("filtered block, issuing time ahead", "block", block.ID(), "issuingTime", block.ProtocolBlock().Header.IssuingTime, "timeDelta", timeDelta, "deltaAllowed", b.protocol.Options.MaxAllowedWallClockDrift, "from", from, "err", ErrBlockTimeTooFarAheadInFuture)

b.protocol.Events.ProtocolFilter.Trigger(&BlockFilteredEvent{
Block: block,
Reason: ierrors.WithMessagef(ErrBlockTimeTooFarAheadInFuture, "block issuing time ahead by %v, time delta allowed: %d", -timeDelta, b.protocol.Options.MaxAllowedWallClockDrift),
Source: from,
})

return
}

// abort if the commitment belongs to an evicted slot
commitment, err := b.protocol.Commitments.Get(block.ProtocolBlock().Header.SlotCommitmentID, true)
if err != nil && ierrors.Is(err, ErrorSlotEvicted) {
b.LogError("dropped block referencing unsolidifiable commitment", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID(), "err", err)

b.protocol.Events.ProtocolFilter.Trigger(&BlockFilteredEvent{
Block: block,
Reason: ierrors.WithMessagef(ErrUnsolidifiableCommitment, "commitment %s slot has been evicted", block.ProtocolBlock().Header.SlotCommitmentID.String()),
Source: from,
})

return
}

// add the block to the dropped blocks buffer if we could not dispatch it to the chain
if commitment == nil || !commitment.Chain.Get().DispatchBlock(block, from) {
if !b.droppedBlocksBuffer.Add(block.ProtocolBlock().Header.SlotCommitmentID, types.NewTuple(block, from)) {
b.LogError("failed to add dropped block referencing unsolid commitment to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID())

b.protocol.Events.ProtocolFilter.Trigger(&BlockFilteredEvent{
Block: block,
Reason: ierrors.WithMessagef(ErrFailToUpdateDropBuffer, "failed to add block %s to dropped blocks buffer", block.ID().String()),
Source: from,
})
} else {
b.LogTrace("dropped block referencing unsolid commitment added to dropped blocks buffer", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID())
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/protocol/events.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
package protocol

import "github.com/iotaledger/iota-core/pkg/protocol/engine"
import (
"github.com/libp2p/go-libp2p/core/peer"

"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
)

// Events exposes the Events of the main engine of the protocol at a single endpoint.
//
// TODO: It should be replaced with reactive calls to the corresponding events and be deleted but we can do this in a
// later PR (to minimize the code changes to review).
type Events struct {
Engine *engine.Events
Engine *engine.Events
ProtocolFilter *event.Event1[*BlockFilteredEvent]
}

// NewEvents creates a new Events instance.
func NewEvents() *Events {
return &Events{
Engine: engine.NewEvents(),
Engine: engine.NewEvents(),
ProtocolFilter: event.New1[*BlockFilteredEvent](),
}
}

type BlockFilteredEvent struct {
Block *model.Block
Reason error
Source peer.ID
}
12 changes: 0 additions & 12 deletions pkg/protocol/network.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package protocol

import (
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/log"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/network"
"github.com/iotaledger/iota-core/pkg/network/protocols/core"
)

var ErrBlockTimeTooFarAheadInFuture = ierrors.New("a block cannot be too far ahead in the future")

// Network is a subcomponent of the protocol that is responsible for handling the network communication.
type Network struct {
// Protocol contains the network endpoint of the protocol.
Expand Down Expand Up @@ -42,13 +37,6 @@ func newNetwork(protocol *Protocol, networkEndpoint network.Endpoint) *Network {
// OnBlockReceived overwrites the OnBlockReceived method of the core protocol to filter out invalid blocks.
func (n *Network) OnBlockReceived(callback func(block *model.Block, src peer.ID)) (unsubscribe func()) {
return n.Protocol.OnBlockReceived(func(block *model.Block, src peer.ID) {
// filter blocks from the future
if timeDelta := time.Since(block.ProtocolBlock().Header.IssuingTime); timeDelta < -n.protocol.Options.MaxAllowedWallClockDrift {
n.LogWarn("filtered block, issuing time ahead", "block", block.ID(), "issuingTime", block.ProtocolBlock().Header.IssuingTime, "timeDelta", timeDelta, "deltaAllowed", n.protocol.Options.MaxAllowedWallClockDrift, "from", src, "err", ErrBlockTimeTooFarAheadInFuture)

return
}

callback(block, src)
})
}
12 changes: 11 additions & 1 deletion pkg/requesthandler/blockissuance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/event"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/postsolidfilter"
"github.com/iotaledger/iota-core/pkg/protocol/engine/filter/presolidfilter"
Expand Down Expand Up @@ -81,6 +82,15 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block
blockCtxCancel(errBlockRetained)
}, event.WithWorkerPool(r.workerPool)).Unhook

protocolFilteredUnhook := r.protocol.Events.ProtocolFilter.Hook(func(event *protocol.BlockFilteredEvent) {
if blockID != event.Block.ID() {
return
}

// signal that block was dropped by the protocol
blockCtxCancel(event.Reason)
}, event.WithWorkerPool(r.workerPool)).Unhook

blockPreFilteredUnhook := r.protocol.Events.Engine.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) {
if blockID != event.Block.ID() {
return
Expand All @@ -99,7 +109,7 @@ func (r *RequestHandler) submitBlockAndAwaitRetainer(ctx context.Context, block
blockCtxCancel(event.Reason)
}, event.WithWorkerPool(r.workerPool)).Unhook

defer lo.BatchReverse(txRetainedUnhook, blockRetainedUnhook, blockPreFilteredUnhook, blockPostFilteredUnhook)()
defer lo.BatchReverse(txRetainedUnhook, blockRetainedUnhook, protocolFilteredUnhook, blockPreFilteredUnhook, blockPostFilteredUnhook)()

if err := r.submitBlock(block); err != nil {
return ierrors.Wrapf(err, "failed to issue block %s", blockID)
Expand Down
6 changes: 5 additions & 1 deletion pkg/tests/blocktime_monotonicity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/iota-core/pkg/protocol"
Expand Down Expand Up @@ -49,11 +50,14 @@ func Test_MaxAllowedWallClockDrift(t *testing.T) {

tooFarAheadFutureBlock := lo.PanicOnErr(node0.Validator.CreateBasicBlock(context.Background(), "tooFarAheadFuture", mock.WithBasicBlockHeader(mock.WithIssuingTime(time.Now().Add(allowedDrift).Add(1*time.Second)))))
ts.RegisterBlock("tooFarAheadFuture", tooFarAheadFutureBlock)
require.NoError(t, node0.Validator.SubmitBlockWithoutAwaitingBooking(tooFarAheadFutureBlock.ModelBlock(), node0))
err := node0.Validator.SubmitBlock(context.Background(), tooFarAheadFutureBlock.ModelBlock())
require.Error(t, err)
require.True(t, ierrors.Is(err, protocol.ErrBlockTimeTooFarAheadInFuture))

ts.AssertBlocksExist(ts.Blocks("past", "present", "acceptedFuture"), true, node0.Client)
ts.AssertBlocksExist(ts.Blocks("tooFarAheadFuture"), false, node0.Client)
}

func Test_BlockTimeMonotonicity(t *testing.T) {
ts := testsuite.NewTestSuite(t,
testsuite.WithProtocolParametersOptions(
Expand Down

0 comments on commit a12e99a

Please sign in to comment.