Skip to content

Commit

Permalink
Save earliest slot block in a DB variable (#8722)
Browse files Browse the repository at this point in the history
Adding a variable in the database to store the earliest available block slot which is used when peers request blocks by range. This is a query that can vary in some case and I've seen it causing disconnection in some cases due to multiple requests getting timed out. Changing it to be stored in a variable brings the query to 1 - 2 ms response time. It does add a bit of logic to handle the sync of historical batches and prune which are currently the workflow in the code base that affect that column family.

Signed-off-by: Gabriel Fukushima <[email protected]>

---------

Signed-off-by: Gabriel Fukushima <[email protected]>
  • Loading branch information
gfukushima authored Oct 22, 2024
1 parent 108e5b9 commit 2550254
Show file tree
Hide file tree
Showing 15 changed files with 228 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2207,7 +2207,8 @@ public void pruneFinalizedBlocks_shouldRemoveFinalizedBlocks(final DatabaseConte
spec.computeEpochAtSlot(finalizedBlock.getSlot()).plus(1), finalizedBlock);
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(6))).isPresent();

final UInt64 lastPrunedSlot1 = database.pruneFinalizedBlocks(UInt64.valueOf(3), 100);
final UInt64 lastPrunedSlot1 =
database.pruneFinalizedBlocks(UInt64.valueOf(3), 100, UInt64.valueOf(10));
assertThat(lastPrunedSlot1).isEqualTo(UInt64.valueOf(3));
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(0))).isEmpty();
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(1))).isEmpty();
Expand All @@ -2217,16 +2218,89 @@ public void pruneFinalizedBlocks_shouldRemoveFinalizedBlocks(final DatabaseConte
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(5))).isPresent();
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(6))).isPresent();

final UInt64 lastPrunedSlot2 = database.pruneFinalizedBlocks(UInt64.valueOf(5), 1);
final UInt64 lastPrunedSlot2 =
database.pruneFinalizedBlocks(UInt64.valueOf(5), 1, UInt64.valueOf(10));
assertThat(lastPrunedSlot2).isEqualTo(UInt64.valueOf(4));
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(4))).isEmpty();
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(5))).isPresent();
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(6))).isPresent();

final UInt64 lastPrunedSlot3 = database.pruneFinalizedBlocks(UInt64.valueOf(4), 1);
final UInt64 lastPrunedSlot3 =
database.pruneFinalizedBlocks(UInt64.valueOf(4), 1, UInt64.valueOf(10));
assertThat(lastPrunedSlot3).isEqualTo(UInt64.valueOf(4));
}

@TestTemplate
public void pruneFinalizedBlocks_UpdatesEarliestAvailableBlockSlot(final DatabaseContext context)
throws Exception {
initialize(context, StateStorageMode.ARCHIVE);
final List<SignedBlockAndState> blockAndStates = chainBuilder.generateBlocksUpToSlot(5);
addBlocks(blockAndStates);
// Block 7 skipped simulating it was an empty block
final SignedBlockAndState finalizedBlock = chainBuilder.generateBlockAtSlot(7);
addBlocks(finalizedBlock);
justifyAndFinalizeEpoch(
spec.computeEpochAtSlot(finalizedBlock.getSlot()).plus(1), finalizedBlock);
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(6))).isEmpty();

final UInt64 lastPrunedSlot1 =
database.pruneFinalizedBlocks(UInt64.valueOf(3), 100, UInt64.valueOf(10));
assertThat(lastPrunedSlot1).isEqualTo(UInt64.valueOf(3));
assertThat(database.getEarliestAvailableBlockSlot()).isEqualTo(Optional.of(UInt64.valueOf(4)));

final UInt64 lastPrunedSlot2 =
database.pruneFinalizedBlocks(UInt64.valueOf(5), 10, UInt64.valueOf(10));
assertThat(lastPrunedSlot2).isEqualTo(UInt64.valueOf(5));
// there's no slot 6 because that was purposely skipped so we expect the earliest available
// block slot to be 7
assertThat(database.getEarliestAvailableBlockSlot()).isEqualTo(Optional.of(UInt64.valueOf(7)));
}

