Skip to content

Commit

Permalink
Fix SafeEthClient concurrent close (#200)
Browse files Browse the repository at this point in the history
* Add `TestConcurrentClose`

* Use `sync.Once` to prevent double closing

* Add `wg.Wait` to wait for all goroutines to finish before finishing the test
  • Loading branch information
emlautarom1 authored May 30, 2024
1 parent 661537b commit eafed88
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
16 changes: 6 additions & 10 deletions core/safeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type SafeEthClient struct {
logger logging.Logger
rpcUrl string
closeC chan struct{}
closed bool
headerTimeout time.Duration
logResubInterval time.Duration
blockChunkSize uint64
blockMaxRange uint64

createClient func(string, logging.Logger) (eth.Client, error)
onceClose sync.Once
}

func NewSafeEthClient(rpcUrl string, logger logging.Logger, opts ...SafeEthClientOption) (*SafeEthClient, error) {
Expand Down Expand Up @@ -301,15 +301,11 @@ func (c *SafeEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filt
}

func (c *SafeEthClient) Close() {
if c.closed {
return
}

close(c.closeC)
c.wg.Wait()
c.logger.Info("SafeEthClient closed")

c.closed = true
c.onceClose.Do(func() {
close(c.closeC)
c.wg.Wait()
c.logger.Info("SafeEthClient closed")
})
}

func (c *SafeEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
Expand Down
18 changes: 18 additions & 0 deletions core/safeclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,24 @@ func NewMockSafeClientControllable(ctx context.Context, mockCtrl *gomock.Control
return client, err
}

func TestConcurrentClose(t *testing.T) {
logger, err := logging.NewZapLogger("development")
assert.NoError(t, err)

client, err := safeclient.NewSafeEthClient("", logger, safeclient.WithCustomCreateClient(func(string, logging.Logger) (eth.Client, error) { return nil, nil }))
assert.NoError(t, err)

var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client.Close()
}()
}
wg.Wait()
}

func TestSubscribeNewHead(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
Expand Down

0 comments on commit eafed88

Please sign in to comment.