Skip to content

Commit

Permalink
Merge pull request #422 from ethereum-optimism/interop-mempool-filter
Browse files Browse the repository at this point in the history
interop: Ingress Filtering for Interop Enabled Mempool
  • Loading branch information
axelKingsley authored Nov 5, 2024
2 parents 48cf9ac + 262842b commit d6a8b0b
Show file tree
Hide file tree
Showing 15 changed files with 503 additions and 20 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ var (
utils.RollupHistoricalRPCFlag,
utils.RollupHistoricalRPCTimeoutFlag,
utils.RollupInteropRPCFlag,
utils.RollupInteropMempoolFilteringFlag,
utils.RollupDisableTxPoolGossipFlag,
utils.RollupComputePendingBlock,
utils.RollupHaltOnIncompatibleProtocolVersionFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,12 @@ var (
Category: flags.RollupCategory,
}

RollupInteropMempoolFilteringFlag = &cli.BoolFlag{
Name: "rollup.interopmempoolfiltering",
Usage: "If using interop, transactions are checked for interop validity before being added to the mempool (experimental).",
Category: flags.RollupCategory,
}

RollupDisableTxPoolGossipFlag = &cli.BoolFlag{
Name: "rollup.disabletxpoolgossip",
Usage: "Disable transaction pool gossip.",
Expand Down Expand Up @@ -1950,6 +1956,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(RollupInteropRPCFlag.Name) {
cfg.InteropMessageRPC = ctx.String(RollupInteropRPCFlag.Name)
}
if ctx.IsSet(RollupInteropMempoolFilteringFlag.Name) {
cfg.InteropMempoolFiltering = ctx.Bool(RollupInteropMempoolFilteringFlag.Name)
}
cfg.RollupDisableTxPoolGossip = ctx.Bool(RollupDisableTxPoolGossipFlag.Name)
cfg.RollupDisableTxPoolAdmission = cfg.RollupSequencerHTTP != "" && !ctx.Bool(RollupEnableTxPoolAdmissionFlag.Name)
cfg.RollupHaltOnIncompatibleProtocolVersion = ctx.String(RollupHaltOnIncompatibleProtocolVersionFlag.Name)
Expand Down
4 changes: 4 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ var (
// current network configuration.
ErrTxTypeNotSupported = types.ErrTxTypeNotSupported

// ErrTxFilteredOut indicates an ingress filter has rejected the transaction from
// being included in the pool.
ErrTxFilteredOut = errors.New("transaction filtered out")

// ErrTipAboveFeeCap is a sanity error to ensure no one is able to specify a
// transaction with a tip higher than the total fee cap.
ErrTipAboveFeeCap = errors.New("max priority fee per gas higher than max fee per gas")
Expand Down
57 changes: 57 additions & 0 deletions core/txpool/ingress_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package txpool

import (
"context"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/interoptypes"
"github.com/ethereum/go-ethereum/log"
)

// IngressFilter is an interface that allows filtering of transactions before they are added to the transaction pool.
// Implementations of this interface can be used to filter transactions based on various criteria.
// FilterTx will return true if the transaction should be allowed, and false if it should be rejected.
type IngressFilter interface {
FilterTx(ctx context.Context, tx *types.Transaction) bool
}

type interopFilter struct {
logsFn func(tx *types.Transaction) ([]*types.Log, error)
checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error
}

func NewInteropFilter(
logsFn func(tx *types.Transaction) ([]*types.Log, error),
checkFn func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error) IngressFilter {
return &interopFilter{
logsFn: logsFn,
checkFn: checkFn,
}
}

// FilterTx implements IngressFilter.FilterTx
// it gets logs checks for message safety based on the function provided
func (f *interopFilter) FilterTx(ctx context.Context, tx *types.Transaction) bool {
logs, err := f.logsFn(tx)
if err != nil {
log.Debug("Failed to retrieve logs of tx", "txHash", tx.Hash(), "err", err)
return false // default to deny if logs cannot be retrieved
}
if len(logs) == 0 {
return true // default to allow if there are no logs
}
ems, err := interoptypes.ExecutingMessagesFromLogs(logs)
if err != nil {
log.Debug("Failed to parse executing messages of tx", "txHash", tx.Hash(), "err", err)
return false // default to deny if logs cannot be parsed
}
if len(ems) == 0 {
return true // default to allow if there are no executing messages
}

ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
// check with the supervisor if the transaction should be allowed given the executing messages
return f.checkFn(ctx, ems, interoptypes.Unsafe) == nil
}
188 changes: 188 additions & 0 deletions core/txpool/ingress_filters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package txpool

import (
"context"
"errors"
"math/big"
"net"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/interoptypes"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)

func TestInteropFilter(t *testing.T) {
// some placeholder transaction to test with
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: big.NewInt(1),
Nonce: 1,
To: &common.Address{},
Value: big.NewInt(1),
Data: []byte{},
})
t.Run("Tx has no logs", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{}, nil
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs are empty
return errors.New("error")
}
// when there are no logs to process, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(context.Background(), tx))
})
t.Run("Tx errored when getting logs", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{}, errors.New("error")
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval errored
return errors.New("error")
}
// when log retrieval errors, the transaction should be denied
filter := NewInteropFilter(logFn, checkFn)
require.False(t, filter.FilterTx(context.Background(), tx))
})
t.Run("Tx has no executing messages", func(t *testing.T) {
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
l1 := &types.Log{
Topics: []common.Hash{common.BytesToHash([]byte("topic1"))},
}
return []*types.Log{l1}, nil
}
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
// make this return error, but it won't be called because logs retrieval doesn't have executing messages
return errors.New("error")
}
// when no executing messages are included, the transaction should be allowed
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(context.Background(), tx))
})
t.Run("Tx has valid executing message", func(t *testing.T) {
// build a basic executing message
// the executing message must pass basic decode validation,
// but the validity check is done by the checkFn
l1 := &types.Log{
Address: params.InteropCrossL2InboxAddress,
Topics: []common.Hash{
common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]),
common.BytesToHash([]byte("payloadHash")),
},
Data: []byte{},
}
// using all 0s for data allows all takeZeros to pass
for i := 0; i < 32*5; i++ {
l1.Data = append(l1.Data, 0)
}
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{l1}, nil
}
var spyEMs []interoptypes.Message
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
spyEMs = ems
return nil
}
// when there is one executing message, the transaction should be allowed
// if the checkFn returns nil
filter := NewInteropFilter(logFn, checkFn)
require.True(t, filter.FilterTx(context.Background(), tx))
// confirm that one executing message was passed to the checkFn
require.Equal(t, 1, len(spyEMs))
})
t.Run("Tx has invalid executing message", func(t *testing.T) {
// build a basic executing message
// the executing message must pass basic decode validation,
// but the validity check is done by the checkFn
l1 := &types.Log{
Address: params.InteropCrossL2InboxAddress,
Topics: []common.Hash{
common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]),
common.BytesToHash([]byte("payloadHash")),
},
Data: []byte{},
}
// using all 0s for data allows all takeZeros to pass
for i := 0; i < 32*5; i++ {
l1.Data = append(l1.Data, 0)
}
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
return []*types.Log{l1}, nil
}
var spyEMs []interoptypes.Message
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
spyEMs = ems
return errors.New("error")
}
// when there is one executing message, and the checkFn returns an error,
// (ie the supervisor rejects the transaction) the transaction should be denied
filter := NewInteropFilter(logFn, checkFn)
require.False(t, filter.FilterTx(context.Background(), tx))
// confirm that one executing message was passed to the checkFn
require.Equal(t, 1, len(spyEMs))
})
}

