Skip to content

Commit

Permalink
Reinitialize Attestor client indefinitely (#151)
Browse files Browse the repository at this point in the history
* feat: Reinitialize Attestor client indefinitely

* fix: Prevent concurrent write to clients map

* refactor: Remove unnecessary continues

* fix: Only unsubscribe when updating subscription

* fix: Make sure ticker is stopped
  • Loading branch information
Hyodar authored May 12, 2024
1 parent acbde0a commit 21c4b20
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions operator/attestor/attestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"errors"
"sync"
"time"

"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
Expand All @@ -26,6 +27,7 @@ const (
MQ_REBROADCAST_TIMEOUT = 15 * time.Second
RECONNECTION_ATTEMPTS = 5
RECONNECTION_DELAY = time.Second
REINITIALIZE_DELAY = time.Minute
)

var (
Expand All @@ -49,6 +51,7 @@ type Attestor struct {
signedRootC chan messages.SignedStateRootUpdateMessage
rollupIdsToUrls map[uint32]string
clients map[uint32]eth.Client
clientsLock sync.Mutex
rpcCallsCollectors map[uint32]*rpccalls.Collector
notifier Notifier
consumer *consumer.Consumer
Expand Down Expand Up @@ -200,31 +203,57 @@ func (attestor *Attestor) reconnectClient(rollupId uint32) (eth.Client, error) {

// Spawns routines for new headers that die in one minute
func (attestor *Attestor) processRollupHeaders(rollupId uint32, headersC chan *ethtypes.Header, subscription ethereum.Subscription, ctx context.Context) {
reinitializeTicker := time.NewTicker(REINITIALIZE_DELAY)
reinitializeTicker.Stop()

defer reinitializeTicker.Stop()

reinitializeSubscription := func() error {
client, err := attestor.reconnectClient(rollupId)
if err != nil {
attestor.logger.Error("Error while reconnecting client", "rollupId", rollupId, "err", err)
return err
}

attestor.clientsLock.Lock()
attestor.clients[rollupId] = client
attestor.clientsLock.Unlock()

newSubscription, err := client.SubscribeNewHead(ctx, headersC)
if err != nil {
attestor.logger.Error("Error while subscribing", "rollupId", rollupId, "err", err)
return err
}

subscription.Unsubscribe()
subscription = newSubscription

return nil
}

for {
select {
case <-subscription.Err():
subscription.Unsubscribe()

client, err := attestor.reconnectClient(rollupId)
attestor.logger.Error("Header subscription error", "rollupId", rollupId)
err := reinitializeSubscription()
if err != nil {
return
reinitializeTicker.Reset(REINITIALIZE_DELAY)
}
attestor.clients[rollupId] = client

subscription, err = client.SubscribeNewHead(ctx, headersC)
if err != nil {
return
}

continue

case header, ok := <-headersC:
if !ok {
return
}

go attestor.processHeader(rollupId, header, ctx)
continue

case <-reinitializeTicker.C:
attestor.logger.Info("Reinitializing header subscription", "rollupId", rollupId)

err := reinitializeSubscription()
if err == nil {
reinitializeTicker.Stop()
}

case <-ctx.Done():
subscription.Unsubscribe()
Expand Down

0 comments on commit 21c4b20

Please sign in to comment.