@TestTemplate
public void pruneFinalizedBlocks_UpdatesEarliestAvailableBlockSlotWhenLimited(
final DatabaseContext context) throws Exception {
initialize(context, StateStorageMode.ARCHIVE);
final List<SignedBlockAndState> blockAndStates = chainBuilder.generateBlocksUpToSlot(5);
addBlocks(blockAndStates);
// Block 7 skipped simulating it was an empty block
final SignedBlockAndState finalizedBlock = chainBuilder.generateBlockAtSlot(7);
addBlocks(finalizedBlock);
justifyAndFinalizeEpoch(
spec.computeEpochAtSlot(finalizedBlock.getSlot()).plus(1), finalizedBlock);
assertThat(database.getFinalizedBlockAtSlot(UInt64.valueOf(6))).isEmpty();

final UInt64 lastPrunedSlot1 =
database.pruneFinalizedBlocks(UInt64.valueOf(3), 2, UInt64.valueOf(10));
assertThat(lastPrunedSlot1).isEqualTo(UInt64.valueOf(1));
assertThat(database.getEarliestAvailableBlockSlot()).isEqualTo(Optional.of(UInt64.valueOf(2)));

final UInt64 lastPrunedSlot2 =
database.pruneFinalizedBlocks(UInt64.valueOf(5), 10, UInt64.valueOf(10));
assertThat(lastPrunedSlot2).isEqualTo(UInt64.valueOf(5));
// there's no slot 6 because that was purposely skipped so we expect the earliest available
// block slot to be 7
assertThat(database.getEarliestAvailableBlockSlot()).isEqualTo(Optional.of(UInt64.valueOf(7)));
}

@TestTemplate
public void
pruneFinalizedBlocks_ClearEarliestAvailableBlockSlotVariableWhenNoBlocksLeftAfterPrune(
final DatabaseContext context) throws Exception {
initialize(context, StateStorageMode.ARCHIVE);
final List<SignedBlockAndState> blockAndStates = chainBuilder.generateBlocksUpToSlot(5);
addBlocks(blockAndStates);
// Block 7 skipped simulating it was an empty block
final SignedBlockAndState finalizedBlock = chainBuilder.generateBlockAtSlot(7);
addBlocks(finalizedBlock);
justifyAndFinalizeEpoch(
spec.computeEpochAtSlot(finalizedBlock.getSlot()).plus(1), finalizedBlock);

final UInt64 lastPrunedSlot1 =
database.pruneFinalizedBlocks(UInt64.valueOf(7), 10, UInt64.valueOf(10));
assertThat(lastPrunedSlot1).isEqualTo(UInt64.valueOf(7));
assertThat(database.getEarliestAvailableBlockSlot()).isEqualTo(Optional.empty());
}

