Merge branch 'unstable' into nflaig/auth-error-log
nflaig committed Nov 29, 2024
2 parents fea6527 + aaac34a commit 1ede386
Showing 11 changed files with 93 additions and 68 deletions.
44 changes: 33 additions & 11 deletions packages/beacon-node/src/chain/archiver/archiveBlocks.ts
@@ -59,8 +59,8 @@ export async function archiveBlocks(
   });

   if (finalizedPostDeneb) {
-    await migrateBlobSidecarsFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
-    logger.verbose("Migrated blobSidecars from hot DB to cold DB");
+    const migrate = await migrateBlobSidecarsFromHotToColdDb(config, db, finalizedCanonicalBlockRoots, currentEpoch);
+    logger.verbose(migrate ? "Migrated blobSidecars from hot DB to cold DB" : "Skip blobSidecars migration");
   }
 }

@@ -157,22 +157,36 @@ async function migrateBlocksFromHotToColdDb(db: IBeaconDb, blocks: BlockRootSlot
   }
 }

+/**
+ * Migrate blobSidecars from the hot db to the cold db.
+ * @returns true if any blobSidecars were migrated, false if all blocks are outside the blob data retention window.
+ */
 async function migrateBlobSidecarsFromHotToColdDb(
   config: ChainForkConfig,
   db: IBeaconDb,
-  blocks: BlockRootSlot[]
-): Promise<void> {
+  blocks: BlockRootSlot[],
+  currentEpoch: Epoch
+): Promise<boolean> {
+  let result = false;
+
   for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
     const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
     const canonicalBlocks = blocks.slice(i, toIdx);

     // processCanonicalBlocks
-    if (canonicalBlocks.length === 0) return;
+    if (canonicalBlocks.length === 0) return false;

     // load Buffer instead of ssz deserialized value to improve performance
     const canonicalBlobSidecarsEntries: KeyValue<Slot, Uint8Array>[] = await Promise.all(
       canonicalBlocks
-        .filter((block) => config.getForkSeq(block.slot) >= ForkSeq.deneb)
+        .filter((block) => {
+          const blockSlot = block.slot;
+          const blockEpoch = computeEpochAtSlot(blockSlot);
+          return (
+            config.getForkSeq(blockSlot) >= ForkSeq.deneb &&
+            blockEpoch >= currentEpoch - config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
+          );
+        })
         .map(async (block) => {
           const bytes = await db.blobSidecars.getBinary(block.root);
           if (!bytes) {
@@ -182,12 +196,20 @@ async function migrateBlobSidecarsFromHotToColdDb(
         })
     );

-    // put to blockArchive db and delete block db
-    await Promise.all([
-      db.blobSidecarsArchive.batchPutBinary(canonicalBlobSidecarsEntries),
-      db.blobSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
-    ]);
+    const migrate = canonicalBlobSidecarsEntries.length > 0;
+
+    if (migrate) {
+      // put to the blobSidecarsArchive db and delete from the blobSidecars db
+      await Promise.all([
+        db.blobSidecarsArchive.batchPutBinary(canonicalBlobSidecarsEntries),
+        db.blobSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
+      ]);
+    }
+
+    result = result || migrate;
   }

+  return result;
 }

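For context, a minimal sketch of the retention check behind the new `currentEpoch` parameter: blobSidecars whose block epoch has fallen out of the retention window are skipped rather than migrated. The constants and standalone helpers below are illustrative assumptions (mainnet preset values), not this module's actual imports.

```typescript
// Sketch, not Lodestar's actual code: should a block's blobSidecars be migrated?
const SLOTS_PER_EPOCH = 32; // mainnet preset
const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS = 4096; // Deneb spec value, ~18 days

function computeEpochAtSlot(slot: number): number {
  return Math.floor(slot / SLOTS_PER_EPOCH);
}

// Mirrors the new filter predicate: keep only blocks still inside the window.
function isWithinBlobRetentionWindow(blockSlot: number, currentEpoch: number): boolean {
  return computeEpochAtSlot(blockSlot) >= currentEpoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
}
```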
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
@@ -293,7 +293,6 @@ export class BeaconChain implements IBeaconChain {
       metrics,
       logger,
       clock,
-      shufflingCache: this.shufflingCache,
       blockStateCache,
       bufferPool: this.bufferPool,
       datastore: fileDataStore

@@ -9,24 +9,20 @@ import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
 import {IClock} from "../../util/clock.js";
 import {StateCloneOpts} from "../regen/interface.js";
 import {serializeState} from "../serializeState.js";
-import {ShufflingCache} from "../shufflingCache.js";
 import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
 import {MapTracker} from "./mapMetrics.js";
 import {BlockStateCache, CacheItemType, CheckpointHex, CheckpointStateCache} from "./types.js";

 export type PersistentCheckpointStateCacheOpts = {
   /** Keep max n states in memory, persist the rest to disk */
   maxCPStateEpochsInMemory?: number;
-  /** for testing only */
-  processLateBlock?: boolean;
 };

 type PersistentCheckpointStateCacheModules = {
   metrics?: Metrics | null;
   logger: Logger;
   clock?: IClock | null;
   signal?: AbortSignal;
-  shufflingCache: ShufflingCache;
   datastore: CPStateDatastore;
   blockStateCache: BlockStateCache;
@@ -102,24 +98,12 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
   private preComputedCheckpoint: string | null = null;
   private preComputedCheckpointHits: number | null = null;
   private readonly maxEpochsInMemory: number;
-  // only for testing, default false for production
-  private readonly processLateBlock: boolean;
   private readonly datastore: CPStateDatastore;
-  private readonly shufflingCache: ShufflingCache;
   private readonly blockStateCache: BlockStateCache;
   private readonly bufferPool?: BufferPool | null;

   constructor(
-    {
-      metrics,
-      logger,
-      clock,
-      signal,
-      shufflingCache,
-      datastore,
-      blockStateCache,
-      bufferPool,
-    }: PersistentCheckpointStateCacheModules,
+    {metrics, logger, clock, signal, datastore, blockStateCache, bufferPool}: PersistentCheckpointStateCacheModules,
     opts: PersistentCheckpointStateCacheOpts
   ) {
     this.cache = new MapTracker(metrics?.cpStateCache);
@@ -153,10 +137,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
       throw new Error("maxEpochsInMemory must be >= 0");
     }
     this.maxEpochsInMemory = opts.maxCPStateEpochsInMemory ?? DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY;
-    this.processLateBlock = opts.processLateBlock ?? false;
     // Specify different datastore for testing
     this.datastore = datastore;
-    this.shufflingCache = shufflingCache;
     this.blockStateCache = blockStateCache;
     this.bufferPool = bufferPool;
   }
@@ -169,12 +151,11 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
       await this.datastore.init();
     }
     const persistedKeys = await this.datastore.readKeys();
-    for (const persistedKey of persistedKeys) {
-      const cp = datastoreKeyToCheckpoint(persistedKey);
-      this.cache.set(toCacheKey(cp), {type: CacheItemType.persisted, value: persistedKey});
-      this.epochIndex.getOrDefault(cp.epoch).add(toRootHex(cp.root));
-    }
-    this.logger.info("Loaded persisted checkpoint states from the last run", {
+    // checkpoint states from the last run are not trusted, remove them all
+    // otherwise a bad checkpoint state from the last run could get the node stuck
+    // this was found during the mekong devnet, see https://github.com/ChainSafe/lodestar/pull/7255
+    await Promise.all(persistedKeys.map((key) => this.datastore.remove(key)));
+    this.logger.info("Removed persisted checkpoint states from the last run", {
       count: persistedKeys.length,
       maxEpochsInMemory: this.maxEpochsInMemory,
     });
@@ -487,12 +468,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
       // 2/3 of the slot is the most free time in every slot, take that chance to persist checkpoint states
       // normally we should only persist checkpoint states at 2/3 of slot 0 of the epoch
       await sleep(secToTwoThirdsSlot * 1000, this.signal);
-    } else if (!this.processLateBlock) {
-      // normally the block persist happens at 2/3 of slot 0 of the epoch, if it's already late then just skip to allow other tasks to run
-      // there are plenty of chances in the same epoch to persist checkpoint states, also if the block is late it could be reorged
-      this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex});
-      return 0;
     }
+    // at sync time it's critical to persist checkpoint states as soon as possible to avoid OOM during periods of non-finality
+    // if the node is synced this is not a hot path: the block came late, so we likely missed the attestation already, or the block gets orphaned

     const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory);
     for (const lowestEpoch of persistEpochs) {
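The timing rationale in the comments above (persist at 2/3 of the slot, the quietest point of each slot) can be sketched as follows; the 12-second slot and the clock arithmetic are illustrative assumptions, not this class's actual clock API.

```typescript
// Sketch, assuming mainnet's 12s slots: seconds remaining until the 2/3 mark
// of the current slot. A negative result means the mark has already passed.
const SECONDS_PER_SLOT = 12;

function secUntilTwoThirdsSlot(genesisTimeSec: number, nowSec: number): number {
  const secIntoSlot = (nowSec - genesisTimeSec) % SECONDS_PER_SLOT;
  return (2 / 3) * SECONDS_PER_SLOT - secIntoSlot;
}
```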

@@ -36,8 +36,9 @@ export async function beaconBlocksMaybeBlobsByRange(
     return blocks.map((block) => getBlockInput.preData(config, block.data, BlockSource.byRange, block.bytes));
   }

   // From Deneb
-  if (computeEpochAtSlot(startSlot) >= currentEpoch - config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) {
+  // Only request blobs if they are recent enough
+  if (startEpoch >= currentEpoch - config.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) {
     const [allBlocks, allBlobSidecars] = await Promise.all([
       network.sendBeaconBlocksByRange(peerId, request),
       network.sendBlobSidecarsByRange(peerId, request),
@@ -46,8 +47,9 @@
     return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, endSlot, BlockSource.byRange, BlobsSource.byRange);
   }

-  // Post Deneb but old blobs
-  throw Error("Cannot sync blobs outside of blobs prune window");
+  // Data is out of range, only request blocks
+  const blocks = await network.sendBeaconBlocksByRange(peerId, request);
+  return blocks.map((block) => getBlockInput.outOfRangeData(config, block.data, BlockSource.byRange, block.bytes));
 }

 // Assumes that the blobs are in the same sequence as blocks, doesn't require blocks to be sorted
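To summarize the behavior change, here is a sketch of the three request paths after this diff. The pre-Deneb condition and the helper shape are simplified assumptions based on the surrounding code, not the function's actual structure.

```typescript
// Sketch of the three byRange paths. Previously the last case threw
// "Cannot sync blobs outside of blobs prune window"; now it requests blocks
// only and tags them as out-of-range data.
type ByRangeKind = "preData" | "blocksAndBlobs" | "outOfRangeData";

function classifyByRange(
  startEpoch: number,
  currentEpoch: number,
  denebForkEpoch: number,
  minEpochsForBlobRequests: number
): ByRangeKind {
  if (startEpoch < denebForkEpoch) return "preData"; // pre-Deneb: blobs don't exist
  if (startEpoch >= currentEpoch - minEpochsForBlobRequests) return "blocksAndBlobs";
  return "outOfRangeData"; // post-Deneb but pruned: blocks only
}
```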

@@ -90,10 +90,9 @@ describe("PersistentCheckpointStateCache", () => {
       {
         datastore,
         logger: testLogger(),
-        shufflingCache: new ShufflingCache(),
         blockStateCache: new FIFOBlockStateCache({}, {}),
       },
-      {maxCPStateEpochsInMemory: 2, processLateBlock: true}
+      {maxCPStateEpochsInMemory: 2}
     );
     cache.add(cp0a, states["cp0a"]);
     cache.add(cp0b, states["cp0b"]);
@@ -165,10 +164,9 @@ describe("PersistentCheckpointStateCache", () => {
       {
         datastore,
         logger: testLogger(),
-        shufflingCache: new ShufflingCache(),
         blockStateCache: new FIFOBlockStateCache({}, {}),
       },
-      {maxCPStateEpochsInMemory: 2, processLateBlock: true}
+      {maxCPStateEpochsInMemory: 2}
     );
     cache.add(cp0a, states["cp0a"]);
     cache.add(cp0b, states["cp0b"]);
@@ -242,10 +240,9 @@ describe("PersistentCheckpointStateCache", () => {
       {
         datastore,
         logger: testLogger(),
-        shufflingCache: new ShufflingCache(),
         blockStateCache: new FIFOBlockStateCache({}, {}),
       },
-      {maxCPStateEpochsInMemory: 2, processLateBlock: true}
+      {maxCPStateEpochsInMemory: 2}
     );
     cache.add(cp0a, states["cp0a"]);
     cache.add(cp0b, states["cp0b"]);
@@ -548,10 +545,9 @@ describe("PersistentCheckpointStateCache", () => {
       {
         datastore,
         logger: testLogger(),
-        shufflingCache: new ShufflingCache(),
         blockStateCache: new FIFOBlockStateCache({}, {}),
       },
-      {maxCPStateEpochsInMemory: 1, processLateBlock: true}
+      {maxCPStateEpochsInMemory: 1}
     );
     cache.add(cp0a, states["cp0a"]);
     cache.add(cp0b, states["cp0b"]);
@@ -820,10 +816,9 @@ describe("PersistentCheckpointStateCache", () => {
       {
         datastore,
         logger: testLogger(),
-        shufflingCache: new ShufflingCache(),
         blockStateCache: new FIFOBlockStateCache({}, {}),
       },
-      {maxCPStateEpochsInMemory: 0, processLateBlock: true}
+      {maxCPStateEpochsInMemory: 0}
     );
     cache.add(cp0a, states["cp0a"]);
     cache.add(cp0b, states["cp0b"]);
@@ -911,10 +906,9 @@ describe("PersistentCheckpointStateCache", () => {
       {
         datastore,
         logger: testLogger(),
-        shufflingCache: new ShufflingCache(),
         blockStateCache: new FIFOBlockStateCache({}, {}),
       },
-      {maxCPStateEpochsInMemory: 0, processLateBlock: true}
+      {maxCPStateEpochsInMemory: 0}
     );

     const root1a = Buffer.alloc(32, 100);
4 changes: 4 additions & 0 deletions packages/cli/src/util/pruneOldFilesInDir.ts
@@ -2,6 +2,10 @@ import fs from "node:fs";
 import path from "node:path";

 export function pruneOldFilesInDir(dirpath: string, maxAgeMs: number): number {
+  if (!fs.existsSync(dirpath)) {
+    return 0; // Nothing to prune
+  }
+
   let deletedFileCount = 0;
   for (const entryName of fs.readdirSync(dirpath)) {
     const entryPath = path.join(dirpath, entryName);
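A hypothetical usage showing the new no-op behavior; the path and retention period below are illustrative, not Lodestar defaults.

```typescript
import {pruneOldFilesInDir} from "./pruneOldFilesInDir.js";

const DAYS_TO_MS = 24 * 60 * 60 * 1000;

// With this change a missing directory yields 0 deletions instead of
// letting fs.readdirSync throw ENOENT.
const deleted = pruneOldFilesInDir("/tmp/lodestar/invalid-ssz-objects", 7 * DAYS_TO_MS);
console.log(`pruned ${deleted} file(s)`);
```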
4 changes: 4 additions & 0 deletions packages/cli/test/unit/util/pruneOldFilesInDir.test.ts
@@ -55,6 +55,10 @@ describe("pruneOldFilesInDir", () => {
     expect(fs.existsSync(emptyDir)).toBe(false);
   });

+  it("should handle missing directories", () => {
+    expect(() => pruneOldFilesInDir(path.join(dataDir, "does-not-exist"), DAYS_TO_MS)).not.toThrowError();
+  });
+
   function createFileWithAge(path: string, ageInDays: number): void {
     // Create a new empty file
     fs.closeSync(fs.openSync(path, "w"));
3 changes: 2 additions & 1 deletion packages/prover/test/unit/web3_provider.node.test.ts
@@ -7,7 +7,8 @@ import {JsonRpcRequest, JsonRpcRequestOrBatch, JsonRpcResponse} from "../../src/
 import {ELRpcProvider} from "../../src/utils/rpc_provider.js";
 import {createVerifiedExecutionProvider} from "../../src/web3_provider.js";

-describe("web3_provider", () => {
+// https://github.com/ChainSafe/lodestar/issues/7250
+describe.skip("web3_provider", () => {
   afterEach(() => {
     vi.clearAllMocks();
   });
25 changes: 18 additions & 7 deletions packages/state-transition/src/cache/epochTransitionCache.ts
@@ -9,7 +9,12 @@ import {Epoch, RootHex, ValidatorIndex, phase0} from "@lodestar/types";
 import {intDiv, toRootHex} from "@lodestar/utils";

 import {processPendingAttestations} from "../epoch/processPendingAttestations.js";
-import {CachedBeaconStateAllForks, CachedBeaconStateAltair, CachedBeaconStatePhase0} from "../index.js";
+import {
+  CachedBeaconStateAllForks,
+  CachedBeaconStateAltair,
+  CachedBeaconStatePhase0,
+  hasCompoundingWithdrawalCredential,
+} from "../index.js";
 import {computeBaseRewardPerIncrement} from "../util/altair.js";
 import {
   FLAG_CURR_HEAD_ATTESTER,
@@ -133,11 +138,7 @@ export interface EpochTransitionCache {
   flags: number[];

-  /**
-   * Validators in the current epoch, should use it for read-only value instead of accessing state.validators directly.
-   * Note that during epoch processing, validators could be updated so need to use it with care.
-   */
-  validators: phase0.Validator[];
+  isCompoundingValidatorArr: boolean[];

   /**
    * balances array will be populated by processRewardsAndPenalties() and consumed by processEffectiveBalanceUpdates().
@@ -209,6 +210,8 @@ const inclusionDelays = new Array<number>();
 /** WARNING: reused, never gc'd */
 const flags = new Array<number>();
 /** WARNING: reused, never gc'd */
+const isCompoundingValidatorArr = new Array<boolean>();
+/** WARNING: reused, never gc'd */
 const nextEpochShufflingActiveValidatorIndices = new Array<number>();

 export function beforeProcessEpoch(
@@ -262,6 +265,10 @@ export function beforeProcessEpoch(
   // TODO: optimize by combining the two loops
   // likely will require splitting into phase0 and post-phase0 versions

+  if (forkSeq >= ForkSeq.electra) {
+    isCompoundingValidatorArr.length = validatorCount;
+  }
+
   // Clone before being mutated in processEffectiveBalanceUpdates
   epochCtx.beforeEpochTransition();
@@ -298,6 +305,10 @@ export function beforeProcessEpoch(
     flags[i] = flag;

+    if (forkSeq >= ForkSeq.electra) {
+      isCompoundingValidatorArr[i] = hasCompoundingWithdrawalCredential(validator.withdrawalCredentials);
+    }
+
     if (isActiveCurr) {
       totalActiveStakeByIncrement += effectiveBalancesByIncrements[i];
     } else {
@@ -511,7 +522,7 @@ export function beforeProcessEpoch(
     proposerIndices,
     inclusionDelays,
     flags,
-    validators,
+    isCompoundingValidatorArr,
     // Will be assigned in processRewardsAndPenalties()
     balances: undefined,
   };
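For reference, a sketch of the predicate `hasCompoundingWithdrawalCredential` evaluates per the Electra spec (EIP-7251); this mirrors the spec logic and is not necessarily Lodestar's exact implementation.

```typescript
// Electra (EIP-7251): a validator has a compounding withdrawal credential
// when the first byte of withdrawalCredentials is the 0x02 prefix.
const COMPOUNDING_WITHDRAWAL_PREFIX = 0x02;

function hasCompoundingWithdrawalCredentialSketch(withdrawalCredentials: Uint8Array): boolean {
  return withdrawalCredentials.length > 0 && withdrawalCredentials[0] === COMPOUNDING_WITHDRAWAL_PREFIX;
}
```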