Skip to content

Commit

Permalink
Add pruner for finalized states (#8379)
Browse files Browse the repository at this point in the history
* add StatePruner and register in storage service

* Add experimental command and default values in the storage config

* Add state pruning limit and cache last pruned slot

* Add validation to prevent users from enabling reconstruction of historic states

* add validation for state pruning to enforce archive mode at config level

* addressing comments and adding validation to prevent possible misconfiguration

Signed-off-by: Gabriel Fukushima <[email protected]>

---------

Signed-off-by: Gabriel Fukushima <[email protected]>
  • Loading branch information
gfukushima authored Jul 10, 2024
1 parent b4de196 commit f0d8fbd
Show file tree
Hide file tree
Showing 17 changed files with 551 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Updated counter metrics to incorporate the suffix `_total`. If you are using a custom dashboard to monitor Teku metrics, you might need to update the metrics manually when breaking changes are introduced. For more information, see [Update metrics](../../how-to/monitor/update-metrics.md).

### Additions and Improvements
- Added a state pruner that can limit the number of finalized states stored when running an archive node.

- Updated bootnodes for Sepolia network.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM;

import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.pow.api.Eth1EventsChannel;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
Expand All @@ -40,6 +42,7 @@
import tech.pegasys.teku.storage.server.VersionedDatabaseFactory;
import tech.pegasys.teku.storage.server.pruner.BlobSidecarPruner;
import tech.pegasys.teku.storage.server.pruner.BlockPruner;
import tech.pegasys.teku.storage.server.pruner.StatePruner;

public class StorageService extends Service implements StorageServiceFacade {
private final StorageConfiguration config;
Expand All @@ -49,8 +52,10 @@ public class StorageService extends Service implements StorageServiceFacade {
private volatile BatchingVoteUpdateChannel batchingVoteUpdateChannel;
private volatile Optional<BlockPruner> blockPruner = Optional.empty();
private volatile Optional<BlobSidecarPruner> blobsPruner = Optional.empty();
private volatile Optional<StatePruner> statePruner = Optional.empty();
private final boolean depositSnapshotStorageEnabled;
private final boolean blobSidecarsStorageCountersEnabled;
private static final Logger LOG = LogManager.getLogger();

public StorageService(
final ServiceConfig serviceConfig,
Expand Down Expand Up @@ -111,6 +116,26 @@ protected SafeFuture<?> doStart() {
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
if (config.getDataStorageMode().storesFinalizedStates()
&& config.getRetainedSlots() > -1) {
LOG.info(
"State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution. ",
config.getStatePruningInterval().toMinutes(),
config.getRetainedSlots(),
config.getStatePruningLimit());
statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getStatePruningInterval(),
config.getRetainedSlots(),
config.getStatePruningLimit(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
Expand Down Expand Up @@ -170,6 +195,11 @@ protected SafeFuture<?> doStart() {
__ ->
blobsPruner
.map(BlobSidecarPruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)))
.thenCompose(
__ ->
statePruner
.map(StatePruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,7 @@ default Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(final UInt64
* @return actual last pruned slot
*/
UInt64 pruneFinalizedBlocks(UInt64 lastSlotToPrune, int pruneLimit);

Optional<UInt64> pruneFinalizedStates(
Optional<UInt64> lastPrunedSlot, UInt64 lastSlotToPruneStateFor, long pruneLimit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class StorageConfiguration {
public static final Duration DEFAULT_BLOCK_PRUNING_INTERVAL = Duration.ofMinutes(15);
public static final int DEFAULT_BLOCK_PRUNING_LIMIT = 5000;
public static final Duration DEFAULT_BLOBS_PRUNING_INTERVAL = Duration.ofMinutes(1);
public static final Duration DEFAULT_STATE_PRUNING_INTERVAL = Duration.ofMinutes(5);
public static final long DEFAULT_STORAGE_RETAINED_SLOTS = -1;
public static final int DEFAULT_STATE_PRUNING_LIMIT = 1;

// 60/12 = 5 blocks per minute * 6 max blobs per block = 30 blobs per minute at maximum, 15 as
// target. Let's configure 48 pruning per minute, so we have some room for catching up.
Expand All @@ -53,8 +56,11 @@ public class StorageConfiguration {
private final int maxKnownNodeCacheSize;
private final Duration blockPruningInterval;
private final int blockPruningLimit;
private final Duration statePruningInterval;
private final Duration blobsPruningInterval;
private final int blobsPruningLimit;
private final long retainedSlots;
private final int statePruningLimit;

private final int stateRebuildTimeoutSeconds;

Expand All @@ -70,6 +76,9 @@ private StorageConfiguration(
final Duration blobsPruningInterval,
final int blobsPruningLimit,
final int stateRebuildTimeoutSeconds,
final long retainedSlots,
final Duration statePruningInterval,
final int statePruningLimit,
final Spec spec) {
this.eth1DepositContract = eth1DepositContract;
this.dataStorageMode = dataStorageMode;
Expand All @@ -82,6 +91,9 @@ private StorageConfiguration(
this.blobsPruningInterval = blobsPruningInterval;
this.blobsPruningLimit = blobsPruningLimit;
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
this.retainedSlots = retainedSlots;
this.statePruningInterval = statePruningInterval;
this.statePruningLimit = statePruningLimit;
this.spec = spec;
}

Expand Down Expand Up @@ -133,6 +145,18 @@ public int getBlobsPruningLimit() {
return blobsPruningLimit;
}

public long getRetainedSlots() {
return retainedSlots;
}

public Duration getStatePruningInterval() {
return statePruningInterval;
}

public int getStatePruningLimit() {
return statePruningLimit;
}

public Spec getSpec() {
return spec;
}
Expand All @@ -152,6 +176,9 @@ public static final class Builder {
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;
private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS;
private Duration statePruningInterval = DEFAULT_STATE_PRUNING_INTERVAL;
private long retainedSlots = DEFAULT_STORAGE_RETAINED_SLOTS;
private int statePruningLimit = DEFAULT_STATE_PRUNING_LIMIT;

private Builder() {}

Expand Down Expand Up @@ -244,6 +271,32 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) {
return this;
}

public Builder retainedSlots(final long retainedSlots) {
if (retainedSlots < -1) {
throw new InvalidConfigurationException(
"Invalid number of slots to retain finalized states for");
}
this.retainedSlots = retainedSlots;
return this;
}

public Builder statePruningInterval(final Duration statePruningInterval) {
if (statePruningInterval.isNegative() || statePruningInterval.isZero()) {
throw new InvalidConfigurationException("Block pruning interval must be positive");
}
this.statePruningInterval = statePruningInterval;
return this;
}

public Builder statePruningLimit(final int statePruningLimit) {
if (statePruningLimit < 0) {
throw new InvalidConfigurationException(
String.format("Invalid statePruningLimit: %d", statePruningLimit));
}
this.statePruningLimit = statePruningLimit;
return this;
}

public StorageConfiguration build() {
determineDataStorageMode();
return new StorageConfiguration(
Expand All @@ -258,6 +311,9 @@ public StorageConfiguration build() {
blobsPruningInterval,
blobsPruningLimit,
stateRebuildTimeoutSeconds,
retainedSlots,
statePruningInterval,
statePruningLimit,
spec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,84 @@ private void deleteFinalizedBlocks(final List<Pair<UInt64, Bytes32>> blocksToPru
}
}

@Override
public Optional<UInt64> pruneFinalizedStates(
final Optional<UInt64> lastPrunedSlot, final UInt64 lastSlotToPrune, final long pruneLimit) {
final Optional<UInt64> earliestFinalizedStateSlot;

if (lastPrunedSlot.isEmpty()) {
earliestFinalizedStateSlot = dao.getEarliestFinalizedStateSlot();
} else {
earliestFinalizedStateSlot = lastPrunedSlot;
}

LOG.debug(
"Earliest finalized state stored is for slot {}",
() ->
earliestFinalizedStateSlot.isEmpty()
? "EMPTY"
: earliestFinalizedStateSlot.get().toString());
return earliestFinalizedStateSlot
.map(uInt64 -> pruneFinalizedStateForSlots(uInt64, lastSlotToPrune, pruneLimit))
.or(() -> Optional.of(lastSlotToPrune));
}

private UInt64 pruneFinalizedStateForSlots(
final UInt64 earliestFinalizedStateSlot,
final UInt64 lastSlotToPrune,
final long pruneLimit) {
final List<Pair<UInt64, Optional<Bytes32>>> slotsToPruneStateFor;

try (final Stream<UInt64> stream =
dao.streamFinalizedStateSlots(earliestFinalizedStateSlot, lastSlotToPrune)) {
slotsToPruneStateFor =
stream
.limit(pruneLimit)
.map(
slot ->
Pair.of(
slot,
dao.getFinalizedBlockAtSlot(slot).map(SignedBeaconBlock::getStateRoot)))
.toList();
}
if (slotsToPruneStateFor.isEmpty()) {
LOG.debug("No finalized state to prune up to {} slot", lastSlotToPrune);
return lastSlotToPrune;
}

final UInt64 lastPrunedSlot =
slotsToPruneStateFor.get(slotsToPruneStateFor.size() - 1).getLeft();

deleteFinalizedStatesForSlot(slotsToPruneStateFor);

return slotsToPruneStateFor.size() < pruneLimit ? lastSlotToPrune : lastPrunedSlot;
}

private void deleteFinalizedStatesForSlot(
final List<Pair<UInt64, Optional<Bytes32>>> slotsToPruneStateFor) {
if (!slotsToPruneStateFor.isEmpty()) {
if (slotsToPruneStateFor.size() < 20) {
LOG.debug(
"Received finalized slots ({}) to delete state",
() -> slotsToPruneStateFor.stream().map(Pair::getLeft).toList());
} else {
LOG.debug("Received {} finalized slots to delete state", slotsToPruneStateFor.size());
}

try (final FinalizedUpdater updater = finalizedUpdater()) {
slotsToPruneStateFor.forEach(
pair -> {
updater.deleteFinalizedState(pair.getLeft());
pair.getRight().ifPresent(updater::deleteFinalizedStateRoot);
});

updater.commit();
} catch (Exception e) {
LOG.error("Failed to prune finalized states", e);
}
}
}

protected void updateHotBlocks(
final HotUpdater updater,
final Map<Bytes32, BlockAndCheckpoints> addedBlocks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getValue);
}

@Override
public Optional<UInt64> getEarliestFinalizedStateSlot() {
return stateStorageLogic.getEarliestAvailableFinalizedStateSlot(db, schema);
}

@Override
public Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(final UInt64 slot) {
return db.getFloorEntry(schema.getColumnFinalizedBlocksBySlot(), slot)
Expand Down Expand Up @@ -711,6 +716,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
stateStorageUpdater.addFinalizedState(db, transaction, schema, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
stateStorageUpdater.deleteFinalizedState(transaction, schema, slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
stateStorageUpdater.addReconstructedFinalizedState(db, transaction, schema, state);
Expand All @@ -721,6 +731,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
transaction.put(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
transaction.delete(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
if (transitionBlockSlot.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface KvStoreCombinedDao extends AutoCloseable {

Optional<SignedBeaconBlock> getEarliestFinalizedBlock();

Optional<UInt64> getEarliestFinalizedStateSlot();

Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(UInt64 slot);

List<SignedBeaconBlock> getNonCanonicalBlocksAtSlot(UInt64 slot);
Expand Down Expand Up @@ -234,10 +236,14 @@ interface FinalizedUpdater extends AutoCloseable {

void addFinalizedState(final Bytes32 blockRoot, final BeaconState state);

void deleteFinalizedState(final UInt64 slot);

void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state);

void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot);

void deleteFinalizedStateRoot(final Bytes32 stateRoot);

void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot);

void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return finalizedDao.getEarliestFinalizedBlock();
}

@Override
public Optional<UInt64> getEarliestFinalizedStateSlot() {
return finalizedDao.getEarliestFinalizedStateSlot();
}

@Override
public Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(final UInt64 slot) {
return finalizedDao.getLatestFinalizedBlockAtSlot(slot);
Expand Down Expand Up @@ -553,6 +558,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
finalizedUpdater.addFinalizedState(blockRoot, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
finalizedUpdater.deleteFinalizedState(slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
finalizedUpdater.addReconstructedFinalizedState(blockRoot, state);
Expand All @@ -563,6 +573,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
finalizedUpdater.addFinalizedStateRoot(stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
finalizedUpdater.deleteFinalizedStateRoot(stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
finalizedUpdater.setOptimisticTransitionBlockSlot(transitionBlockSlot);
Expand Down
Loading

0 comments on commit f0d8fbd

Please sign in to comment.