Skip to content

Commit

Permalink
Fix flaky SafeEthClient tests (#208)
Browse files Browse the repository at this point in the history
* Avoid data races in `TestSubscribeFilterLogs`

* Add locks to `NewMockEthClientFromNetwork`

* Properly flush head messages

* Remove logging from `TestLogCache`
  • Loading branch information
emlautarom1 authored Jun 5, 2024
1 parent 43f66b4 commit 3cd8704
Showing 1 changed file with 75 additions and 28 deletions.
103 changes: 75 additions & 28 deletions core/safeclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type MockNetwork struct {
blockTicker *time.Ticker
blockNum uint64
blockNumLock sync.Mutex
blockSubscribers []chan<- uint64
blockSubscribersLock sync.Mutex
}
Expand All @@ -44,12 +45,14 @@ func NewMockNetwork(ctx context.Context, mockCtrl *gomock.Controller) *MockNetwo
case <-ctx.Done():
return
case <-mockNetwork.blockTicker.C:
mockNetwork.blockNumLock.Lock()
mockNetwork.blockNum++
mockNetwork.blockNumLock.Unlock()

mockNetwork.blockSubscribersLock.Lock()
for _, ch := range mockNetwork.blockSubscribers {
ch <- mockNetwork.blockNum
}

mockNetwork.blockSubscribersLock.Lock()
mockNetwork.blockSubscribers = nil
mockNetwork.blockSubscribersLock.Unlock()
}
Expand All @@ -72,11 +75,15 @@ func (m *MockNetwork) ResumeBlockProduction() {
}

func (m *MockNetwork) BlockNumber() uint64 {
m.blockNumLock.Lock()
defer m.blockNumLock.Unlock()

return m.blockNum
}

type MockEthClient struct {
*mocks.MockEthClient
stateLock sync.Mutex
isClosed bool
isPaused bool
closeSubscribers []chan<- bool
Expand Down Expand Up @@ -113,14 +120,23 @@ func (m *MockNetwork) SubscribeToBlocks() <-chan uint64 {
}

func (m *MockEthClient) ReopenConnection() {
m.stateLock.Lock()
defer m.stateLock.Unlock()

m.isClosed = false
}

func (m *MockEthClient) PauseHeaderSubscriptions() {
m.stateLock.Lock()
defer m.stateLock.Unlock()

m.isPaused = true
}

func (m *MockEthClient) ResumeHeaderSubscriptions() {
m.stateLock.Lock()
defer m.stateLock.Unlock()

m.isPaused = false
}

Expand All @@ -143,7 +159,11 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

mockClient.EXPECT().SubscribeNewHead(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
if mockClient.isClosed {
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
mockClient.stateLock.Unlock()

if isClosed {
return nil, errors.New("connection refused")
}

Expand Down Expand Up @@ -175,15 +195,20 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle
subErrCh <- errors.New("connection refused")
}
case blockNum := <-blockCh:
fmt.Println("header block", blockNum, "closed", mockClient.isClosed, "paused", mockClient.isPaused)
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
isPaused := mockClient.isPaused
mockClient.stateLock.Unlock()

fmt.Println("header block", blockNum, "closed", isClosed, "paused", isPaused)

blockCh = mockNetwork.SubscribeToBlocks()

if mockClient.isClosed {
if isClosed {
continue
}

if !mockClient.isPaused {
if !isPaused {
ch <- &types.Header{Number: big.NewInt(int64(blockNum))}
}
}
Expand All @@ -195,7 +220,10 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

mockClient.EXPECT().SubscribeFilterLogs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
if mockClient.isClosed {
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
mockClient.stateLock.Unlock()
if isClosed {
return nil, errors.New("connection refused")
}

Expand Down Expand Up @@ -234,7 +262,10 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

blockCh = mockNetwork.SubscribeToBlocks()

if mockClient.isClosed {
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
mockClient.stateLock.Unlock()
if isClosed {
continue
}

Expand All @@ -250,6 +281,9 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

mockClient.EXPECT().BlockNumber(gomock.Any()).DoAndReturn(
func(ctx context.Context) (uint64, error) {
mockNetwork.blockNumLock.Lock()
defer mockNetwork.blockNumLock.Unlock()

return mockNetwork.blockNum, nil
},
).AnyTimes()
Expand Down Expand Up @@ -414,8 +448,7 @@ func TestSubscribeNewHead(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)
logger := logging.NewNoopLogger()

mockNetwork := NewMockNetwork(ctx, mockCtrl)

Expand All @@ -427,13 +460,17 @@ func TestSubscribeNewHead(t *testing.T) {
mockClient := client.Client.(*MockEthClient)

headCh := make(chan *types.Header)
flushHeadCh := func() {
select {
case <-headCh:
case <-time.After(100 * time.Millisecond):
flushHeadCh := func() int {
headCount := 0
for {
select {
case <-headCh:
headCount++
case <-time.After(2 * time.Second):
return headCount
}
}
}

_, err = client.SubscribeNewHead(ctx, headCh)
assert.NoError(t, err)

Expand All @@ -454,7 +491,8 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork.PauseBlockProduction()
block := mockNetwork.BlockNumber()
flushHeadCh()
flushedHeadCount := flushHeadCh()
fmt.Println("flushed", flushedHeadCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -470,7 +508,8 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork.PauseBlockProduction()
block = mockNetwork.BlockNumber()
flushHeadCh()
flushedHeadCount = flushHeadCh()
fmt.Println("flushed", flushedHeadCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -486,7 +525,8 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork.PauseBlockProduction()
block = mockNetwork.BlockNumber()
flushHeadCh()
flushedHeadCount = flushHeadCh()
fmt.Println("flushed", flushedHeadCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -504,8 +544,7 @@ func TestSubscribeFilterLogs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)
logger := logging.NewNoopLogger()

mockNetwork := NewMockNetwork(ctx, mockCtrl)

Expand All @@ -517,10 +556,15 @@ func TestSubscribeFilterLogs(t *testing.T) {
mockClient := client.Client.(*MockEthClient)

logCh := make(chan types.Log)
flushLogCh := func() {
select {
case <-logCh:
case <-time.After(100 * time.Millisecond):
flushLogCh := func() int {
logCount := 0
for {
select {
case <-logCh:
logCount++
case <-time.After(2 * time.Second):
return logCount
}
}
}

Expand All @@ -540,7 +584,9 @@ func TestSubscribeFilterLogs(t *testing.T) {

mockNetwork.PauseBlockProduction()
block := mockNetwork.BlockNumber()
flushLogCh()
fmt.Println("network paused", "block", block)
flushedLogCount := flushLogCh()
fmt.Println("flushed", flushedLogCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -556,7 +602,9 @@ func TestSubscribeFilterLogs(t *testing.T) {

mockNetwork.PauseBlockProduction()
block = mockNetwork.BlockNumber()
flushLogCh()
fmt.Println("Network paused at block number", block)
flushedLogCount = flushLogCh()
fmt.Println("flushLogCh", "flushed", flushedLogCount, "logs")
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -574,8 +622,7 @@ func TestLogCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)
logger := logging.NewNoopLogger()

blockNum := uint64(0)
headerProxyC := make(chan *types.Header)
Expand Down

0 comments on commit 3cd8704

Please sign in to comment.