From ac7bf86ddc83f5359a978c6a22021bda160b0734 Mon Sep 17 00:00:00 2001 From: Franco Barpp Gomes Date: Fri, 24 May 2024 16:24:14 -0300 Subject: [PATCH] Add cache-based duplicate log detection to SafeEthClient (#180) * chore: Add hashicorp lru cache dependency * feat: Cache past logs to prevent missed or duplicated logs * feat: Filter logs from last block to make sure there are no missed * feat: Add block and tx info to hashLog * test: Add duplicate log test * test: Add mock tx hash to network mock eth client log emissions * test: Send proxy channels instead of returning pointers on controllable mock client * test: Check enqueued logs on duplicate log test --- core/safeclient/client.go | 31 ++++-- core/safeclient/client_test.go | 188 +++++++++++++++++++++++++++++++-- core/safeclient/utils.go | 17 +++ go.mod | 1 + go.sum | 4 +- 5 files changed, 223 insertions(+), 18 deletions(-) diff --git a/core/safeclient/client.go b/core/safeclient/client.go index d5b8c5d8..deae1daf 100644 --- a/core/safeclient/client.go +++ b/core/safeclient/client.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + lru "github.com/hashicorp/golang-lru/v2" ) const ( @@ -147,6 +148,18 @@ func (s *SafeSubscription) SetUnderlyingSub(sub ethereum.Subscription) { } func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + logCache, err := lru.New[[32]byte, any](100) + if err != nil { + c.logger.Error("Failed to create log cache", "err", err) + return nil, err + } + + tryCacheLog := func(log *types.Log) bool { + hash := hashLog(log) + ok, _ := logCache.ContainsOrAdd(hash, nil) + return !ok + } + currentBlock, err := c.Client.BlockNumber(ctx) if err != nil { c.logger.Error("Failed to get current block number", "err", err) @@ -187,7 +200,7 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt rangeStartBlock = 0 } - fromBlock := max(lastBlock, rangeStartBlock) + 1 + fromBlock := max(lastBlock, rangeStartBlock+1) for ; fromBlock < currentBlock; fromBlock += (c.blockChunkSize + 1) { toBlock := min(fromBlock+c.blockChunkSize, currentBlock) @@ -230,8 +243,10 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt safeSub.SetUnderlyingSub(newSub) for _, log := range missedLogs { - lastBlock = max(lastBlock, log.BlockNumber) - ch <- log + if tryCacheLog(&log) { + lastBlock = max(lastBlock, log.BlockNumber) + ch <- log + } } return nil @@ -262,14 +277,10 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt c.logger.Debug("Safe subscription ended") return case log := <-proxyC: - // if that's the case, then most likely we got an event on filterLog and are getting the same one in the sub - if lastBlock > log.BlockNumber { - continue + if tryCacheLog(&log) { + lastBlock = max(lastBlock, log.BlockNumber) + ch <- log } - - // since resub pushes the missed blocks directly to the channel and updates lastBlock, this is ordered - lastBlock = log.BlockNumber - ch <- log case <-ticker.C: c.logger.Debug("Resub ticker fired") handleResub() diff --git a/core/safeclient/client_test.go b/core/safeclient/client_test.go index 983f6325..e3f6c08e 100644 --- a/core/safeclient/client_test.go +++ b/core/safeclient/client_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -133,7 +134,7 @@ func (m *MockEthClient) subscribeToClose() <-chan bool { return ch } -func NewMockEthClient(ctx context.Context, mockCtrl *gomock.Controller, mockNetwork *MockNetwork) *MockEthClient { +func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controller, mockNetwork *MockNetwork) *MockEthClient { fmt.Println("creating mock client") mockClient := &MockEthClient{ @@ -237,7 +238,9 @@ func NewMockEthClient(ctx context.Context, mockCtrl *gomock.Controller, mockNetw continue } - ch <- types.Log{BlockNumber: blockNum} + log := types.Log{BlockNumber: blockNum, Index: uint(blockNum)} + + ch <- log } } }() @@ -254,9 +257,133 @@ func NewMockEthClient(ctx context.Context, mockCtrl *gomock.Controller, mockNetw return mockClient } -func NewMockSafeClient(ctx context.Context, mockCtrl *gomock.Controller, logger logging.Logger, mockNetwork *MockNetwork) (*safeclient.SafeEthClient, error) { +func NewMockSafeClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controller, logger logging.Logger, mockNetwork *MockNetwork) (*safeclient.SafeEthClient, error) { + client, err := safeclient.NewSafeEthClient("", logger, safeclient.WithCustomCreateClient(func(rpcUrl string, logger logging.Logger) (eth.Client, error) { + return NewMockEthClientFromNetwork(ctx, mockCtrl, mockNetwork), nil + })) + + return client, err +} + +func NewMockClientControllable(ctx context.Context, mockCtrl *gomock.Controller, headerProxyC <-chan *types.Header, logProxyC <-chan types.Log, blockNum *uint64) (mockClient *MockEthClient) { + fmt.Println("creating mock client") + + mockClient = &MockEthClient{ + MockEthClient: mocks.NewMockEthClient(mockCtrl), + } + + mockClient.EXPECT().SubscribeNewHead(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + if mockClient.isClosed { + return nil, errors.New("connection refused") + } + + sub := mocks.NewMockSubscription(mockCtrl) + + closeCh := mockClient.subscribeToClose() + + subErrCh := make(chan error) + stopCh := make(chan struct{}) + + sub.EXPECT().Err().Return(subErrCh).AnyTimes() + sub.EXPECT().Unsubscribe().Do(func() { + close(stopCh) + }).AnyTimes() + + go func() { + for { + select { + case <-stopCh: + return + case <-ctx.Done(): + return + case closed := <-closeCh: + fmt.Println("closed", closed) + + closeCh = mockClient.subscribeToClose() + if closed { + subErrCh <- errors.New("connection refused") + } + case header := <-headerProxyC: + if mockClient.isClosed { + continue + } + + if !mockClient.isPaused { + ch <- header + } + } + } + }() + + return sub, nil + }, + ).AnyTimes() + + mockClient.EXPECT().SubscribeFilterLogs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + if mockClient.isClosed { + return nil, errors.New("connection refused") + } + + sub := mocks.NewMockSubscription(mockCtrl) + + closeCh := mockClient.subscribeToClose() + + subErrCh := make(chan error) + stopCh := make(chan struct{}) + + sub.EXPECT().Err().Return(subErrCh).AnyTimes() + sub.EXPECT().Unsubscribe().Do(func() { + close(stopCh) + }).AnyTimes() + + go func() { + for { + select { + case <-stopCh: + fmt.Println("subscription done") + return + case <-ctx.Done(): + fmt.Println("subscription done") + return + case closed := <-closeCh: + fmt.Println("closed", closed) + + closeCh = mockClient.subscribeToClose() + + if closed { + subErrCh <- errors.New("connection refused") + } + case log := <-logProxyC: + if mockClient.isClosed { + continue + } + + if !mockClient.isPaused { + ch <- log + } + } + } + }() + + return sub, nil + }, + ).AnyTimes() + + mockClient.EXPECT().BlockNumber(gomock.Any()).DoAndReturn( + func(ctx context.Context) (uint64, error) { + return *blockNum, nil + }, + ).AnyTimes() + + return mockClient +} + +func NewMockSafeClientControllable(ctx context.Context, mockCtrl *gomock.Controller, logger logging.Logger, headerProxyC <-chan *types.Header, logProxyC <-chan types.Log, blockNum *uint64) (*safeclient.SafeEthClient, error) { client, err := safeclient.NewSafeEthClient("", logger, safeclient.WithCustomCreateClient(func(rpcUrl string, logger logging.Logger) (eth.Client, error) { - return NewMockEthClient(ctx, mockCtrl, mockNetwork), nil + mockClient := NewMockClientControllable(ctx, mockCtrl, headerProxyC, logProxyC, blockNum) + return mockClient, nil })) return client, err @@ -274,7 +401,7 @@ func TestSubscribeNewHead(t *testing.T) { mockNetwork := NewMockNetwork(ctx, mockCtrl) - client, err := NewMockSafeClient(ctx, mockCtrl, logger, mockNetwork) + client, err := NewMockSafeClientFromNetwork(ctx, mockCtrl, logger, mockNetwork) assert.NoError(t, err) mockClient := client.Client.(*MockEthClient) @@ -361,7 +488,7 @@ func TestSubscribeFilterLogs(t *testing.T) { mockNetwork := NewMockNetwork(ctx, mockCtrl) - client, err := NewMockSafeClient(ctx, mockCtrl, logger, mockNetwork) + client, err := NewMockSafeClientFromNetwork(ctx, mockCtrl, logger, mockNetwork) assert.NoError(t, err) mockClient := client.Client.(*MockEthClient) @@ -416,3 +543,52 @@ func TestSubscribeFilterLogs(t *testing.T) { fmt.Println("log", log.BlockNumber) } } + +func TestLogCache(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger, err := logging.NewZapLogger("development") + assert.NoError(t, err) + + blockNum := uint64(0) + headerProxyC := make(chan *types.Header) + logProxyC := make(chan types.Log) + + client, err := NewMockSafeClientControllable(ctx, mockCtrl, logger, headerProxyC, logProxyC, &blockNum) + assert.NoError(t, err) + + logCh := make(chan types.Log, 10) + _, err = client.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, logCh) + assert.NoError(t, err) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(100 * time.Millisecond): + fmt.Println("sending log") + logProxyC <- types.Log{BlockNumber: 1, BlockHash: common.Hash{1}} + } + } + }() + + time.Sleep(2 * time.Second) + + assert.Equal(t, 1, len(logCh)) + + logProxyC <- types.Log{BlockNumber: 2, BlockHash: common.Hash{2}} + + time.Sleep(2 * time.Second) + + assert.Equal(t, 2, len(logCh)) + + log := <-logCh + assert.Equal(t, uint64(1), log.BlockNumber) + log = <-logCh + assert.Equal(t, uint64(2), log.BlockNumber) +} diff --git a/core/safeclient/utils.go b/core/safeclient/utils.go index 23461d53..d5d9d6d1 100644 --- a/core/safeclient/utils.go +++ b/core/safeclient/utils.go @@ -1,9 +1,13 @@ package safeclient import ( + "crypto/sha256" + "math/big" + "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" "github.com/Layr-Labs/eigensdk-go/logging" rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls" + "github.com/ethereum/go-ethereum/core/types" ) func createDefaultClient(rpcUrl string, logger logging.Logger) (eth.Client, error) { @@ -23,3 +27,16 @@ func createInstrumentedClient(rpcUrl string, collector *rpccalls.Collector, logg logger.Debug("Created new instrumented eth client with collector") return client, nil } + +func hashLog(log *types.Log) [32]byte { + h := sha256.New() + + log.EncodeRLP(h) + + // EncodeRLP only serializes the address, topics and data, so adding some additional block and tx info + h.Write(log.BlockHash.Bytes()) + h.Write(log.TxHash.Bytes()) + h.Write(new(big.Int).SetUint64(uint64(log.Index)).Bytes()) + + return [32]byte(h.Sum(nil)) +} diff --git a/go.mod b/go.mod index 05ab9fc9..6a51a62e 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect diff --git a/go.sum b/go.sum index dec6fb6a..d617e522 100644 --- a/go.sum +++ b/go.sum @@ -160,6 +160,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= @@ -410,8 +412,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= -google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=