diff --git a/operator/attestor/attestor.go b/operator/attestor/attestor.go index ef283b16..82581d1a 100644 --- a/operator/attestor/attestor.go +++ b/operator/attestor/attestor.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "errors" + "sync" "time" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" @@ -26,6 +27,7 @@ const ( MQ_REBROADCAST_TIMEOUT = 15 * time.Second RECONNECTION_ATTEMPTS = 5 RECONNECTION_DELAY = time.Second + REINITIALIZE_DELAY = time.Minute ) var ( @@ -49,6 +51,7 @@ type Attestor struct { signedRootC chan messages.SignedStateRootUpdateMessage rollupIdsToUrls map[uint32]string clients map[uint32]eth.Client + clientsLock sync.Mutex rpcCallsCollectors map[uint32]*rpccalls.Collector notifier Notifier consumer *consumer.Consumer @@ -200,23 +203,42 @@ func (attestor *Attestor) reconnectClient(rollupId uint32) (eth.Client, error) { // Spawns routines for new headers that die in one minute func (attestor *Attestor) processRollupHeaders(rollupId uint32, headersC chan *ethtypes.Header, subscription ethereum.Subscription, ctx context.Context) { + reinitializeTicker := time.NewTicker(REINITIALIZE_DELAY) + reinitializeTicker.Stop() + + defer reinitializeTicker.Stop() + + reinitializeSubscription := func() error { + client, err := attestor.reconnectClient(rollupId) + if err != nil { + attestor.logger.Error("Error while reconnecting client", "rollupId", rollupId, "err", err) + return err + } + + attestor.clientsLock.Lock() + attestor.clients[rollupId] = client + attestor.clientsLock.Unlock() + + newSubscription, err := client.SubscribeNewHead(ctx, headersC) + if err != nil { + attestor.logger.Error("Error while subscribing", "rollupId", rollupId, "err", err) + return err + } + + subscription.Unsubscribe() + subscription = newSubscription + + return nil + } + for { select { case <-subscription.Err(): - subscription.Unsubscribe() - - client, err := attestor.reconnectClient(rollupId) + attestor.logger.Error("Header subscription error", "rollupId", rollupId) + err := reinitializeSubscription() if err != nil { - return + reinitializeTicker.Reset(REINITIALIZE_DELAY) } - attestor.clients[rollupId] = client - - subscription, err = client.SubscribeNewHead(ctx, headersC) - if err != nil { - return - } - - continue case header, ok := <-headersC: if !ok { @@ -224,7 +246,14 @@ func (attestor *Attestor) processRollupHeaders(rollupId uint32, headersC chan *e } go attestor.processHeader(rollupId, header, ctx) - continue + + case <-reinitializeTicker.C: + attestor.logger.Info("Reinitializing header subscription", "rollupId", rollupId) + + err := reinitializeSubscription() + if err == nil { + reinitializeTicker.Stop() + } case <-ctx.Done(): subscription.Unsubscribe()