From cdc20867a616a1b3df25d331e1e16fbf276cb60b Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Mon, 3 Jun 2024 17:26:24 -0300 Subject: [PATCH 1/4] Avoid data races in `TestSubscribeFilterLogs` --- core/safeclient/client_test.go | 57 +++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/core/safeclient/client_test.go b/core/safeclient/client_test.go index a4f97d4d..656c2fd2 100644 --- a/core/safeclient/client_test.go +++ b/core/safeclient/client_test.go @@ -24,6 +24,7 @@ import ( type MockNetwork struct { blockTicker *time.Ticker blockNum uint64 + blockNumLock sync.Mutex blockSubscribers []chan<- uint64 blockSubscribersLock sync.Mutex } @@ -44,12 +45,14 @@ func NewMockNetwork(ctx context.Context, mockCtrl *gomock.Controller) *MockNetwo case <-ctx.Done(): return case <-mockNetwork.blockTicker.C: + mockNetwork.blockNumLock.Lock() mockNetwork.blockNum++ + mockNetwork.blockNumLock.Unlock() + + mockNetwork.blockSubscribersLock.Lock() for _, ch := range mockNetwork.blockSubscribers { ch <- mockNetwork.blockNum } - - mockNetwork.blockSubscribersLock.Lock() mockNetwork.blockSubscribers = nil mockNetwork.blockSubscribersLock.Unlock() } @@ -72,11 +75,15 @@ func (m *MockNetwork) ResumeBlockProduction() { } func (m *MockNetwork) BlockNumber() uint64 { + m.blockNumLock.Lock() + defer m.blockNumLock.Unlock() + return m.blockNum } type MockEthClient struct { *mocks.MockEthClient + stateLock sync.Mutex isClosed bool isPaused bool closeSubscribers []chan<- bool @@ -113,14 +120,23 @@ func (m *MockNetwork) SubscribeToBlocks() <-chan uint64 { } func (m *MockEthClient) ReopenConnection() { + m.stateLock.Lock() + defer m.stateLock.Unlock() + m.isClosed = false } func (m *MockEthClient) PauseHeaderSubscriptions() { + m.stateLock.Lock() + defer m.stateLock.Unlock() + m.isPaused = true } func (m *MockEthClient) ResumeHeaderSubscriptions() { + m.stateLock.Lock() + defer m.stateLock.Unlock() + m.isPaused = false } @@ -195,7 +211,10 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle mockClient.EXPECT().SubscribeFilterLogs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { - if mockClient.isClosed { + mockClient.stateLock.Lock() + isClosed := mockClient.isClosed + mockClient.stateLock.Unlock() + if isClosed { return nil, errors.New("connection refused") } @@ -234,7 +253,10 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle blockCh = mockNetwork.SubscribeToBlocks() - if mockClient.isClosed { + mockClient.stateLock.Lock() + isClosed := mockClient.isClosed + mockClient.stateLock.Unlock() + if isClosed { continue } @@ -250,6 +272,9 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle mockClient.EXPECT().BlockNumber(gomock.Any()).DoAndReturn( func(ctx context.Context) (uint64, error) { + mockNetwork.blockNumLock.Lock() + defer mockNetwork.blockNumLock.Unlock() + return mockNetwork.blockNum, nil }, ).AnyTimes() @@ -504,8 +529,7 @@ func TestSubscribeFilterLogs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger, err := logging.NewZapLogger("development") - assert.NoError(t, err) + logger := logging.NewNoopLogger() mockNetwork := NewMockNetwork(ctx, mockCtrl) @@ -517,10 +541,15 @@ func TestSubscribeFilterLogs(t *testing.T) { mockClient := client.Client.(*MockEthClient) logCh := make(chan types.Log) - flushLogCh := func() { - select { - case <-logCh: - case <-time.After(100 * time.Millisecond): + flushLogCh := func() int { + logCount := 0 + for { + select { + case <-logCh: + logCount++ + case <-time.After(2 * time.Second): + return logCount + } } } @@ -540,7 +569,9 @@ func TestSubscribeFilterLogs(t *testing.T) { mockNetwork.PauseBlockProduction() block := mockNetwork.BlockNumber() - flushLogCh() + fmt.Println("network paused", "block", block) + flushedLogCount := flushLogCh() + fmt.Println("flushed", flushedLogCount) mockNetwork.ResumeBlockProduction() for i := block + 1; i <= block+3; i++ { @@ -556,7 +587,9 @@ func TestSubscribeFilterLogs(t *testing.T) { mockNetwork.PauseBlockProduction() block = mockNetwork.BlockNumber() - flushLogCh() + fmt.Println("Network paused at block number", block) + flushedLogCount = flushLogCh() + fmt.Println("flushLogCh", "flushed", flushedLogCount, "logs") mockNetwork.ResumeBlockProduction() for i := block + 1; i <= block+3; i++ { From 2f6cceba65bb56734231855db3447996691c72e0 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Tue, 4 Jun 2024 12:41:31 -0300 Subject: [PATCH 2/4] Add locks to `NewMockEthClientFromNetwork` --- core/safeclient/client_test.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/core/safeclient/client_test.go b/core/safeclient/client_test.go index 656c2fd2..f6b364b8 100644 --- a/core/safeclient/client_test.go +++ b/core/safeclient/client_test.go @@ -159,7 +159,11 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle mockClient.EXPECT().SubscribeNewHead(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { - if mockClient.isClosed { + mockClient.stateLock.Lock() + isClosed := mockClient.isClosed + mockClient.stateLock.Unlock() + + if isClosed { return nil, errors.New("connection refused") } @@ -191,15 +195,20 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle subErrCh <- errors.New("connection refused") } case blockNum := <-blockCh: - fmt.Println("header block", blockNum, "closed", mockClient.isClosed, "paused", mockClient.isPaused) + mockClient.stateLock.Lock() + isClosed := mockClient.isClosed + isPaused := mockClient.isPaused + mockClient.stateLock.Unlock() + + fmt.Println("header block", blockNum, "closed", isClosed, "paused", isPaused) blockCh = mockNetwork.SubscribeToBlocks() - if mockClient.isClosed { + if isClosed { continue } - if !mockClient.isPaused { + if !isPaused { ch <- &types.Header{Number: big.NewInt(int64(blockNum))} } } From 5b38d8aa53efc0fd49f00549a88ade7b8cbb3907 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Tue, 4 Jun 2024 12:41:58 -0300 Subject: [PATCH 3/4] Properly flush head messages --- core/safeclient/client_test.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/core/safeclient/client_test.go b/core/safeclient/client_test.go index f6b364b8..2c3f2a2a 100644 --- a/core/safeclient/client_test.go +++ b/core/safeclient/client_test.go @@ -448,8 +448,7 @@ func TestSubscribeNewHead(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger, err := logging.NewZapLogger("development") - assert.NoError(t, err) + logger := logging.NewNoopLogger() mockNetwork := NewMockNetwork(ctx, mockCtrl) @@ -461,13 +460,17 @@ func TestSubscribeNewHead(t *testing.T) { mockClient := client.Client.(*MockEthClient) headCh := make(chan *types.Header) - flushHeadCh := func() { - select { - case <-headCh: - case <-time.After(100 * time.Millisecond): + flushHeadCh := func() int { + headCount := 0 + for { + select { + case <-headCh: + headCount++ + case <-time.After(2 * time.Second): + return headCount + } } } - _, err = client.SubscribeNewHead(ctx, headCh) assert.NoError(t, err) @@ -488,7 +491,8 @@ func TestSubscribeNewHead(t *testing.T) { mockNetwork.PauseBlockProduction() block := mockNetwork.BlockNumber() - flushHeadCh() + flushedHeadCount := flushHeadCh() + fmt.Println("flushed", flushedHeadCount) mockNetwork.ResumeBlockProduction() for i := block + 1; i <= block+3; i++ { @@ -504,7 +508,8 @@ func TestSubscribeNewHead(t *testing.T) { mockNetwork.PauseBlockProduction() block = mockNetwork.BlockNumber() - flushHeadCh() + flushedHeadCount = flushHeadCh() + fmt.Println("flushed", flushedHeadCount) mockNetwork.ResumeBlockProduction() for i := block + 1; i <= block+3; i++ { @@ -520,7 +525,8 @@ func TestSubscribeNewHead(t *testing.T) { mockNetwork.PauseBlockProduction() block = mockNetwork.BlockNumber() - flushHeadCh() + flushedHeadCount = flushHeadCh() + fmt.Println("flushed", flushedHeadCount) mockNetwork.ResumeBlockProduction() for i := block + 1; i <= block+3; i++ { From 8ecdee3e204efe34b3956344f34db37e65418c7a Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Tue, 4 Jun 2024 12:46:22 -0300 Subject: [PATCH 4/4] Remove logging from `TestLogCache` --- core/safeclient/client_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/safeclient/client_test.go b/core/safeclient/client_test.go index 2c3f2a2a..c4349f98 100644 --- a/core/safeclient/client_test.go +++ b/core/safeclient/client_test.go @@ -622,8 +622,7 @@ func TestLogCache(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger, err := logging.NewZapLogger("development") - assert.NoError(t, err) + logger := logging.NewNoopLogger() blockNum := uint64(0) headerProxyC := make(chan *types.Header)