private List<Map.Entry<Bytes32, UInt64>> getFinalizedStateRootsList() {
try (final Stream<Map.Entry<Bytes32, UInt64>> roots = database.getFinalizedStateRoots()) {
return roots.map(entry -> Map.entry(entry.getKey(), entry.getValue())).collect(toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,11 @@ default Stream<SlotAndBlockRootAndBlobIndex> streamBlobSidecarKeys(final UInt64
*
* @param lastSlotToPrune inclusive, not reached if limit happens first
* @param pruneLimit slots limit
* @param checkpointInitialSlot
* @return actual last pruned slot
*/
UInt64 pruneFinalizedBlocks(UInt64 lastSlotToPrune, int pruneLimit);
UInt64 pruneFinalizedBlocks(
UInt64 lastSlotToPrune, int pruneLimit, final UInt64 checkpointInitialSlot);

Optional<UInt64> pruneFinalizedStates(
Optional<UInt64> lastPrunedSlot, UInt64 lastSlotToPruneStateFor, long pruneLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,34 @@ protected void storeFinalizedBlocksToDao(
.forEach(updater::addBlobSidecar);
});

needToUpdateEarliestBlockSlot(blocks.stream().findFirst())
.ifPresent(updater::setEarliestBlockSlot);
needToUpdateEarliestBlobSidecarSlot(maybeEarliestBlobSidecar)
.ifPresent(updater::setEarliestBlobSidecarSlot);
updater.commit();
}
}

private Optional<UInt64> needToUpdateEarliestBlockSlot(
final Optional<SignedBeaconBlock> maybeNewEarliestBlockSlot) {
// New value is absent - not updating
if (maybeNewEarliestBlockSlot.isEmpty()) {
return Optional.empty();
}
// New value is present, value from DB is absent - updating
final Optional<UInt64> maybeEarliestFinalizedBlockSlotDb = dao.getEarliestFinalizedBlockSlot();
if (maybeEarliestFinalizedBlockSlotDb.isEmpty()) {
return maybeNewEarliestBlockSlot.map(SignedBeaconBlock::getSlot);
}
// New value is smaller than value from DB - updating
final UInt64 newEarliestBlockSlot = maybeNewEarliestBlockSlot.get().getSlot();
if (newEarliestBlockSlot.isLessThan(maybeEarliestFinalizedBlockSlotDb.get())) {
return maybeNewEarliestBlockSlot.map(SignedBeaconBlock::getSlot);
} else {
return Optional.empty();
}
}

private Optional<UInt64> needToUpdateEarliestBlobSidecarSlot(
final Optional<UInt64> maybeNewEarliestBlobSidecarSlot) {
// New value is absent - not updating
Expand Down Expand Up @@ -377,7 +399,8 @@ public void deleteHotBlocks(final Set<Bytes32> blockRootsToDelete) {
}

@Override
public UInt64 pruneFinalizedBlocks(final UInt64 lastSlotToPrune, final int pruneLimit) {
public UInt64 pruneFinalizedBlocks(
final UInt64 lastSlotToPrune, final int pruneLimit, final UInt64 checkpointInitialSlot) {
final Optional<UInt64> earliestBlockSlot =
dao.getEarliestFinalizedBlock().map(SignedBeaconBlock::getSlot);
LOG.debug(
Expand All @@ -386,14 +409,18 @@ public UInt64 pruneFinalizedBlocks(final UInt64 lastSlotToPrune, final int prune
if (earliestBlockSlot.isEmpty()) {
return lastSlotToPrune;
}
return pruneToBlock(lastSlotToPrune, pruneLimit);
return pruneToBlock(lastSlotToPrune, pruneLimit, checkpointInitialSlot);
}

private UInt64 pruneToBlock(final UInt64 lastSlotToPrune, final int pruneLimit) {
private UInt64 pruneToBlock(
final UInt64 lastSlotToPrune, final int pruneLimit, final UInt64 checkpointInitialSlot) {
final List<Pair<UInt64, Bytes32>> blocksToPrune;
final Optional<UInt64> earliestSlotAvailableAfterPrune;
LOG.debug("Pruning finalized blocks to slot {} (included)", lastSlotToPrune);
try (final Stream<SignedBeaconBlock> stream =
dao.streamFinalizedBlocks(UInt64.ZERO, lastSlotToPrune)) {
// get an extra block to set earliest finalized block slot available after pruning runs
// ensuring it is an existing block in the DB
blocksToPrune =
stream.limit(pruneLimit).map(block -> Pair.of(block.getSlot(), block.getRoot())).toList();
}
Expand All @@ -402,17 +429,30 @@ private UInt64 pruneToBlock(final UInt64 lastSlotToPrune, final int pruneLimit)
LOG.debug("No finalized blocks to prune up to {} slot", lastSlotToPrune);
return lastSlotToPrune;
}

try (final Stream<SignedBeaconBlock> stream =
dao.streamFinalizedBlocks(UInt64.ZERO, checkpointInitialSlot)) {

earliestSlotAvailableAfterPrune =
stream
.map(SignedBeaconBlock::getSlot)
.filter(slot -> slot.isGreaterThan(blocksToPrune.getLast().getLeft()))
.findFirst();
}

final UInt64 lastPrunedBlockSlot = blocksToPrune.getLast().getKey();
LOG.debug(
"Pruning {} finalized blocks, last block slot is {}",
blocksToPrune.size(),
lastPrunedBlockSlot);
deleteFinalizedBlocks(blocksToPrune);
deleteFinalizedBlocks(blocksToPrune, earliestSlotAvailableAfterPrune);

return blocksToPrune.size() < pruneLimit ? lastSlotToPrune : lastPrunedBlockSlot;
}

private void deleteFinalizedBlocks(final List<Pair<UInt64, Bytes32>> blocksToPrune) {
private void deleteFinalizedBlocks(
final List<Pair<UInt64, Bytes32>> blocksToPrune,
final Optional<UInt64> earliestSlotAvailableAfterPrune) {
if (blocksToPrune.size() > 0) {
if (blocksToPrune.size() < 20) {
LOG.debug(
Expand All @@ -425,6 +465,8 @@ private void deleteFinalizedBlocks(final List<Pair<UInt64, Bytes32>> blocksToPru
try (final FinalizedUpdater updater = finalizedUpdater()) {
blocksToPrune.forEach(
pair -> updater.deleteFinalizedBlock(pair.getLeft(), pair.getRight()));
earliestSlotAvailableAfterPrune.ifPresentOrElse(
updater::setEarliestBlockSlot, updater::deleteEarliestBlockSlot);
updater.commit();
}
}
Expand Down Expand Up @@ -578,6 +620,7 @@ public void storeInitialAnchor(final AnchorPoint initialAnchor) {
&& spec.atSlot(initialAnchor.getSlot())
.getMilestone()
.isGreaterThanOrEqualTo(SpecMilestone.DENEB)) {
updater.setEarliestBlockSlot(initialAnchor.getSlot());
updater.setEarliestBlobSidecarSlot(initialAnchor.getSlot());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ public Optional<SignedBeaconBlock> getFinalizedBlockAtSlot(final UInt64 slot) {

@Override
public Optional<UInt64> getEarliestFinalizedBlockSlot() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getKey);
return db.get(schema.getVariableEarliestBlockSlot())
.or(
() ->
db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getKey));
}

@Override
Expand Down Expand Up @@ -649,6 +652,16 @@ public void setEarliestBlobSidecarSlot(final UInt64 slot) {
transaction.put(schema.getVariableEarliestBlobSidecarSlot(), slot);
}

@Override
public void setEarliestBlockSlot(final UInt64 slot) {
transaction.put(schema.getVariableEarliestBlockSlot(), slot);
}

@Override
public void deleteEarliestBlockSlot() {
transaction.delete(schema.getVariableEarliestBlockSlot());
}

@Override
public void commit() {
// Commit db updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ interface FinalizedUpdater extends AutoCloseable {

void setEarliestBlobSidecarSlot(UInt64 slot);

void setEarliestBlockSlot(UInt64 slot);

void deleteEarliestBlockSlot();

void commit();

void cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,16 @@ public void setEarliestBlobSidecarSlot(final UInt64 slot) {
finalizedUpdater.setEarliestBlobSidecarSlot(slot);
}

@Override
public void setEarliestBlockSlot(final UInt64 slot) {
finalizedUpdater.setEarliestBlockSlot(slot);
}

@Override
public void deleteEarliestBlockSlot() {
finalizedUpdater.deleteEarliestBlockSlot();
}

@Override
public void addMinGenesisTimeBlock(final MinGenesisTimeBlockEvent event) {
hotUpdater.addMinGenesisTimeBlock(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public Optional<SignedBeaconBlock> getFinalizedBlockAtSlot(final UInt64 slot) {
}

public Optional<UInt64> getEarliestFinalizedBlockSlot() {
return db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getKey);
return db.get(schema.getVariableEarliestBlockSlot())
.or(
() ->
db.getFirstEntry(schema.getColumnFinalizedBlocksBySlot()).map(ColumnEntry::getKey));
}

public Optional<UInt64> getEarliestFinalizedStateSlot() {
Expand Down Expand Up @@ -412,6 +415,16 @@ public void setEarliestBlobSidecarSlot(final UInt64 slot) {
transaction.put(schema.getVariableEarliestBlobSidecarSlot(), slot);
}

@Override
public void setEarliestBlockSlot(final UInt64 slot) {
transaction.put(schema.getVariableEarliestBlockSlot(), slot);
}

@Override
public void deleteEarliestBlockSlot() {
transaction.delete(schema.getVariableEarliestBlockSlot());
}

@Override
public void commit() {
// Commit db updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public interface SchemaCombined extends Schema {

KvStoreVariable<UInt64> getVariableEarliestBlobSidecarSlot();

KvStoreVariable<UInt64> getVariableEarliestBlockSlot();

KvStoreVariable<DepositTreeSnapshot> getVariableFinalizedDepositSnapshot();

Map<String, KvStoreColumn<?, ?>> getColumnMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,17 @@ public KvStoreVariable<UInt64> getVariableEarliestBlobSidecarSlot() {
return delegate.getVariableEarliestBlobSidecarSlot();
}

public KvStoreVariable<UInt64> getVariableEarliestBlockSlot() {
return delegate.getVariableEarliestBlockSlot();
}

public Map<String, KvStoreVariable<?>> getVariableMap() {
return Map.of(
"OPTIMISTIC_TRANSITION_BLOCK_SLOT",
getOptimisticTransitionBlockSlot(),
"EARLIEST_BLOB_SIDECAR_SLOT",
getVariableEarliestBlobSidecarSlot());
getVariableEarliestBlobSidecarSlot(),
"EARLIEST_BLOCK_SLOT_AVAILABLE",
getVariableEarliestBlockSlot());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class V6SchemaCombined implements SchemaCombined {

private final KvStoreVariable<UInt64> optimisticTransitionBlockSlot;
private final KvStoreVariable<UInt64> earliestBlobSidecarSlot;
private final KvStoreVariable<UInt64> earliestBlockSlot;

protected V6SchemaCombined(final Spec spec, final int finalizedOffset) {
this.finalizedOffset = finalizedOffset;
Expand All @@ -100,6 +101,7 @@ protected V6SchemaCombined(final Spec spec, final int finalizedOffset) {

optimisticTransitionBlockSlot = KvStoreVariable.create(finalizedOffset + 1, UINT64_SERIALIZER);
earliestBlobSidecarSlot = KvStoreVariable.create(finalizedOffset + 2, UINT64_SERIALIZER);
earliestBlockSlot = KvStoreVariable.create(finalizedOffset + 3, UINT64_SERIALIZER);
}

@Override
Expand Down Expand Up @@ -192,6 +194,11 @@ public KvStoreVariable<UInt64> getVariableEarliestBlobSidecarSlot() {
return earliestBlobSidecarSlot;
}

@Override
public KvStoreVariable<UInt64> getVariableEarliestBlockSlot() {
return earliestBlockSlot;
}

@Override
public Map<String, KvStoreColumn<?, ?>> getColumnMap() {
return ImmutableMap.<String, KvStoreColumn<?, ?>>builder()
Expand Down Expand Up @@ -227,6 +234,7 @@ public Map<String, KvStoreVariable<?>> getVariableMap() {
.put("OPTIMISTIC_TRANSITION_BLOCK_SLOT", getOptimisticTransitionBlockSlot())
.put("FINALIZED_DEPOSIT_SNAPSHOT", getVariableFinalizedDepositSnapshot())
.put("EARLIEST_BLOB_SIDECAR_SLOT", getVariableEarliestBlobSidecarSlot())
.put("EARLIEST_BLOCK_SLOT_AVAILABLE", getVariableEarliestBlockSlot())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public Map<String, KvStoreVariable<?>> getVariableMap() {
.put("OPTIMISTIC_TRANSITION_BLOCK_SLOT", getOptimisticTransitionBlockSlot())
.put("FINALIZED_DEPOSIT_SNAPSHOT", getVariableFinalizedDepositSnapshot())
.put("EARLIEST_BLOB_SIDECAR_SLOT", getVariableEarliestBlobSidecarSlot())
.put("EARLIEST_BLOCK_SLOT", getVariableEarliestBlockSlot())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public Optional<DepositTreeSnapshot> getFinalizedDepositSnapshot() {
public void setFinalizedDepositSnapshot(final DepositTreeSnapshot finalizedDepositSnapshot) {}

@Override
public UInt64 pruneFinalizedBlocks(final UInt64 lastSlotToPrune, final int pruneLimit) {
public UInt64 pruneFinalizedBlocks(
final UInt64 lastSlotToPrune, final int pruneLimit, final UInt64 checkpointInitialSlot) {
return lastSlotToPrune;
}

Expand Down
Loading

0 comments on commit 2550254

Please sign in to comment.