Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prune checkpoint states at syncing time #7241

Merged
merged 4 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ export class BeaconChain implements IBeaconChain {
metrics,
logger,
clock,
shufflingCache: this.shufflingCache,
blockStateCache,
bufferPool: this.bufferPool,
datastore: fileDataStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,20 @@ import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {IClock} from "../../util/clock.js";
import {StateCloneOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
import {ShufflingCache} from "../shufflingCache.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache, CacheItemType, CheckpointHex, CheckpointStateCache} from "./types.js";

export type PersistentCheckpointStateCacheOpts = {
/** Keep max n states in memory, persist the rest to disk */
maxCPStateEpochsInMemory?: number;
/** for testing only */
processLateBlock?: boolean;
};

type PersistentCheckpointStateCacheModules = {
metrics?: Metrics | null;
logger: Logger;
clock?: IClock | null;
signal?: AbortSignal;
shufflingCache: ShufflingCache;
datastore: CPStateDatastore;
blockStateCache: BlockStateCache;
bufferPool?: BufferPool | null;
Expand Down Expand Up @@ -102,24 +98,12 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private preComputedCheckpoint: string | null = null;
private preComputedCheckpointHits: number | null = null;
private readonly maxEpochsInMemory: number;
// only for testing, default false for production
private readonly processLateBlock: boolean;
private readonly datastore: CPStateDatastore;
private readonly shufflingCache: ShufflingCache;
private readonly blockStateCache: BlockStateCache;
private readonly bufferPool?: BufferPool | null;

constructor(
{
metrics,
logger,
clock,
signal,
shufflingCache,
datastore,
blockStateCache,
bufferPool,
}: PersistentCheckpointStateCacheModules,
{metrics, logger, clock, signal, datastore, blockStateCache, bufferPool}: PersistentCheckpointStateCacheModules,
opts: PersistentCheckpointStateCacheOpts
) {
this.cache = new MapTracker(metrics?.cpStateCache);
Expand Down Expand Up @@ -153,10 +137,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
throw new Error("maxEpochsInMemory must be >= 0");
}
this.maxEpochsInMemory = opts.maxCPStateEpochsInMemory ?? DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY;
this.processLateBlock = opts.processLateBlock ?? false;
// Specify different datastore for testing
this.datastore = datastore;
this.shufflingCache = shufflingCache;
this.blockStateCache = blockStateCache;
this.bufferPool = bufferPool;
}
Expand Down Expand Up @@ -487,12 +469,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// 2/3 of slot is the most free time of every slot, take that chance to persist checkpoint states
// normally it should only persist checkpoint states at 2/3 of slot 0 of epoch
await sleep(secToTwoThirdsSlot * 1000, this.signal);
} else if (!this.processLateBlock) {
// normally the block persist happens at 2/3 of slot 0 of epoch, if it's already late then just skip to allow other tasks to run
// there are plenty of chances in the same epoch to persist checkpoint states, also if block is late it could be reorged
this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex});
return 0;
}
// at syncing time, it's critical to persist checkpoint states as soon as possible to avoid OOM during unfinality time
// if node is synced this is not a hot time because block comes late, we'll likely miss attestation already, or the block is orphaned

const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory);
for (const lowestEpoch of persistEpochs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2, processLateBlock: true}
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -165,10 +164,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2, processLateBlock: true}
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -242,10 +240,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2, processLateBlock: true}
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -548,10 +545,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 1, processLateBlock: true}
{maxCPStateEpochsInMemory: 1}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -820,10 +816,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 0, processLateBlock: true}
{maxCPStateEpochsInMemory: 0}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -911,10 +906,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 0, processLateBlock: true}
{maxCPStateEpochsInMemory: 0}
);

const root1a = Buffer.alloc(32, 100);
Expand Down
Loading