Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky SafeEthClient tests #208

Merged
merged 5 commits into from
Jun 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 75 additions & 28 deletions core/safeclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type MockNetwork struct {
blockTicker *time.Ticker
blockNum uint64
blockNumLock sync.Mutex
blockSubscribers []chan<- uint64
blockSubscribersLock sync.Mutex
}
Expand All @@ -44,12 +45,14 @@ func NewMockNetwork(ctx context.Context, mockCtrl *gomock.Controller) *MockNetwo
case <-ctx.Done():
return
case <-mockNetwork.blockTicker.C:
mockNetwork.blockNumLock.Lock()
mockNetwork.blockNum++
mockNetwork.blockNumLock.Unlock()

mockNetwork.blockSubscribersLock.Lock()
for _, ch := range mockNetwork.blockSubscribers {
ch <- mockNetwork.blockNum
}

mockNetwork.blockSubscribersLock.Lock()
mockNetwork.blockSubscribers = nil
mockNetwork.blockSubscribersLock.Unlock()
}
Expand All @@ -72,11 +75,15 @@ func (m *MockNetwork) ResumeBlockProduction() {
}

func (m *MockNetwork) BlockNumber() uint64 {
m.blockNumLock.Lock()
defer m.blockNumLock.Unlock()

return m.blockNum
}

type MockEthClient struct {
*mocks.MockEthClient
stateLock sync.Mutex
isClosed bool
isPaused bool
closeSubscribers []chan<- bool
Expand Down Expand Up @@ -113,14 +120,23 @@ func (m *MockNetwork) SubscribeToBlocks() <-chan uint64 {
}

func (m *MockEthClient) ReopenConnection() {
m.stateLock.Lock()
defer m.stateLock.Unlock()

m.isClosed = false
}

func (m *MockEthClient) PauseHeaderSubscriptions() {
m.stateLock.Lock()
defer m.stateLock.Unlock()

m.isPaused = true
}

func (m *MockEthClient) ResumeHeaderSubscriptions() {
m.stateLock.Lock()
defer m.stateLock.Unlock()

m.isPaused = false
}

Expand All @@ -143,7 +159,11 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

mockClient.EXPECT().SubscribeNewHead(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
if mockClient.isClosed {
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
mockClient.stateLock.Unlock()

if isClosed {
return nil, errors.New("connection refused")
}

Expand Down Expand Up @@ -175,15 +195,20 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle
subErrCh <- errors.New("connection refused")
}
case blockNum := <-blockCh:
fmt.Println("header block", blockNum, "closed", mockClient.isClosed, "paused", mockClient.isPaused)
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
isPaused := mockClient.isPaused
mockClient.stateLock.Unlock()

fmt.Println("header block", blockNum, "closed", isClosed, "paused", isPaused)

blockCh = mockNetwork.SubscribeToBlocks()

if mockClient.isClosed {
if isClosed {
continue
}

if !mockClient.isPaused {
if !isPaused {
ch <- &types.Header{Number: big.NewInt(int64(blockNum))}
}
}
Expand All @@ -195,7 +220,10 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

mockClient.EXPECT().SubscribeFilterLogs(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
if mockClient.isClosed {
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
mockClient.stateLock.Unlock()
if isClosed {
return nil, errors.New("connection refused")
}

Expand Down Expand Up @@ -234,7 +262,10 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

blockCh = mockNetwork.SubscribeToBlocks()

if mockClient.isClosed {
mockClient.stateLock.Lock()
isClosed := mockClient.isClosed
mockClient.stateLock.Unlock()
if isClosed {
continue
}

Expand All @@ -250,6 +281,9 @@ func NewMockEthClientFromNetwork(ctx context.Context, mockCtrl *gomock.Controlle

mockClient.EXPECT().BlockNumber(gomock.Any()).DoAndReturn(
func(ctx context.Context) (uint64, error) {
mockNetwork.blockNumLock.Lock()
defer mockNetwork.blockNumLock.Unlock()

return mockNetwork.blockNum, nil
},
).AnyTimes()
Expand Down Expand Up @@ -414,8 +448,7 @@ func TestSubscribeNewHead(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)
logger := logging.NewNoopLogger()

mockNetwork := NewMockNetwork(ctx, mockCtrl)

Expand All @@ -427,13 +460,17 @@ func TestSubscribeNewHead(t *testing.T) {
mockClient := client.Client.(*MockEthClient)

headCh := make(chan *types.Header)
flushHeadCh := func() {
select {
case <-headCh:
case <-time.After(100 * time.Millisecond):
flushHeadCh := func() int {
headCount := 0
for {
select {
case <-headCh:
headCount++
case <-time.After(2 * time.Second):
return headCount
}
}
}

_, err = client.SubscribeNewHead(ctx, headCh)
assert.NoError(t, err)

Expand All @@ -454,7 +491,8 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork.PauseBlockProduction()
block := mockNetwork.BlockNumber()
flushHeadCh()
flushedHeadCount := flushHeadCh()
fmt.Println("flushed", flushedHeadCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -470,7 +508,8 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork.PauseBlockProduction()
block = mockNetwork.BlockNumber()
flushHeadCh()
flushedHeadCount = flushHeadCh()
fmt.Println("flushed", flushedHeadCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -486,7 +525,8 @@ func TestSubscribeNewHead(t *testing.T) {

mockNetwork.PauseBlockProduction()
block = mockNetwork.BlockNumber()
flushHeadCh()
flushedHeadCount = flushHeadCh()
fmt.Println("flushed", flushedHeadCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -504,8 +544,7 @@ func TestSubscribeFilterLogs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)
logger := logging.NewNoopLogger()

mockNetwork := NewMockNetwork(ctx, mockCtrl)

Expand All @@ -517,10 +556,15 @@ func TestSubscribeFilterLogs(t *testing.T) {
mockClient := client.Client.(*MockEthClient)

logCh := make(chan types.Log)
flushLogCh := func() {
select {
case <-logCh:
case <-time.After(100 * time.Millisecond):
flushLogCh := func() int {
logCount := 0
for {
select {
case <-logCh:
logCount++
case <-time.After(2 * time.Second):
return logCount
}
}
}

Expand All @@ -540,7 +584,9 @@ func TestSubscribeFilterLogs(t *testing.T) {

mockNetwork.PauseBlockProduction()
block := mockNetwork.BlockNumber()
flushLogCh()
fmt.Println("network paused", "block", block)
flushedLogCount := flushLogCh()
fmt.Println("flushed", flushedLogCount)
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -556,7 +602,9 @@ func TestSubscribeFilterLogs(t *testing.T) {

mockNetwork.PauseBlockProduction()
block = mockNetwork.BlockNumber()
flushLogCh()
fmt.Println("Network paused at block number", block)
flushedLogCount = flushLogCh()
fmt.Println("flushLogCh", "flushed", flushedLogCount, "logs")
mockNetwork.ResumeBlockProduction()

for i := block + 1; i <= block+3; i++ {
Expand All @@ -574,8 +622,7 @@ func TestLogCache(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)
logger := logging.NewNoopLogger()

blockNum := uint64(0)
headerProxyC := make(chan *types.Header)
Expand Down
Loading