Skip to content

Commit

Permalink
Add cache-based duplicate log detection to SafeEthClient (#180)
Browse files Browse the repository at this point in the history
* chore: Add hashicorp lru cache dependency

* feat: Cache past logs to prevent missed or duplicated logs

* feat: Filter logs from last block to make sure there are no missed

* feat: Add block and tx info to hashLog

* test: Add duplicate log test

* test: Add mock tx hash to network mock eth client log emissions

* test: Send proxy channels instead of returning pointers on controllable mock client

* test: Check enqueued logs on duplicate log test
  • Loading branch information
Hyodar authored May 24, 2024
1 parent 6ca24f9 commit ac7bf86
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 18 deletions.
31 changes: 21 additions & 10 deletions core/safeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
lru "github.com/hashicorp/golang-lru/v2"
)

const (
Expand Down Expand Up @@ -147,6 +148,18 @@ func (s *SafeSubscription) SetUnderlyingSub(sub ethereum.Subscription) {
}

func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
logCache, err := lru.New[[32]byte, any](100)
if err != nil {
c.logger.Error("Failed to create log cache", "err", err)
return nil, err
}

tryCacheLog := func(log *types.Log) bool {
hash := hashLog(log)
ok, _ := logCache.ContainsOrAdd(hash, nil)
return !ok
}

currentBlock, err := c.Client.BlockNumber(ctx)
if err != nil {
c.logger.Error("Failed to get current block number", "err", err)
Expand Down Expand Up @@ -187,7 +200,7 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt
rangeStartBlock = 0
}

fromBlock := max(lastBlock, rangeStartBlock) + 1
fromBlock := max(lastBlock, rangeStartBlock+1)

for ; fromBlock < currentBlock; fromBlock += (c.blockChunkSize + 1) {
toBlock := min(fromBlock+c.blockChunkSize, currentBlock)
Expand Down Expand Up @@ -230,8 +243,10 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt
safeSub.SetUnderlyingSub(newSub)

for _, log := range missedLogs {
lastBlock = max(lastBlock, log.BlockNumber)
ch <- log
if tryCacheLog(&log) {
lastBlock = max(lastBlock, log.BlockNumber)
ch <- log
}
}

return nil
Expand Down Expand Up @@ -262,14 +277,10 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt
c.logger.Debug("Safe subscription ended")
return
case log := <-proxyC:
// if that's the case, then most likely we got an event on filterLog and are getting the same one in the sub
if lastBlock > log.BlockNumber {
continue
if tryCacheLog(&log) {
lastBlock = max(lastBlock, log.BlockNumber)
ch <- log
}

// since resub pushes the missed blocks directly to the channel and updates lastBlock, this is ordered
lastBlock = log.BlockNumber
ch <- log
case <-ticker.C:
c.logger.Debug("Resub ticker fired")
handleResub()
Expand Down
188 changes: 182 additions & 6 deletions core/safeclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -133,7 +134,7 @@ func (m *MockEthClient) subscribeToClose() <-chan bool {
return ch
}

func NewMockEthClient(ctx context.Context, mockCtrl *gomock.Controller, mockNetwork *MockNetwork) *MockEthClient {
func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controller, mockNetwork *MockNetwork) *MockEthClient {
fmt.Println("creating mock client")

mockClient := &MockEthClient{
Expand Down Expand Up @@ -237,7 +238,9 @@ func NewMockEthClient(ctx context.Context, mockCtrl *gomock.Controller, mockNetw
continue
}

ch <- types.Log{BlockNumber: blockNum}
log := types.Log{BlockNumber: blockNum, Index: uint(blockNum)}

ch <- log
}
}
}()
Expand All @@ -254,9 +257,133 @@ func NewMockEthClient(ctx context.Context, mockCtrl *gomock.Controller, mockNetw
return mockClient
}

func NewMockSafeClient(ctx context.Context, mockCtrl *gomock.Controller, logger logging.Logger, mockNetwork *MockNetwork) (*safeclient.SafeEthClient, error) {
func NewMockSafeClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controller, logger logging.Logger, mockNetwork *MockNetwork) (*safeclient.SafeEthClient, error) {
client, err := safeclient.NewSafeEthClient("", logger, safeclient.WithCustomCreateClient(func(rpcUrl string, logger logging.Logger) (eth.Client, error) {
return NewMockEthClientFromNetwork(ctx, mockCtrl, mockNetwork), nil
}))

return client, err
}

func NewMockClientControllable(ctx context.Context, mockCtrl *gomock.Controller, headerProxyC <-chan *types.Header, logProxyC <-chan types.Log, blockNum *uint64) (mockClient *MockEthClient) {
fmt.Println("creating mock client")

mockClient = &MockEthClient{
MockEthClient: mocks.NewMockEthClient(mockCtrl),
}

mockClient.EXPECT().SubscribeNewHead(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
if mockClient.isClosed {
return nil, errors.New("connection refused")
}

sub := mocks.NewMockSubscription(mockCtrl)

closeCh := mockClient.subscribeToClose()

subErrCh := make(chan error)
stopCh := make(chan struct{})

sub.EXPECT().Err().Return(subErrCh).AnyTimes()
sub.EXPECT().Unsubscribe().Do(func() {
close(stopCh)
}).AnyTimes()

go func() {
for {
select {
case <-stopCh:
return
case <-ctx.Done():
return
case closed := <-closeCh:
fmt.Println("closed", closed)

closeCh = mockClient.subscribeToClose()
if closed {
subErrCh <- errors.New("connection refused")
}
case header := <-headerProxyC:
if mockClient.isClosed {
continue
}

if !mockClient.isPaused {
ch <- header
}
}
}
}()

return sub, nil
},
).AnyTimes()

mockClient.EXPECT().SubscribeFilterLogs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
if mockClient.isClosed {
return nil, errors.New("connection refused")
}

sub := mocks.NewMockSubscription(mockCtrl)

closeCh := mockClient.subscribeToClose()

subErrCh := make(chan error)
stopCh := make(chan struct{})

sub.EXPECT().Err().Return(subErrCh).AnyTimes()
sub.EXPECT().Unsubscribe().Do(func() {
close(stopCh)
}).AnyTimes()

go func() {
for {
select {
case <-stopCh:
fmt.Println("subscription done")
return
case <-ctx.Done():
fmt.Println("subscription done")
return
case closed := <-closeCh:
fmt.Println("closed", closed)

closeCh = mockClient.subscribeToClose()

if closed {
subErrCh <- errors.New("connection refused")
}
case log := <-logProxyC:
if mockClient.isClosed {
continue
}

if !mockClient.isPaused {
ch <- log
}
}
}
}()

return sub, nil
},
).AnyTimes()

