Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: debug too many shuffling promises #7251

Merged
merged 13 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ export class PrepareNextSlotScheduler {
// the slot 0 of next epoch will likely use this Previous Root Checkpoint state for state transition so we transfer cache here
// the resulting state with cache will be cached in Checkpoint State Cache which is used for the upcoming block processing
// for other slots dontTransferCached=true because we don't run state transition on this state
{dontTransferCache: !isEpochTransition},
//
// Shuffling calculation will be done asynchronously when passing asyncShufflingCalculation=true. Shuffling will be queued in
// beforeProcessEpoch and should theoretically be ready immediately after the synchronous epoch transition finished and the
// event loop is free. In long periods of non-finality too many forks will cause the shufflingCache to throw an error for
// too many queued shufflings so only run async during normal epoch transition. See issue ChainSafe/lodestar#7244
{dontTransferCache: !isEpochTransition, asyncShufflingCalculation: true},
matthewkeil marked this conversation as resolved.
Show resolved Hide resolved
RegenCaller.precomputeEpoch
);

Expand Down
18 changes: 13 additions & 5 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ export enum RegenFnName {
getCheckpointState = "getCheckpointState",
}

export type StateCloneOpts = {
export type StateRegenerationOpts = {
dontTransferCache: boolean;
/**
* Do not queue shuffling calculation async. Forces sync JIT calculation in afterProcessEpoch if not passed as `true`
*/
asyncShufflingCalculation?: boolean;
};

export interface IStateRegenerator extends IStateRegeneratorInternal {
Expand All @@ -56,15 +60,19 @@ export interface IStateRegeneratorInternal {
* Return a valid pre-state for a beacon block
* This will always return a state in the latest viable epoch
*/
getPreState(block: BeaconBlock, opts: StateCloneOpts, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks>;
getPreState(
block: BeaconBlock,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks>;

/**
* Return a valid checkpoint state
* This will always return a state with `state.slot % SLOTS_PER_EPOCH === 0`
*/
getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks>;

Expand All @@ -74,12 +82,12 @@ export interface IStateRegeneratorInternal {
getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks>;

/**
* Return the exact state with `stateRoot`
*/
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks>;
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks>;
}
18 changes: 12 additions & 6 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ import {JobItemQueue} from "../../util/queue/index.js";
import {CheckpointHex, toCheckpointHex} from "../stateCache/index.js";
import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js";
import {RegenError, RegenErrorCode} from "./errors.js";
import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js";
import {
IStateRegenerator,
IStateRegeneratorInternal,
RegenCaller,
RegenFnName,
StateRegenerationOpts,
} from "./interface.js";
import {RegenModules, StateRegenerator} from "./regen.js";

const REGEN_QUEUE_MAX_LEN = 256;
Expand Down Expand Up @@ -86,7 +92,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
*/
getPreStateSync(
block: BeaconBlock,
opts: StateCloneOpts = {dontTransferCache: true}
opts: StateRegenerationOpts = {dontTransferCache: true}
): CachedBeaconStateAllForks | null {
const parentRoot = toRootHex(block.parentRoot);
const parentBlock = this.forkChoice.getBlockHex(parentRoot);
Expand Down Expand Up @@ -212,7 +218,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
*/
async getPreState(
block: BeaconBlock,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState});
Expand All @@ -231,7 +237,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {

async getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState});
Expand All @@ -256,7 +262,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
async getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getBlockSlotState});
Expand All @@ -268,7 +274,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
async getState(
stateRoot: RootHex,
rCaller: RegenCaller,
opts: StateCloneOpts = {dontTransferCache: true}
opts: StateRegenerationOpts = {dontTransferCache: true}
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});

Expand Down
14 changes: 7 additions & 7 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {getCheckpointFromState} from "../blocks/utils/checkpoint.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";
import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js";
import {RegenError, RegenErrorCode} from "./errors.js";
import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js";
import {IStateRegeneratorInternal, RegenCaller, StateRegenerationOpts} from "./interface.js";

export type RegenModules = {
db: IBeaconDb;
Expand Down Expand Up @@ -51,7 +51,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getPreState(
block: BeaconBlock,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
regenCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
const parentBlock = this.modules.forkChoice.getBlock(block.parentRoot);
Expand Down Expand Up @@ -84,7 +84,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
Expand All @@ -99,7 +99,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
Expand Down Expand Up @@ -146,7 +146,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getState(
stateRoot: RootHex,
caller: RegenCaller,
opts?: StateCloneOpts,
opts?: StateRegenerationOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
Expand Down Expand Up @@ -322,7 +322,7 @@ async function processSlotsByCheckpoint(
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
opts: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks> {
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, regenCaller, opts);
if (postState.slot < slot) {
Expand All @@ -343,7 +343,7 @@ async function processSlotsToNearestCheckpoint(
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
opts: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks> {
const preSlot = preState.slot;
const postSlot = slot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -39,7 +39,7 @@ export class BlockStateCacheImpl implements BlockStateCache {
}
}

get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.head?.stateRoot === rootHex ? this.head.state : this.cache.get(rootHex);
if (!item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {RootHex} from "@lodestar/types";
import {toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -93,7 +93,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
/**
* Get a state from this cache given a state root hex.
*/
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.cache.get(rootHex);
if (!item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Epoch, RootHex, phase0} from "@lodestar/types";
import {MapDef, toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {CacheItemType, CheckpointStateCache} from "./types.js";

Expand Down Expand Up @@ -42,7 +42,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
this.maxEpochs = maxEpochs;
}

async getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null> {
async getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null> {
return this.get(cp, opts);
}

Expand All @@ -54,7 +54,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
async getOrReloadLatest(
rootHex: string,
maxEpoch: number,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | null> {
return this.getLatest(rootHex, maxEpoch, opts);
}
Expand All @@ -64,7 +64,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
return 0;
}

get(cp: CheckpointHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(cp: CheckpointHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const cpKey = toCheckpointKey(cp);
const item = this.cache.get(cpKey);
Expand Down Expand Up @@ -98,7 +98,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
/**
* Searches for the latest cached state with a `root`, starting with `epoch` and descending
*/
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {Logger, MapDef, fromHex, sleep, toHex, toRootHex} from "@lodestar/utils"
import {Metrics} from "../../metrics/index.js";
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {IClock} from "../../util/clock.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
import {ShufflingCache} from "../shufflingCache.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
Expand Down Expand Up @@ -187,7 +187,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - Get block for processing
* - Regen head state
*/
async getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null> {
async getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp, opts);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData?.clone(opts?.dontTransferCache) ?? null;
Expand Down Expand Up @@ -259,7 +259,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
*/
async getStateOrLoadDb(
cp: CheckpointHex,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
const cpKey = toCacheKey(cp);
const inMemoryState = this.get(cpKey, opts);
Expand Down Expand Up @@ -291,7 +291,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Similar to get() api without reloading from disk
*/
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(cpOrKey: CheckpointHex | string, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.cpStateCache.lookups.inc();
const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey);
const cacheItem = this.cache.get(cpKey);
Expand Down Expand Up @@ -342,7 +342,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Searches in-memory state for the latest cached state with a `root` without reload, starting with `epoch` and descending
*/
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
Expand All @@ -368,7 +368,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
async getOrReloadLatest(
rootHex: RootHex,
maxEpoch: Epoch,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | null> {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
Expand Down
12 changes: 6 additions & 6 deletions packages/beacon-node/src/chain/stateCache/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {routes} from "@lodestar/api";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Epoch, RootHex, phase0} from "@lodestar/types";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";

export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};

Expand All @@ -21,7 +21,7 @@ export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
* The cache key is state root
*/
export interface BlockStateCache {
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null;
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
add(item: CachedBeaconStateAllForks): void;
setHeadState(item: CachedBeaconStateAllForks | null): void;
/**
Expand Down Expand Up @@ -60,15 +60,15 @@ export interface BlockStateCache {
*/
export interface CheckpointStateCache {
init?: () => Promise<void>;
getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null>;
getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null>;
getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null;
get(cpOrKey: CheckpointHex | string, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
add(cp: phase0.Checkpoint, state: CachedBeaconStateAllForks): void;
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null;
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
getOrReloadLatest(
rootHex: RootHex,
maxEpoch: Epoch,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | null>;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void;
Expand Down
Loading
Loading