func TestInteropFilterRPCFailures(t *testing.T) {
tests := []struct {
name string
networkErr bool
timeout bool
invalidResp bool
}{
{
name: "Network Error",
networkErr: true,
},
{
name: "Timeout",
timeout: true,
},
{
name: "Invalid Response",
invalidResp: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create mock log function that always returns our test log
logFn := func(tx *types.Transaction) ([]*types.Log, error) {
log := &types.Log{
Address: params.InteropCrossL2InboxAddress,
Topics: []common.Hash{
common.BytesToHash(interoptypes.ExecutingMessageEventTopic[:]),
common.BytesToHash([]byte("payloadHash")),
},
Data: make([]byte, 32*5),
}
return []*types.Log{log}, nil
}

// Create mock check function that simulates RPC failures
checkFn := func(ctx context.Context, ems []interoptypes.Message, safety interoptypes.SafetyLevel) error {
if tt.networkErr {
return &net.OpError{Op: "dial", Err: errors.New("connection refused")}
}

if tt.timeout {
return context.DeadlineExceeded
}

if tt.invalidResp {
return errors.New("invalid response format")
}

return nil
}

// Create and test filter
filter := NewInteropFilter(logFn, checkFn)
result := filter.FilterTx(context.Background(), &types.Transaction{})
require.Equal(t, false, result, "FilterTx result mismatch")
})
}
}
42 changes: 36 additions & 6 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package txpool

import (
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -77,22 +78,32 @@ type TxPool struct {
term chan struct{} // Termination channel to detect a closed pool

sync chan chan error // Testing / simulator channel to block until internal reset is done

ingressFilters []IngressFilter // List of filters to apply to incoming transactions

filterCtx context.Context // Filters may use external resources
filterCancel context.CancelFunc // Filter calls are cancelled on shutdown
}

// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
func New(gasTip uint64, chain BlockChain, subpools []SubPool, poolFilters []IngressFilter) (*TxPool, error) {
// Retrieve the current head so that all subpools and this main coordinator
// pool will have the same starting state, even if the chain moves forward
// during initialization.
head := chain.CurrentBlock()

filterCtx, filterCancel := context.WithCancel(context.Background())

pool := &TxPool{
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
ingressFilters: poolFilters,
filterCtx: filterCtx,
filterCancel: filterCancel,
}
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
Expand Down Expand Up @@ -156,6 +167,8 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
func (p *TxPool) Close() error {
var errs []error

p.filterCancel() // Cancel filter work, these in-flight txs will be not be allowed through before shutdown

// Terminate the reset loop and wait for it to finish
errc := make(chan error)
p.quit <- errc
Expand Down Expand Up @@ -319,11 +332,23 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// so we can piece back the returned errors into the original order.
txsets := make([][]*types.Transaction, len(p.subpools))
splits := make([]int, len(txs))
filtered_out := make([]bool, len(txs))

for i, tx := range txs {
// Mark this transaction belonging to no-subpool
splits[i] = -1

// Filter the transaction through the ingress filters
for _, f := range p.ingressFilters {
if !f.FilterTx(p.filterCtx, tx) {
filtered_out[i] = true
}
}
// if the transaction is filtered out, don't add it to any subpool
if filtered_out[i] {
continue
}

// Try to find a subpool that accepts the transaction
for j, subpool := range p.subpools {
if subpool.Filter(tx) {
Expand All @@ -341,6 +366,11 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
}
errs := make([]error, len(txs))
for i, split := range splits {
// If the transaction was filtered out, mark it as such
if filtered_out[i] {
errs[i] = core.ErrTxFilteredOut
continue
}
// If the transaction was rejected by all subpools, mark it unsupported
if split == -1 {
errs[i] = core.ErrTxTypeNotSupported
Expand Down
Loading

0 comments on commit d6a8b0b

Please sign in to comment.