mockClient.EXPECT().BlockNumber(gomock.Any()).DoAndReturn(
func(ctx context.Context) (uint64, error) {
return *blockNum, nil
},
).AnyTimes()

return mockClient
}

func NewMockSafeClientControllable(ctx context.Context, mockCtrl *gomock.Controller, logger logging.Logger, headerProxyC <-chan *types.Header, logProxyC <-chan types.Log, blockNum *uint64) (*safeclient.SafeEthClient, error) {
client, err := safeclient.NewSafeEthClient("", logger, safeclient.WithCustomCreateClient(func(rpcUrl string, logger logging.Logger) (eth.Client, error) {
return NewMockEthClient(ctx, mockCtrl, mockNetwork), nil
mockClient := NewMockClientControllable(ctx, mockCtrl, headerProxyC, logProxyC, blockNum)
return mockClient, nil
}))

return client, err
Expand All @@ -274,7 +401,7 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork := NewMockNetwork(ctx, mockCtrl)

client, err := NewMockSafeClient(ctx, mockCtrl, logger, mockNetwork)
client, err := NewMockSafeClientFromNetwork(ctx, mockCtrl, logger, mockNetwork)
assert.NoError(t, err)
mockClient := client.Client.(*MockEthClient)

Expand Down Expand Up @@ -361,7 +488,7 @@ func TestSubscribeFilterLogs(t *testing.T) {

mockNetwork := NewMockNetwork(ctx, mockCtrl)

client, err := NewMockSafeClient(ctx, mockCtrl, logger, mockNetwork)
client, err := NewMockSafeClientFromNetwork(ctx, mockCtrl, logger, mockNetwork)
assert.NoError(t, err)

mockClient := client.Client.(*MockEthClient)
Expand Down Expand Up @@ -416,3 +543,52 @@ func TestSubscribeFilterLogs(t *testing.T) {
fmt.Println("log", log.BlockNumber)
}
}

func TestLogCache(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)

blockNum := uint64(0)
headerProxyC := make(chan *types.Header)
logProxyC := make(chan types.Log)

client, err := NewMockSafeClientControllable(ctx, mockCtrl, logger, headerProxyC, logProxyC, &blockNum)
assert.NoError(t, err)

logCh := make(chan types.Log, 10)
_, err = client.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, logCh)
assert.NoError(t, err)

go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
fmt.Println("sending log")
logProxyC <- types.Log{BlockNumber: 1, BlockHash: common.Hash{1}}
}
}
}()

time.Sleep(2 * time.Second)

assert.Equal(t, 1, len(logCh))

logProxyC <- types.Log{BlockNumber: 2, BlockHash: common.Hash{2}}

time.Sleep(2 * time.Second)

assert.Equal(t, 2, len(logCh))

log := <-logCh
assert.Equal(t, uint64(1), log.BlockNumber)
log = <-logCh
assert.Equal(t, uint64(2), log.BlockNumber)
}
17 changes: 17 additions & 0 deletions core/safeclient/utils.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package safeclient

import (
"crypto/sha256"
"math/big"

"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
"github.com/Layr-Labs/eigensdk-go/logging"
rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls"
"github.com/ethereum/go-ethereum/core/types"
)

func createDefaultClient(rpcUrl string, logger logging.Logger) (eth.Client, error) {
Expand All @@ -23,3 +27,16 @@ func createInstrumentedClient(rpcUrl string, collector *rpccalls.Collector, logg
logger.Debug("Created new instrumented eth client with collector")
return client, nil
}

func hashLog(log *types.Log) [32]byte {
h := sha256.New()

log.EncodeRLP(h)

// EncodeRLP only serializes the address, topics and data, so adding some additional block and tx info
h.Write(log.BlockHash.Bytes())
h.Write(log.TxHash.Bytes())
h.Write(new(big.Int).SetUint64(uint64(log.Index)).Bytes())

return [32]byte(h.Sum(nil))
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE=
github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4=
github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
Expand Down Expand Up @@ -410,8 +412,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ=
google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
Expand Down

0 comments on commit ac7bf86

Please sign in to comment.