Skip to content

Commit

Permalink
fix: Refresh spec on epoch change (#160)
Browse files Browse the repository at this point in the history
* fix: Refresh spec on epoch change

* chore: linting
  • Loading branch information
samcm authored Mar 15, 2024
1 parent c7cf5df commit c5b13c8
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 50 deletions.
17 changes: 16 additions & 1 deletion pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -599,7 +600,21 @@ func (h *Handler) handleEthV1BeaconBlobSidecars(ctx context.Context, r *http.Req
return NewBadRequestResponse(nil), err
}

sidecars, err := h.eth.BlobSidecars(ctx, id)
queryParams := r.URL.Query()
indicesRaw := queryParams["indices"]

indices := make([]int, 0, len(indicesRaw))

for _, index := range indicesRaw {
converted, errr := strconv.Atoi(index)
if errr != nil {
return NewBadRequestResponse(nil), errr
}

indices = append(indices, converted)
}

sidecars, err := h.eth.BlobSidecars(ctx, id, indices)
if err != nil {
return NewInternalServerErrorResponse(nil), err
}
Expand Down
113 changes: 82 additions & 31 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type Default struct {
depositSnapshots *store.DepositSnapshot
blobSidecars *store.BlobSidecar

spec *state.Spec
genesis *v1.Genesis
specMutex sync.Mutex
spec *state.Spec
genesis *v1.Genesis

historicalSlotFailures map[phase0.Slot]int

Expand Down Expand Up @@ -86,6 +87,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
servingMutex: sync.Mutex{},
historicalMutex: sync.Mutex{},
majorityMutex: sync.Mutex{},
specMutex: sync.Mutex{},

metrics: NewMetrics(namespace + "_beacon"),
}
Expand All @@ -103,18 +105,36 @@ func (d *Default) Start(ctx context.Context) error {
go func() {
for {
// Wait until we have a single healthy node.
_, err := d.nodes.Healthy(ctx).NotSyncing(ctx).RandomNode(ctx)
nd, err := d.nodes.Healthy(ctx).NotSyncing(ctx).RandomNode(ctx)
if err != nil {
d.log.WithError(err).Error("Waiting for a healthy, non-syncing node before beginning..")
time.Sleep(time.Second * 5)

continue
}

nd.Beacon.Wallclock().OnEpochChanged(func(epoch ethwallclock.Epoch) {
// Refresh the spec on epoch change.
// This will intentionally use any node (not the one that triggered the event) to fetch the spec.
if err := d.refreshSpec(ctx); err != nil {
d.log.WithError(err).Error("Failed to refresh spec")
}
})

if err := d.startCrons(ctx); err != nil {
d.log.WithError(err).Fatal("Failed to start crons")
}

go func() {
if err := d.startGenesisLoop(ctx); err != nil {
d.log.WithError(err).Fatal("Failed to start genesis loop")
}
}()

if err := d.fetchUpstreamRequirements(ctx); err != nil {
d.log.WithError(err).Error("Failed to fetch upstream requirements")
}

break
}
}()
Expand All @@ -124,7 +144,8 @@ func (d *Default) Start(ctx context.Context) error {
n := node

logCtx := d.log.WithFields(logrus.Fields{
"node": n.Config.Name,
"node": n.Config.Name,
"reason": "serving_updater",
})

n.Beacon.OnFinalityCheckpointUpdated(ctx, func(ctx context.Context, event *beacon.FinalityCheckpointUpdated) error {
Expand All @@ -140,7 +161,13 @@ func (d *Default) Start(ctx context.Context) error {
return err
}

return d.checkForNewServingCheckpoint(ctx)
if err := d.checkForNewServingCheckpoint(ctx); err != nil {
logCtx.WithError(err).Error("Failed to check for new serving checkpoint after finality checkpoint updated")

return err
}

return nil
})

n.Beacon.OnReady(ctx, func(ctx context.Context, _ *beacon.ReadyEvent) error {
Expand Down Expand Up @@ -196,12 +223,6 @@ func (d *Default) startCrons(ctx context.Context) error {
return err
}

go func() {
if err := d.startGenesisLoop(ctx); err != nil {
d.log.WithError(err).Fatal("Failed to start genesis loop")
}
}()

go func() {
if err := d.startServingLoop(ctx); err != nil {
d.log.WithError(err).Fatal("Failed to start serving loop")
Expand All @@ -219,6 +240,14 @@ func (d *Default) startCrons(ctx context.Context) error {
return nil
}

func (d *Default) fetchUpstreamRequirements(ctx context.Context) error {
if err := d.refreshSpec(ctx); err != nil {
return err
}

return nil
}

func (d *Default) StartAsync(ctx context.Context) {
go func() {
if err := d.Start(ctx); err != nil {
Expand Down Expand Up @@ -369,7 +398,7 @@ func (d *Default) Syncing(ctx context.Context) (*v1.SyncState, error) {
SyncDistance: 0,
}

sp, err := d.Spec(ctx)
sp, err := d.Spec()
if err != nil {
return syncState, err
}
Expand Down Expand Up @@ -405,12 +434,24 @@ func (d *Default) Genesis(ctx context.Context) (*v1.Genesis, error) {
return d.genesis, nil
}

func (d *Default) Spec(ctx context.Context) (*state.Spec, error) {
func (d *Default) setSpec(s *state.Spec) {
d.specMutex.Lock()
defer d.specMutex.Unlock()

d.spec = s
}

func (d *Default) Spec() (*state.Spec, error) {
d.specMutex.Lock()
defer d.specMutex.Unlock()

if d.spec == nil {
return nil, errors.New("config spec not yet available")
}

return d.spec, nil
copied := *d.spec

return &copied, nil
}

func (d *Default) OperatingMode() OperatingMode {
Expand Down Expand Up @@ -460,12 +501,7 @@ func (d *Default) checkFinality(ctx context.Context) error {
return nil
}

func (d *Default) checkBeaconSpec(ctx context.Context) error {
// No-Op if we already have a beacon spec
if d.spec != nil {
return nil
}

func (d *Default) refreshSpec(ctx context.Context) error {
d.log.Debug("Fetching beacon spec")

upstream, err := d.nodes.Ready(ctx).DataProviders(ctx).RandomNode(ctx)
Expand All @@ -479,13 +515,23 @@ func (d *Default) checkBeaconSpec(ctx context.Context) error {
}

// store the beacon state spec
d.spec = s
d.setSpec(s)

d.log.Info("Fetched beacon spec")
d.log.Debug("Fetched beacon spec")

return nil
}

func (d *Default) checkBeaconSpec(ctx context.Context) error {
// No-Op if we already have a beacon spec
_, err := d.Spec()
if err == nil {
return nil
}

return d.refreshSpec(ctx)
}

func (d *Default) checkGenesisTime(ctx context.Context) error {
// No-Op if we already have a genesis time
if d.genesis != nil {
Expand Down Expand Up @@ -520,7 +566,7 @@ func (d *Default) OnFinalityCheckpointHeadUpdated(ctx context.Context, cb func(c
})
}

func (d *Default) publishFinalityCheckpointHeadUpdated(ctx context.Context, checkpoint *v1.Finality) {
func (d *Default) publishFinalityCheckpointHeadUpdated(_ context.Context, checkpoint *v1.Finality) {
d.broker.Emit(topicFinalityHeadUpdated, checkpoint)
}

Expand Down Expand Up @@ -599,9 +645,10 @@ func (d *Default) GetBeaconStateByRoot(ctx context.Context, root phase0.Root) (*
return d.states.GetByStateRoot(stateRoot)
}

func (d *Default) storeBlock(ctx context.Context, block *spec.VersionedSignedBeaconBlock) error {
if d.spec == nil {
return errors.New("beacon chain spec is unknown")
func (d *Default) storeBlock(_ context.Context, block *spec.VersionedSignedBeaconBlock) error {
_, err := d.Spec()
if err != nil {
return err
}

if d.genesis == nil {
Expand Down Expand Up @@ -679,7 +726,9 @@ func (d *Default) UpstreamsStatus(ctx context.Context) (map[string]*UpstreamStat

func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) {
slots := []phase0.Slot{}
if d.spec == nil {

sp, err := d.Spec()
if err != nil {
return slots, errors.New("no beacon chain spec available")
}

Expand All @@ -692,17 +741,18 @@ func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error)
return slots, errors.New("no finalized checkpoint available")
}

latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(d.spec.SlotsPerEpoch))
latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(sp.SlotsPerEpoch))

for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(d.spec.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(d.spec.SlotsPerEpoch) {
for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(sp.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(sp.SlotsPerEpoch) {
slots = append(slots, phase0.Slot(i))
}

return slots, nil
}

func (d *Default) GetEpochBySlot(ctx context.Context, slot phase0.Slot) (phase0.Epoch, error) {
if d.spec == nil {
_, err := d.Spec()
if err != nil {
return phase0.Epoch(0), errors.New("no upstream beacon state spec available")
}

Expand All @@ -716,7 +766,8 @@ func (d *Default) PeerCount(ctx context.Context) (uint64, error) {
func (d *Default) GetSlotTime(ctx context.Context, slot phase0.Slot) (eth.SlotTime, error) {
SlotTime := eth.SlotTime{}

if d.spec == nil {
_, err := d.Spec()
if err != nil {
return SlotTime, errors.New("no upstream beacon state spec available")
}

Expand Down
28 changes: 16 additions & 12 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1.
return errors.New("finalized checkpoint is nil")
}

d.log.WithField("epoch", checkpoint.Finalized.Epoch).Info("Downloading serving checkpoint")

upstream, err := d.nodes.
Ready(ctx).
DataProviders(ctx).
Expand All @@ -44,10 +46,12 @@ func (d *Default) downloadServingCheckpoint(ctx context.Context, checkpoint *v1.
return fmt.Errorf("failed to get slot from block: %w", err)
}

// For simplicity we'll hardcode SLOTS_PER_EPOCH to 32.
// TODO(sam.calder-mason): Fetch this from a beacon node and store it in the instance.
const slotsPerEpoch = 32
if blockSlot%slotsPerEpoch != 0 {
sp, err := d.Spec()
if err != nil {
return fmt.Errorf("failed to fetch spec: %w", err)
}

if blockSlot%sp.SlotsPerEpoch != 0 {
return fmt.Errorf("block slot is not aligned from an epoch boundary: %d", blockSlot)
}

Expand Down Expand Up @@ -129,8 +133,9 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1
d.historicalMutex.Lock()
defer d.historicalMutex.Unlock()

if d.spec == nil {
return errors.New("beacon spec unavailable")
sp, err := d.Spec()
if err != nil {
return errors.New("chain spec unavailable")
}

if d.genesis == nil {
Expand All @@ -147,8 +152,6 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1
return errors.New("no data provider node available")
}

sp := d.spec

slotsInScope := make(map[phase0.Slot]struct{})

// We always care about the genesis slot.
Expand Down Expand Up @@ -218,7 +221,8 @@ func (d *Default) downloadBlock(ctx context.Context, slot phase0.Slot, upstream
}

// Same thing with the chain spec.
if d.spec == nil {
_, err := d.Spec()
if err != nil {
return nil, errors.New("chain spec not known")
}

Expand Down Expand Up @@ -326,9 +330,9 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N
}
}

sp, err := upstream.Beacon.Spec()
sp, err := d.Spec()
if err != nil {
return nil, fmt.Errorf("failed to fetch spec from upstream node: %w", err)
return nil, fmt.Errorf("failed to fetch spec: %w", err)
}

denebFork, err := sp.ForkEpochs.GetByName("DENEB")
Expand All @@ -341,7 +345,7 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N
}
}

d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name)
d.log.WithField("root", eth.RootAsString(root)).Infof("Successfully fetched bundle from %s", upstream.Config.Name)

return block, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/beacon/finality_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type FinalityProvider interface {
// Genesis returns the chain genesis.
Genesis(ctx context.Context) (*v1.Genesis, error)
// Spec returns the chain spec.
Spec(ctx context.Context) (*state.Spec, error)
Spec() (*state.Spec, error)
// UpstreamsStatus returns the status of all the upstreams.
UpstreamsStatus(ctx context.Context) (map[string]*UpstreamStatus, error)
// GetBlockBySlot returns the block at the given slot.
Expand Down
Loading

0 comments on commit c5b13c8

Please sign in to comment.