Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
ws unfreeze (erigontech#7724)
Browse files Browse the repository at this point in the history
attempt to address next issue: 

> when I'm having a lot of websocket connections the node is freezing
and then it needs like 10 mins to sync. Then if I keep pushing requests
it falls out of sync all the time
  • Loading branch information
AskAlexSharov authored Jun 14, 2023
1 parent 8b312d5 commit 1b14785
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 43 deletions.
20 changes: 8 additions & 12 deletions cmd/rpcdaemon/commands/eth_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,11 @@ func (api *APIImpl) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
if h != nil {
err := notifier.Notify(rpcSub.ID, h)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
return
log.Warn("[rpc] error while notifying subscription", "err", err)
}
}
if !ok {
log.Warn("new heads channel was closed")
log.Warn("[rpc] new heads channel was closed")
return
}
case <-rpcSub.Err():
Expand Down Expand Up @@ -186,13 +185,12 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti
if t != nil {
err := notifier.Notify(rpcSub.ID, t.Hash())
if err != nil {
log.Warn("error while notifying subscription", "err", err)
return
log.Warn("[rpc] error while notifying subscription", "err", err)
}
}
}
if !ok {
log.Warn("new pending transactions channel was closed")
log.Warn("[rpc] new pending transactions channel was closed")
return
}
case <-rpcSub.Err():
Expand Down Expand Up @@ -228,13 +226,12 @@ func (api *APIImpl) NewPendingTransactionsWithBody(ctx context.Context) (*rpc.Su
if t != nil {
err := notifier.Notify(rpcSub.ID, t)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
return
log.Warn("[rpc] error while notifying subscription", "err", err)
}
}
}
if !ok {
log.Warn("new pending transactions channel was closed")
log.Warn("[rpc] new pending transactions channel was closed")
return
}
case <-rpcSub.Err():
Expand Down Expand Up @@ -269,12 +266,11 @@ func (api *APIImpl) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc
if h != nil {
err := notifier.Notify(rpcSub.ID, h)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
return
log.Warn("[rpc] error while notifying subscription", "err", err)
}
}
if !ok {
log.Warn("log channel was closed")
log.Warn("[rpc] log channel was closed")
return
}
case <-rpcSub.Err():
Expand Down
42 changes: 11 additions & 31 deletions turbo/rpchelper/subscription.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package rpchelper

import (
"context"
"sync"
"sync/atomic"
)

// a simple interface for subscriptions for rpc helper
Expand All @@ -12,12 +12,8 @@ type Sub[T any] interface {
}

type chan_sub[T any] struct {
ch chan T

closed chan struct{}

ctx context.Context
cn context.CancelFunc
ch chan T
closed atomic.Bool
}

// buffered channel
Expand All @@ -28,38 +24,23 @@ func newChanSub[T any](size int) *chan_sub[T] {
}
o := &chan_sub[T]{}
o.ch = make(chan T, size)
o.closed = make(chan struct{})
o.ctx, o.cn = context.WithCancel(context.Background())
return o
}
func (s *chan_sub[T]) Send(x T) {
if s.closed.Load() {
return
}

select {
// if the output buffer is empty, send
case s.ch <- x:
// if sub is canceled, dispose message
case <-s.ctx.Done():
// the sub is overloaded, dispose message
default:
default: // the sub is overloaded, dispose message
}
}
func (s *chan_sub[T]) Close() {
select {
case <-s.ctx.Done():
if swapped := s.closed.CompareAndSwap(false, true); !swapped {
return
default:
}
// its possible for multiple goroutines to get to this point, if Close is called twice at the same time
// close the context - allows any sends to exit
s.cn()
select {
case s.closed <- struct{}{}:
// but it is not possible for multiple goroutines to get to this point
// drain the channel
for range s.ch {
}
close(s.ch)
default:
}
close(s.ch)
}

func NewSyncMap[K comparable, T any]() *SyncMap[K, T] {
Expand Down Expand Up @@ -117,8 +98,7 @@ func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error {
return nil
}

func (m *SyncMap[K, T]) Delete(k K) (T, bool) {
var t T
func (m *SyncMap[K, T]) Delete(k K) (t T, deleted bool) {
m.mu.Lock()
defer m.mu.Unlock()
val, ok := m.m[k]
Expand Down

0 comments on commit 1b14785

Please sign in to comment.