From c5b13c843d62f95e33a4f410e54c4a5521ac7cd9 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Fri, 15 Mar 2024 12:00:31 +1000 Subject: [PATCH] fix: Refresh spec on epoch change (#160) * fix: Refresh spec on epoch change * chore: linting --- pkg/api/handler.go | 17 ++++- pkg/beacon/default.go | 113 +++++++++++++++++++++++--------- pkg/beacon/download.go | 28 ++++---- pkg/beacon/finality_provider.go | 2 +- pkg/service/eth/eth.go | 44 +++++++++++-- 5 files changed, 154 insertions(+), 50 deletions(-) diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 319600f..e51b5a7 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "strconv" "strings" "time" @@ -599,7 +600,21 @@ func (h *Handler) handleEthV1BeaconBlobSidecars(ctx context.Context, r *http.Req return NewBadRequestResponse(nil), err } - sidecars, err := h.eth.BlobSidecars(ctx, id) + queryParams := r.URL.Query() + indicesRaw := queryParams["indices"] + + indices := make([]int, 0, len(indicesRaw)) + + for _, index := range indicesRaw { + converted, errr := strconv.Atoi(index) + if errr != nil { + return NewBadRequestResponse(nil), errr + } + + indices = append(indices, converted) + } + + sidecars, err := h.eth.BlobSidecars(ctx, id, indices) if err != nil { return NewInternalServerErrorResponse(nil), err } diff --git a/pkg/beacon/default.go b/pkg/beacon/default.go index a201c8e..345a3c6 100644 --- a/pkg/beacon/default.go +++ b/pkg/beacon/default.go @@ -41,8 +41,9 @@ type Default struct { depositSnapshots *store.DepositSnapshot blobSidecars *store.BlobSidecar - spec *state.Spec - genesis *v1.Genesis + specMutex sync.Mutex + spec *state.Spec + genesis *v1.Genesis historicalSlotFailures map[phase0.Slot]int @@ -86,6 +87,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C servingMutex: sync.Mutex{}, historicalMutex: sync.Mutex{}, majorityMutex: sync.Mutex{}, + specMutex: sync.Mutex{}, metrics: NewMetrics(namespace + "_beacon"), } @@ -103,7 +105,7 @@ func (d *Default) Start(ctx context.Context) error { go func() { for { // Wait until we have a single healthy node. - _, err := d.nodes.Healthy(ctx).NotSyncing(ctx).RandomNode(ctx) + nd, err := d.nodes.Healthy(ctx).NotSyncing(ctx).RandomNode(ctx) if err != nil { d.log.WithError(err).Error("Waiting for a healthy, non-syncing node before beginning..") time.Sleep(time.Second * 5) @@ -111,10 +113,28 @@ func (d *Default) Start(ctx context.Context) error { continue } + nd.Beacon.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) { + // Refresh the spec on epoch change. + // This will intentionally use any node (not the one that triggered the event) to fetch the spec. 
+ if err := d.refreshSpec(ctx); err != nil { + d.log.WithError(err).Error("Failed to refresh spec") + } + }) + if err := d.startCrons(ctx); err != nil { d.log.WithError(err).Fatal("Failed to start crons") } + go func() { + if err := d.startGenesisLoop(ctx); err != nil { + d.log.WithError(err).Fatal("Failed to start genesis loop") + } + }() + + if err := d.fetchUpstreamRequirements(ctx); err != nil { + d.log.WithError(err).Error("Failed to fetch upstream requirements") + } + break } }() @@ -124,7 +144,8 @@ func (d *Default) Start(ctx context.Context) error { n := node logCtx := d.log.WithFields(logrus.Fields{ - "node": n.Config.Name, + "node": n.Config.Name, + "reason": "serving_updater", }) n.Beacon.OnFinalityCheckpointUpdated(ctx, func(ctx context.Context, event *beacon.FinalityCheckpointUpdated) error { @@ -140,7 +161,13 @@ func (d *Default) Start(ctx context.Context) error { return err } - return d.checkForNewServingCheckpoint(ctx) + if err := d.checkForNewServingCheckpoint(ctx); err != nil { + logCtx.WithError(err).Error("Failed to check for new serving checkpoint after finality checkpoint updated") + + return err + } + + return nil }) n.Beacon.OnReady(ctx, func(ctx context.Context, _ *beacon.ReadyEvent) error { @@ -196,12 +223,6 @@ func (d *Default) startCrons(ctx context.Context) error { return err } - go func() { - if err := d.startGenesisLoop(ctx); err != nil { - d.log.WithError(err).Fatal("Failed to start genesis loop") - } - }() - go func() { if err := d.startServingLoop(ctx); err != nil { d.log.WithError(err).Fatal("Failed to start serving loop") @@ -219,6 +240,14 @@ func (d *Default) startCrons(ctx context.Context) error { return nil } +func (d *Default) fetchUpstreamRequirements(ctx context.Context) error { + if err := d.refreshSpec(ctx); err != nil { + return err + } + + return nil +} + func (d *Default) StartAsync(ctx context.Context) { go func() { if err := d.Start(ctx); err != nil { @@ -369,7 +398,7 @@ func (d *Default) Syncing(ctx context.Context) (*v1.SyncState, error) { SyncDistance: 0, } - sp, err := d.Spec(ctx) + sp, err := d.Spec() if err != nil { return syncState, err } @@ -405,12 +434,24 @@ func (d *Default) Genesis(ctx context.Context) (*v1.Genesis, error) { return d.genesis, nil } -func (d *Default) Spec(ctx context.Context) (*state.Spec, error) { +func (d *Default) setSpec(s *state.Spec) { + d.specMutex.Lock() + defer d.specMutex.Unlock() + + d.spec = s +} + +func (d *Default) Spec() (*state.Spec, error) { + d.specMutex.Lock() + defer d.specMutex.Unlock() + if d.spec == nil { return nil, errors.New("config spec not yet available") } - return d.spec, nil + copied := *d.spec + + return &copied, nil } func (d *Default) OperatingMode() OperatingMode { @@ -460,12 +501,7 @@ func (d *Default) checkFinality(ctx context.Context) error { return nil } -func (d *Default) checkBeaconSpec(ctx context.Context) error { - // No-Op if we already have a beacon spec - if d.spec != nil { - return nil - } - +func (d *Default) refreshSpec(ctx context.Context) error { d.log.Debug("Fetching beacon spec") upstream, err := d.nodes.Ready(ctx).DataProviders(ctx).RandomNode(ctx) @@ -479,13 +515,23 @@ func (d *Default) checkBeaconSpec(ctx context.Context) error { } // store the beacon state spec - d.spec = s + d.setSpec(s) - d.log.Info("Fetched beacon spec") + d.log.Debug("Fetched beacon spec") return nil } +func (d *Default) checkBeaconSpec(ctx context.Context) error { + // No-Op if we already have a beacon spec + _, err := d.Spec() + if err == nil { + return nil + } + + return 
d.refreshSpec(ctx) +} + func (d *Default) checkGenesisTime(ctx context.Context) error { // No-Op if we already have a genesis time if d.genesis != nil { @@ -520,7 +566,7 @@ func (d *Default) OnFinalityCheckpointHeadUpdated(ctx context.Context, cb func(c }) } -func (d *Default) publishFinalityCheckpointHeadUpdated(ctx context.Context, checkpoint *v1.Finality) { +func (d *Default) publishFinalityCheckpointHeadUpdated(_ context.Context, checkpoint *v1.Finality) { d.broker.Emit(topicFinalityHeadUpdated, checkpoint) } @@ -599,9 +645,10 @@ func (d *Default) GetBeaconStateByRoot(ctx context.Context, root phase0.Root) (* return d.states.GetByStateRoot(stateRoot) } -func (d *Default) storeBlock(ctx context.Context, block *spec.VersionedSignedBeaconBlock) error { - if d.spec == nil { - return errors.New("beacon chain spec is unknown") +func (d *Default) storeBlock(_ context.Context, block *spec.VersionedSignedBeaconBlock) error { + _, err := d.Spec() + if err != nil { + return err } if d.genesis == nil { @@ -679,7 +726,9 @@ func (d *Default) UpstreamsStatus(ctx context.Context) (map[string]*UpstreamStat func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) { slots := []phase0.Slot{} - if d.spec == nil { + + sp, err := d.Spec() + if err != nil { return slots, errors.New("no beacon chain spec available") } @@ -692,9 +741,9 @@ func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) return slots, errors.New("no finalized checkpoint available") } - latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(d.spec.SlotsPerEpoch)) + latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(sp.SlotsPerEpoch)) - for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(d.spec.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(d.spec.SlotsPerEpoch) { + for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(sp.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(sp.SlotsPerEpoch) { slots = append(slots, phase0.Slot(i)) } @@ -702,7 +751,8 @@ func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) } func (d *Default) GetEpochBySlot(ctx context.Context, slot phase0.Slot) (phase0.Epoch, error) { - if d.spec == nil { + _, err := d.Spec() + if err != nil { return phase0.Epoch(0), errors.New("no upstream beacon state spec available") } @@ -716,7 +766,8 @@ func (d *Default) PeerCount(ctx context.Context) (uint64, error) { func (d *Default) GetSlotTime(ctx context.Context, slot phase0.Slot) (eth.SlotTime, error) { SlotTime := eth.SlotTime{} - if d.spec == nil { + _, err := d.Spec() + if err != nil { return SlotTime, errors.New("no upstream beacon state spec available") } diff --git a/pkg/beacon/download.go b/pkg/beacon/download.go index 7074c02..11bc24d 100644 --- a/pkg/beacon/download.go +++ b/pkg/beacon/download.go @@ -23,6 +23,8 @@ func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1. return errors.New("finalized checkpoint is nil") } + d.log.WithField("epoch", checkpoint.Finalized.Epoch).Info("Downloading serving checkpoint") + upstream, err := d.nodes. Ready(ctx). DataProviders(ctx). @@ -44,10 +46,12 @@ func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1. return fmt.Errorf("failed to get slot from block: %w", err) } - // For simplicity we'll hardcode SLOTS_PER_EPOCH to 32. - // TODO(sam.calder-mason): Fetch this from a beacon node and store it in the instance. 
- const slotsPerEpoch = 32 - if blockSlot%slotsPerEpoch != 0 { + sp, err := d.Spec() + if err != nil { + return fmt.Errorf("failed to fetch spec: %w", err) + } + + if blockSlot%sp.SlotsPerEpoch != 0 { return fmt.Errorf("block slot is not aligned from an epoch boundary: %d", blockSlot) } @@ -129,8 +133,9 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1 d.historicalMutex.Lock() defer d.historicalMutex.Unlock() - if d.spec == nil { - return errors.New("beacon spec unavailable") + sp, err := d.Spec() + if err != nil { + return errors.New("chain spec unavailable") } if d.genesis == nil { @@ -147,8 +152,6 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1 return errors.New("no data provider node available") } - sp := d.spec - slotsInScope := make(map[phase0.Slot]struct{}) // We always care about the genesis slot. @@ -218,7 +221,8 @@ func (d *Default) downloadBlock(ctx context.Context, slot phase0.Slot, upstream } // Same thing with the chain spec. - if d.spec == nil { + _, err := d.Spec() + if err != nil { return nil, errors.New("chain spec not known") } @@ -326,9 +330,9 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N } } - sp, err := upstream.Beacon.Spec() + sp, err := d.Spec() if err != nil { - return nil, fmt.Errorf("failed to fetch spec from upstream node: %w", err) + return nil, fmt.Errorf("failed to fetch spec: %w", err) } denebFork, err := sp.ForkEpochs.GetByName("DENEB") @@ -341,7 +345,7 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N } } - d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name) + d.log.WithField("root", eth.RootAsString(root)).Infof("Successfully fetched bundle from %s", upstream.Config.Name) return block, nil } diff --git a/pkg/beacon/finality_provider.go b/pkg/beacon/finality_provider.go index f3eb0c8..108e3ba 100644 --- a/pkg/beacon/finality_provider.go +++ b/pkg/beacon/finality_provider.go @@ -33,7 +33,7 @@ type FinalityProvider interface { // Genesis returns the chain genesis. Genesis(ctx context.Context) (*v1.Genesis, error) // Spec returns the chain spec. - Spec(ctx context.Context) (*state.Spec, error) + Spec() (*state.Spec, error) // UpstreamsStatus returns the status of all the upstreams. UpstreamsStatus(ctx context.Context) (map[string]*UpstreamStatus, error) // GetBlockBySlot returns the block at the given slot. diff --git a/pkg/service/eth/eth.go b/pkg/service/eth/eth.go index 113f7c7..a09c24f 100644 --- a/pkg/service/eth/eth.go +++ b/pkg/service/eth/eth.go @@ -112,7 +112,7 @@ func (h *Handler) ConfigSpec(ctx context.Context) (*state.Spec, error) { } }() - return h.provider.Spec(ctx) + return h.provider.Spec() } // ForkSchedule returns the upcoming forks. 
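The Spec accessor change above (FinalityProvider.Spec drops its context argument, and Default now guards the cached spec with specMutex and returns a copy) is a copy-on-read cache pattern. The sketch below shows that pattern in isolation; specCache and ChainSpec are invented names for illustration and are not types from this repository.

// Illustrative sketch only: a mutex-guarded spec cache that hands out copies,
// mirroring the pattern used by Default.setSpec/Default.Spec in this patch.
// ChainSpec is a stand-in for state.Spec.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type ChainSpec struct {
	SlotsPerEpoch uint64
}

type specCache struct {
	mu   sync.Mutex
	spec *ChainSpec
}

// Set stores the latest spec, e.g. after a refresh triggered on epoch change.
func (c *specCache) Set(s *ChainSpec) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.spec = s
}

// Get returns a copy so callers never share the cached pointer.
func (c *specCache) Get() (*ChainSpec, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.spec == nil {
		return nil, errors.New("spec not yet available")
	}

	copied := *c.spec

	return &copied, nil
}

func main() {
	cache := &specCache{}

	if _, err := cache.Get(); err != nil {
		fmt.Println("before refresh:", err)
	}

	cache.Set(&ChainSpec{SlotsPerEpoch: 32})

	sp, _ := cache.Get()
	fmt.Println("slots per epoch:", sp.SlotsPerEpoch)
}

Because Get hands out a copy, a refresh that calls Set cannot mutate a spec that a caller obtained earlier, which is what makes the per-epoch refresh safe for concurrent readers.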
@@ -129,7 +129,7 @@ func (h *Handler) ForkSchedule(ctx context.Context) ([]*state.ScheduledFork, err } }() - sp, err := h.provider.Spec(ctx) + sp, err := h.provider.Spec() if err != nil { return nil, err } @@ -156,7 +156,7 @@ func (h *Handler) DepositContract(ctx context.Context) (*DepositContract, error) } }() - sp, err := h.provider.Spec(ctx) + sp, err := h.provider.Spec() if err != nil { return nil, err } @@ -186,6 +186,10 @@ func (h *Handler) DepositSnapshot(ctx context.Context) (*types.DepositSnapshot, return nil, err } + if finality == nil || finality.Finalized == nil { + return nil, fmt.Errorf("no finality known") + } + snapshot, err := h.provider.GetDepositSnapshot(ctx, finality.Finalized.Epoch) if err != nil { return nil, err @@ -435,7 +439,7 @@ func (h *Handler) BlockRoot(ctx context.Context, blockID BlockIdentifier) (phase } // BlobSidecars returns the blob sidecars for the given block ID. -func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([]*deneb.BlobSidecar, error) { +func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier, indices []int) ([]*deneb.BlobSidecar, error) { var err error const call = "blob_sidecars" @@ -452,6 +456,7 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([] switch blockID.Type() { case BlockIDGenesis: + //nolint:govet // False positive block, err := h.provider.GetBlockBySlot(ctx, phase0.Slot(0)) if err != nil { return nil, err @@ -468,6 +473,7 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([] slot = sl case BlockIDSlot: + //nolint:govet // False positive sslot, err := NewSlotFromString(blockID.Value()) if err != nil { return nil, err @@ -489,6 +495,7 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([] slot = sl case BlockIDRoot: + //nolint:govet // False positive root, err := blockID.AsRoot() if err != nil { return nil, err @@ -510,6 +517,7 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([] slot = sl case BlockIDFinalized: + //nolint:govet // False positive finality, err := h.provider.Finalized(ctx) if err != nil { return nil, err @@ -538,5 +546,31 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier) ([] return nil, fmt.Errorf("invalid block id: %v", blockID.String()) } - return h.provider.GetBlobSidecarsBySlot(ctx, slot) + sidecars, err := h.provider.GetBlobSidecarsBySlot(ctx, slot) + if err != nil { + return nil, err + } + + if len(indices) == 0 { + return sidecars, nil + } + + filtered := make([]*deneb.BlobSidecar, 0, len(indices)) + + for _, index := range indices { + if index < 0 { + return nil, fmt.Errorf("invalid index %v", index) + } + + // Find the sidecar with the given index + for i, sidecar := range sidecars { + if index == int(sidecar.Index) { + filtered = append(filtered, sidecars[i]) + + break + } + } + } + + return filtered, nil }
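The new indices handling (the API handler parses the indices query parameter and BlobSidecars filters what it returns) reduces to a small filter over the sidecar list. Below is a self-contained sketch of that filtering under simplified assumptions: the sidecar struct and filterByIndices are illustrative stand-ins, not the deneb.BlobSidecar type or any function from this repository.

// Illustrative sketch only: filter sidecars by requested indices, in the
// spirit of Handler.BlobSidecars in this patch.
package main

import "fmt"

type sidecar struct {
	Index uint64
}

// filterByIndices returns only the sidecars whose Index appears in indices.
// An empty indices slice means "return everything"; negative values are rejected.
func filterByIndices(sidecars []*sidecar, indices []int) ([]*sidecar, error) {
	if len(indices) == 0 {
		return sidecars, nil
	}

	filtered := make([]*sidecar, 0, len(indices))

	for _, index := range indices {
		if index < 0 {
			return nil, fmt.Errorf("invalid index %d", index)
		}

		// Keep the first sidecar matching this index, if any.
		for _, sc := range sidecars {
			if int(sc.Index) == index {
				filtered = append(filtered, sc)

				break
			}
		}
	}

	return filtered, nil
}

func main() {
	all := []*sidecar{{Index: 0}, {Index: 1}, {Index: 2}}

	subset, err := filterByIndices(all, []int{2, 0})
	if err != nil {
		panic(err)
	}

	for _, sc := range subset {
		fmt.Println("kept sidecar", sc.Index)
	}
}

An empty indices list preserves the previous behaviour of returning every sidecar for the slot, matching how the handler only filters when the query parameter is present.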