Add pruner for finalized states #8379

Merged
merged 30 commits into from
Jul 10, 2024
Changes from 14 commits
Commits
58736e4
kvStore and db interface changes
gfukushima Jun 14, 2024
82ff501
add StatePruner and register in storage service
gfukushima Jun 14, 2024
9c11ce1
Add experimental command and default values in the storage config
gfukushima Jun 14, 2024
afba944
test
gfukushima Jun 18, 2024
87003ac
Add state pruning limit and cache last pruned slot
gfukushima Jun 19, 2024
4cb4383
Add validation to prevent users from enabling reconstruction of histo…
gfukushima Jun 19, 2024
c8cefcb
spotless
gfukushima Jun 19, 2024
82fd157
allow -1 for now as a sign of feature disabled
gfukushima Jun 19, 2024
817165d
Merge branch 'master' into state-pruner
gfukushima Jun 19, 2024
1560063
fix tests
gfukushima Jun 20, 2024
bc7af75
Change interface to return Optional
gfukushima Jun 20, 2024
2b12580
Merge remote-tracking branch 'origin/state-pruner' into state-pruner
gfukushima Jun 20, 2024
3e447d6
spotless
gfukushima Jun 20, 2024
6c6e3dd
Merge branch 'refs/heads/master' into state-pruner
gfukushima Jun 21, 2024
191147d
add validation for state pruning to enforce archive mode at config level
gfukushima Jun 21, 2024
ed73aa0
tweak logs
gfukushima Jun 21, 2024
a01aa0a
stream from finalized states not blocks
gfukushima Jun 21, 2024
0db66e7
spotless
gfukushima Jun 21, 2024
e32612f
adjust defaults to safer values
gfukushima Jun 25, 2024
c238b1f
change log to be more informative
gfukushima Jun 26, 2024
839320e
change flag unit from epochs to slots
gfukushima Jun 26, 2024
28869f5
spotless
gfukushima Jun 26, 2024
cc904c3
spotless
gfukushima Jun 26, 2024
88cbce0
Merge branch 'master' into state-pruner
gfukushima Jun 26, 2024
a4821e2
fix tests
gfukushima Jun 26, 2024
0879674
Merge remote-tracking branch 'origin/state-pruner' into state-pruner
gfukushima Jun 26, 2024
f6e3052
addressing comments and adding validation to prevent possible misconf…
gfukushima Jul 9, 2024
1d84229
add changelog entry
gfukushima Jul 10, 2024
d290255
Merge branch 'master' into state-pruner
gfukushima Jul 10, 2024
3b9611d
Merge branch 'master' into state-pruner
gfukushima Jul 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM;

import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.pow.api.Eth1EventsChannel;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
Expand All @@ -40,6 +42,7 @@
import tech.pegasys.teku.storage.server.VersionedDatabaseFactory;
import tech.pegasys.teku.storage.server.pruner.BlobSidecarPruner;
import tech.pegasys.teku.storage.server.pruner.BlockPruner;
import tech.pegasys.teku.storage.server.pruner.StatePruner;

public class StorageService extends Service implements StorageServiceFacade {
private final StorageConfiguration config;
Expand All @@ -49,8 +52,10 @@
private volatile BatchingVoteUpdateChannel batchingVoteUpdateChannel;
private volatile Optional<BlockPruner> blockPruner = Optional.empty();
private volatile Optional<BlobSidecarPruner> blobsPruner = Optional.empty();
private volatile Optional<StatePruner> statePruner = Optional.empty();
private final boolean depositSnapshotStorageEnabled;
private final boolean blobSidecarsStorageCountersEnabled;
private static final Logger LOG = LogManager.getLogger();

public StorageService(
final ServiceConfig serviceConfig,
Expand All @@ -61,6 +66,7 @@
this.config = storageConfiguration;
this.depositSnapshotStorageEnabled = depositSnapshotStorageEnabled;
this.blobSidecarsStorageCountersEnabled = blobSidecarsStorageCountersEnabled;
LOG.info("Storage service created");
}

@Override
Expand Down Expand Up @@ -111,6 +117,25 @@
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
LOG.info(
"Data storage mode: {}, Retained epochs {}",
config.getDataStorageMode(),
config.getRetainedEpochs());
Contributor

We probably want to be mindful of logs here. Maybe either move this to debug, or only log it when we're in archive mode.
We also probably want to think about this service with respect to tree storage mode.

Contributor Author

gfukushima Jun 21, 2024

Yes, these are leftovers I missed from a test. I'm cleaning some of them up at the moment; any that survive the cleanup will be at debug level.

Contributor Author

Changed this to a more informative log but kept it at INFO level, since it is only printed at startup. WDYT?

if (config.getDataStorageMode().storesFinalizedStates()
&& config.getRetainedEpochs() > -1) {
statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getStatePruningInterval(),
config.getRetainedEpochs(),
config.getStatePruningLimit(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
Expand Down Expand Up @@ -170,6 +195,11 @@
__ ->
blobsPruner
.map(BlobSidecarPruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)))
.thenCompose(
__ ->
statePruner
.map(StatePruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)));
}

Expand Down
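The startup chain in this hunk starts each optional pruner in turn and substitutes an already-completed future when a pruner is not configured. A minimal sketch of that pattern using only the JDK, assuming `CompletableFuture` in place of Teku's `SafeFuture`; `SketchPruner` and `PrunerStartupSketch` are hypothetical names, not classes from this PR:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a pruner; it only records that start() was called.
class SketchPruner {
  private boolean started = false;

  CompletableFuture<Void> start() {
    started = true;
    return CompletableFuture.completedFuture(null);
  }

  boolean isStarted() {
    return started;
  }
}

class PrunerStartupSketch {
  // Starts the pruners in order; an absent pruner contributes a completed future,
  // so the chain never needs a null check.
  static CompletableFuture<Void> startAll(
      final Optional<SketchPruner> blockPruner, final Optional<SketchPruner> statePruner) {
    return startIfPresent(blockPruner).thenCompose(__ -> startIfPresent(statePruner));
  }

  private static CompletableFuture<Void> startIfPresent(final Optional<SketchPruner> pruner) {
    return pruner.map(SketchPruner::start).orElseGet(() -> CompletableFuture.completedFuture(null));
  }
}
```

Keeping each pruner as an `Optional` means a disabled feature costs nothing at startup beyond one completed future.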
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,7 @@ default Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(final UInt64
* @return actual last pruned slot
*/
UInt64 pruneFinalizedBlocks(UInt64 lastSlotToPrune, int pruneLimit);

Optional<UInt64> pruneFinalizedStates(
Optional<UInt64> lastPrunedSlot, UInt64 lastSlotToPruneStateFor, long pruneLimit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class StorageConfiguration {
public static final Duration DEFAULT_BLOCK_PRUNING_INTERVAL = Duration.ofMinutes(15);
public static final int DEFAULT_BLOCK_PRUNING_LIMIT = 5000;
public static final Duration DEFAULT_BLOBS_PRUNING_INTERVAL = Duration.ofMinutes(1);
public static final Duration DEFAULT_STATE_PRUNING_INTERVAL = Duration.ofMinutes(10);
public static final long DEFAULT_STORAGE_RETAINED_EPOCHS = -1;
public static final int DEFAULT_STATE_PRUNING_LIMIT = 3;
Contributor

I think if this runs during epoch transition it may get interesting. That's more of a general observation about all of these types of tasks, I guess. We probably want to build out a design at some point that gives us more confidence about which parts of an epoch these tasks are allowed to run in.

Contributor Author

To be honest, finding a good setup has been the most difficult part so far. I thought about something a little more dynamic, e.g. having the interval defined by epochs_retained and epoch_duration.

Contributor

I agree with Paul that we could have a "no flight time window", which could be the epoch transition +/- 1 or 2 slots (i.e. from slot 31 to slot 2). The question is how to handle tasks that fall in that window:

  • skip: (pro: easy; con: no execution guarantee for a long time)
  • queue them to the end of the time window (slot 2): (pro: execution guarantees; con: multiple tasks can potentially run at the same time)

Just dumping some thoughts.

Contributor

Another option is to link these prunings explicitly to certain slots, so the frequency would be one epoch for all of them, and we would have implicit control over the epoch transition overlap (not guaranteed, but reasonable).
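The "no flight time window" idea from this thread can be sketched as a pure function over slot numbers. This is only an illustration under stated assumptions: 32 slots per epoch, a window covering slot 31 of one epoch and slots 0 and 1 of the next, and slot 2 as the first slot where tasks may run again. `NoFlightWindowSketch` and its members are hypothetical and not part of this PR:

```java
class NoFlightWindowSketch {
  // Assumption: 32 slots per epoch.
  static final long SLOTS_PER_EPOCH = 32;
  // Assumption: the window starts at slot 31 of an epoch (inclusive).
  static final long WINDOW_START_IN_EPOCH = 31;
  // Assumption: the window ends at slot 2 of the next epoch (exclusive).
  static final long WINDOW_END_IN_EPOCH = 2;

  static boolean isInNoFlightWindow(final long slot) {
    final long slotInEpoch = slot % SLOTS_PER_EPOCH;
    return slotInEpoch >= WINDOW_START_IN_EPOCH || slotInEpoch < WINDOW_END_IN_EPOCH;
  }

  // "Queue" option: returns the slot at which a task requested at `slot` may run.
  // The "skip" option would simply drop the task when isInNoFlightWindow(slot) is true.
  static long deferToWindowEnd(final long slot) {
    if (!isInNoFlightWindow(slot)) {
      return slot;
    }
    final long slotInEpoch = slot % SLOTS_PER_EPOCH;
    final long epochStart = slot - slotInEpoch;
    // A slot at the tail of an epoch defers into the next epoch;
    // a slot at the head of an epoch defers within its own epoch.
    final long targetEpochStart =
        slotInEpoch >= WINDOW_START_IN_EPOCH ? epochStart + SLOTS_PER_EPOCH : epochStart;
    return targetEpochStart + WINDOW_END_IN_EPOCH;
  }
}
```

With the queue option, every task deferred from the same window lands on the same slot, which is the "multiple tasks can run at the same time" downside noted above.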


// 60/12 = 5 blocks per minute * 6 max blobs per block = 30 blobs per minute at maximum, 15 as
// target. Let's configure 48 pruning per minute, so we have some room for catching up.
Expand All @@ -53,8 +56,11 @@ public class StorageConfiguration {
private final int maxKnownNodeCacheSize;
private final Duration blockPruningInterval;
private final int blockPruningLimit;
private final Duration statePruningInterval;
private final Duration blobsPruningInterval;
private final int blobsPruningLimit;
private final long retainedEpochs;
private final int statePruningLimit;

private final int stateRebuildTimeoutSeconds;

Expand All @@ -70,6 +76,9 @@ private StorageConfiguration(
final Duration blobsPruningInterval,
final int blobsPruningLimit,
final int stateRebuildTimeoutSeconds,
final long retainedEpochs,
final Duration statePruningInterval,
final int statePruningLimit,
final Spec spec) {
this.eth1DepositContract = eth1DepositContract;
this.dataStorageMode = dataStorageMode;
Expand All @@ -82,6 +91,9 @@ private StorageConfiguration(
this.blobsPruningInterval = blobsPruningInterval;
this.blobsPruningLimit = blobsPruningLimit;
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
this.retainedEpochs = retainedEpochs;
this.statePruningInterval = statePruningInterval;
this.statePruningLimit = statePruningLimit;
this.spec = spec;
}

Expand Down Expand Up @@ -133,6 +145,18 @@ public int getBlobsPruningLimit() {
return blobsPruningLimit;
}

public long getRetainedEpochs() {
return retainedEpochs;
}

public Duration getStatePruningInterval() {
return statePruningInterval;
}

public int getStatePruningLimit() {
return statePruningLimit;
}

public Spec getSpec() {
return spec;
}
Expand All @@ -152,6 +176,9 @@ public static final class Builder {
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;
private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS;
private Duration statePruningInterval = DEFAULT_STATE_PRUNING_INTERVAL;
private long retainedEpochs = DEFAULT_STORAGE_RETAINED_EPOCHS;
private int statePruningLimit = DEFAULT_STATE_PRUNING_LIMIT;

private Builder() {}

Expand Down Expand Up @@ -244,6 +271,31 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) {
return this;
}

public Builder retainedEpochs(final long retainedEpochs) {
if (retainedEpochs < -1) {
throw new InvalidConfigurationException("Invalid number of epochs to be retained");
}
this.retainedEpochs = retainedEpochs;
return this;
}

public Builder statePruningInterval(final Duration statePruningInterval) {
if (statePruningInterval.isNegative() || statePruningInterval.isZero()) {
throw new InvalidConfigurationException("State pruning interval must be positive");
}
this.statePruningInterval = statePruningInterval;
return this;
}

public Builder statePruningLimit(final int statePruningLimit) {
if (statePruningLimit < 0) {
throw new InvalidConfigurationException(
String.format("Invalid statePruningLimit: %d", statePruningLimit));
}
this.statePruningLimit = statePruningLimit;
return this;
}

public StorageConfiguration build() {
determineDataStorageMode();
return new StorageConfiguration(
Expand All @@ -258,6 +310,9 @@ public StorageConfiguration build() {
blobsPruningInterval,
blobsPruningLimit,
stateRebuildTimeoutSeconds,
retainedEpochs,
statePruningInterval,
statePruningLimit,
spec);
}

Expand Down
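The builder changes in this file combine a sentinel default (`-1` meaning the feature is off) with range checks on each setting. A compact sketch of the same validation rules, assuming the JDK's `IllegalArgumentException` in place of Teku's `InvalidConfigurationException`; `StatePruningSettingsSketch` is a hypothetical class, and it leaves out the archive-mode check that `StorageService` also applies:

```java
import java.time.Duration;

class StatePruningSettingsSketch {
  // Sentinel meaning "state pruning disabled", mirroring the -1 default in this diff.
  static final long RETENTION_DISABLED = -1;

  private final long retainedEpochs;
  private final Duration pruningInterval;
  private final int pruningLimit;

  StatePruningSettingsSketch(
      final long retainedEpochs, final Duration pruningInterval, final int pruningLimit) {
    // Anything below the sentinel is meaningless and rejected up front.
    if (retainedEpochs < RETENTION_DISABLED) {
      throw new IllegalArgumentException("Invalid retained epochs: " + retainedEpochs);
    }
    if (pruningInterval.isNegative() || pruningInterval.isZero()) {
      throw new IllegalArgumentException("State pruning interval must be positive");
    }
    if (pruningLimit < 0) {
      throw new IllegalArgumentException("Invalid state pruning limit: " + pruningLimit);
    }
    this.retainedEpochs = retainedEpochs;
    this.pruningInterval = pruningInterval;
    this.pruningLimit = pruningLimit;
  }

  // Pruning is only considered enabled once a non-sentinel retention value is set.
  boolean isPruningEnabled() {
    return retainedEpochs > RETENTION_DISABLED;
  }

  Duration getPruningInterval() {
    return pruningInterval;
  }

  int getPruningLimit() {
    return pruningLimit;
  }
}
```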
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
Expand Down Expand Up @@ -432,6 +433,84 @@ private void deleteFinalizedBlocks(final List<Pair<UInt64, Bytes32>> blocksToPru
}
}

@Override
public Optional<UInt64> pruneFinalizedStates(
final Optional<UInt64> lastPrunedSlot, final UInt64 lastSlotToPrune, final long pruneLimit) {
final Optional<UInt64> earliestFinalizedStateSlot;

if (lastPrunedSlot.isEmpty()) {
earliestFinalizedStateSlot = dao.getEarliestFinalizedStateSlot();
} else {
earliestFinalizedStateSlot = lastPrunedSlot;
}

LOG.debug(
"Earliest finalized state stored is for slot {}",
() ->
earliestFinalizedStateSlot.isEmpty()
? "EMPTY"
: earliestFinalizedStateSlot.get().toString());
return earliestFinalizedStateSlot
.map(uInt64 -> pruneFinalizedStateForSlots(uInt64, lastSlotToPrune, pruneLimit))
.or(() -> Optional.of(lastSlotToPrune));
}

private UInt64 pruneFinalizedStateForSlots(
final UInt64 earliestFinalizedStateSlot,
final UInt64 lastSlotToPrune,
final long pruneLimit) {
final List<Triple<UInt64, Bytes32, Bytes32>> slotsToPruneStateFor;

try (final Stream<SignedBeaconBlock> stream =
dao.streamFinalizedBlocks(earliestFinalizedStateSlot, lastSlotToPrune)) {
slotsToPruneStateFor =
// fetch slot for log purposes, stateroot and blockroot (keys used to store state)
stream
.limit(pruneLimit)
.map(block -> Triple.of(block.getSlot(), block.getRoot(), block.getStateRoot()))
.toList();
}
if (slotsToPruneStateFor.isEmpty()) {
LOG.debug("No finalized state to prune up to slot {}", lastSlotToPrune);
return lastSlotToPrune;
}

final UInt64 lastPrunedBlockSlot =
slotsToPruneStateFor.get(slotsToPruneStateFor.size() - 1).getLeft();
LOG.debug(
"Pruning {} finalized states, last block slot is {}",
slotsToPruneStateFor.size(),
lastPrunedBlockSlot);
deleteFinalizedStatesForSlot(slotsToPruneStateFor);

return slotsToPruneStateFor.size() < pruneLimit ? lastSlotToPrune : lastPrunedBlockSlot;
}

private void deleteFinalizedStatesForSlot(
final List<Triple<UInt64, Bytes32, Bytes32>> slotsToPruneStateFor) {
if (!slotsToPruneStateFor.isEmpty()) {
if (slotsToPruneStateFor.size() < 20) {
LOG.debug(
"Received slots ({}) to delete finalized state for",
() -> slotsToPruneStateFor.stream().map(Triple::getLeft).toList());
} else {
LOG.debug("Received {} slots to delete finalized state for", slotsToPruneStateFor.size());
}

try (final FinalizedUpdater updater = finalizedUpdater()) {
slotsToPruneStateFor.forEach(
triple -> {
updater.deleteFinalizedState(triple.getLeft());
updater.deleteFinalizedStateRoot(triple.getRight());
});

updater.commit();
} catch (Exception e) {
LOG.error("Failed to prune finalized states", e);
}
}
}

protected void updateHotBlocks(
final HotUpdater updater,
final Map<Bytes32, BlockAndCheckpoints> addedBlocks,
Expand Down
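The value returned by `pruneFinalizedStates` doubles as a cursor: when a batch fills the prune limit, the last pruned slot is returned so the next run can resume there; otherwise the target slot is returned because nothing is left to prune below it. A minimal in-memory sketch of that contract, assuming a `TreeMap` keyed by slot stands in for the database and that the range is inclusive on both ends; `StatePruneCursorSketch` and its members are hypothetical:

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

class StatePruneCursorSketch {
  // Hypothetical in-memory stand-in for the finalized state column: slot -> state placeholder.
  final NavigableMap<Long, String> statesBySlot = new TreeMap<>();

  Optional<Long> prune(
      final Optional<Long> lastPrunedSlot, final long lastSlotToPrune, final int pruneLimit) {
    // Resume from the cached cursor if there is one, otherwise from the earliest stored state.
    final Optional<Long> start = lastPrunedSlot.isPresent() ? lastPrunedSlot : earliestSlot();
    if (!start.isPresent() || start.get() > lastSlotToPrune) {
      return Optional.of(lastSlotToPrune);
    }
    // Copy the keys into a list first, so removal does not mutate the view being streamed.
    final List<Long> batch =
        statesBySlot.subMap(start.get(), true, lastSlotToPrune, true).keySet().stream()
            .limit(pruneLimit)
            .collect(Collectors.toList());
    batch.forEach(statesBySlot::remove);
    // A short batch means the range is exhausted, so the cursor jumps to the target slot.
    if (batch.isEmpty() || batch.size() < pruneLimit) {
      return Optional.of(lastSlotToPrune);
    }
    return Optional.of(batch.get(batch.size() - 1));
  }

  private Optional<Long> earliestSlot() {
    return statesBySlot.isEmpty() ? Optional.empty() : Optional.of(statesBySlot.firstKey());
  }
}
```

Feeding each returned cursor back into the next call prunes the range in bounded batches, which keeps any single run short.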
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getValue);
}

@Override
public Optional<UInt64> getEarliestFinalizedStateSlot() {
return stateStorageLogic.getEarliestAvailableFinalizedStateSlot(db, schema);
}

@Override
public Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(final UInt64 slot) {
return db.getFloorEntry(schema.getColumnFinalizedBlocksBySlot(), slot)
Expand Down Expand Up @@ -711,6 +716,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
stateStorageUpdater.addFinalizedState(db, transaction, schema, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
stateStorageUpdater.deleteFinalizedState(transaction, schema, slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
stateStorageUpdater.addReconstructedFinalizedState(db, transaction, schema, state);
Expand All @@ -721,6 +731,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
transaction.put(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
transaction.delete(schema.getColumnSlotsByFinalizedStateRoot(), stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
if (transitionBlockSlot.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface KvStoreCombinedDao extends AutoCloseable {

Optional<SignedBeaconBlock> getEarliestFinalizedBlock();

Optional<UInt64> getEarliestFinalizedStateSlot();

Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(UInt64 slot);

List<SignedBeaconBlock> getNonCanonicalBlocksAtSlot(UInt64 slot);
Expand Down Expand Up @@ -234,10 +236,14 @@ interface FinalizedUpdater extends AutoCloseable {

void addFinalizedState(final Bytes32 blockRoot, final BeaconState state);

void deleteFinalizedState(final UInt64 slot);

void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state);

void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot);

void deleteFinalizedStateRoot(final Bytes32 stateRoot);

void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot);

void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public Optional<SignedBeaconBlock> getEarliestFinalizedBlock() {
return finalizedDao.getEarliestFinalizedBlock();
}

@Override
public Optional<UInt64> getEarliestFinalizedStateSlot() {
return finalizedDao.getEarliestFinalizedStateSlot();
}

@Override
public Optional<SignedBeaconBlock> getLatestFinalizedBlockAtSlot(final UInt64 slot) {
return finalizedDao.getLatestFinalizedBlockAtSlot(slot);
Expand Down Expand Up @@ -553,6 +558,11 @@ public void addFinalizedState(final Bytes32 blockRoot, final BeaconState state)
finalizedUpdater.addFinalizedState(blockRoot, state);
}

@Override
public void deleteFinalizedState(final UInt64 slot) {
finalizedUpdater.deleteFinalizedState(slot);
}

@Override
public void addReconstructedFinalizedState(final Bytes32 blockRoot, final BeaconState state) {
finalizedUpdater.addReconstructedFinalizedState(blockRoot, state);
Expand All @@ -563,6 +573,11 @@ public void addFinalizedStateRoot(final Bytes32 stateRoot, final UInt64 slot) {
finalizedUpdater.addFinalizedStateRoot(stateRoot, slot);
}

@Override
public void deleteFinalizedStateRoot(final Bytes32 stateRoot) {
finalizedUpdater.deleteFinalizedStateRoot(stateRoot);
}

@Override
public void setOptimisticTransitionBlockSlot(final Optional<UInt64> transitionBlockSlot) {
finalizedUpdater.setOptimisticTransitionBlockSlot(transitionBlockSlot);
Expand Down