Skip to content

Commit

Permalink
Implement a DatabaseArchiveWriter and add to pruneOldestBlobSidecars (#…
Browse files Browse the repository at this point in the history
…8640)

* Implement a DatabaseArchiveWriter and add to pruneOldestBlobSidecars. Currently not configurable and only implements a NoopWriter.
  • Loading branch information
david-ry4n authored Sep 25, 2024
1 parent fd599c4 commit a4e4fcc
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.server.DatabaseContext;
import tech.pegasys.teku.storage.server.ShuttingDownException;
import tech.pegasys.teku.storage.server.StateStorageMode;
Expand Down Expand Up @@ -293,7 +294,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
List.of(blobSidecar5_0)));

// let's prune with limit to 1
assertThat(database.pruneOldestBlobSidecars(UInt64.MAX_VALUE, 1)).isTrue();
assertThat(
database.pruneOldestBlobSidecars(
UInt64.MAX_VALUE, 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isTrue();
assertBlobSidecarKeys(
blobSidecar2_0.getSlot(),
blobSidecar5_0.getSlot(),
Expand All @@ -310,7 +314,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L);

// let's prune up to slot 1 (nothing will be pruned)
assertThat(database.pruneOldestBlobSidecars(ONE, 10)).isFalse();
assertThat(
database.pruneOldestBlobSidecars(
ONE, 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isFalse();
assertBlobSidecarKeys(
blobSidecar2_0.getSlot(),
blobSidecar5_0.getSlot(),
Expand All @@ -327,15 +334,21 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L);

// let's prune all from slot 4 excluded
assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(3), 10)).isFalse();
assertThat(
database.pruneOldestBlobSidecars(
UInt64.valueOf(3), 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isFalse();
assertBlobSidecarKeys(
blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0));
assertBlobSidecars(Map.of(blobSidecar5_0.getSlot(), List.of(blobSidecar5_0)));
assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(4));
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(1L);

// let's prune all
assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(5), 1)).isTrue();
assertThat(
database.pruneOldestBlobSidecars(
UInt64.valueOf(5), 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isTrue();
// all empty now
assertBlobSidecarKeys(ZERO, UInt64.valueOf(10));
assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(6));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ void storeFinalizedBlocks(
*
* @param lastSlotToPrune inclusive, not reached if limit happens first
* @param pruneLimit soft BlobSidecars (not slots) limit
* @param archiveWriter write BlobSidecars to archive when pruning.
* @return true if number of pruned blobs reached the pruneLimit, false otherwise
*/
boolean pruneOldestBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);
boolean pruneOldestBlobSidecars(
UInt64 lastSlotToPrune,
int pruneLimit,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter);

boolean pruneOldestNonCanonicalBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server;

import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

public class DatabaseArchiveNoopWriter<T> implements DatabaseArchiveWriter<T> {

public static final DatabaseArchiveNoopWriter<BlobSidecar> NOOP_BLOBSIDECAR_STORE =
new DatabaseArchiveNoopWriter<>();

@Override
public boolean archive(final T data) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server;

/**
* A functional interface to allow storing data that is to be pruned from the Database. If the store
* function is successful it returns true, signalling the data can be pruned. If the store function
* fails, the data was not stored and the data should not be pruned.
*
* @param <T> the data to be stored.
*/
@FunctionalInterface
public interface DatabaseArchiveWriter<T> {
boolean archive(final T data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import tech.pegasys.teku.storage.api.WeakSubjectivityState;
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.server.DatabaseArchiveWriter;
import tech.pegasys.teku.storage.server.StateStorageMode;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.CombinedKvStoreDao;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao;
Expand Down Expand Up @@ -877,11 +879,14 @@ public Optional<BlobSidecar> getNonCanonicalBlobSidecar(final SlotAndBlockRootAn
}

@Override
public boolean pruneOldestBlobSidecars(final UInt64 lastSlotToPrune, final int pruneLimit) {
public boolean pruneOldestBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter) {
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys =
streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune);
final FinalizedUpdater updater = finalizedUpdater()) {
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, false);
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, archiveWriter, false);
}
}

Expand All @@ -891,14 +896,20 @@ public boolean pruneOldestNonCanonicalBlobSidecars(
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableNoncanonicalBlobKeys =
streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune);
final FinalizedUpdater updater = finalizedUpdater()) {
return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, updater, true);
return pruneBlobSidecars(
pruneLimit,
prunableNoncanonicalBlobKeys,
updater,
DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE,
true);
}
}

private boolean pruneBlobSidecars(
final int pruneLimit,
final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys,
final FinalizedUpdater updater,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter,
final boolean nonCanonicalblobSidecars) {
int remaining = pruneLimit;
int pruned = 0;
Expand All @@ -912,6 +923,13 @@ private boolean pruneBlobSidecars(
if (finished && key.getBlobIndex().equals(ZERO)) {
break;
}
// Attempt to archive the BlobSidecar if present. If there's no BlobSidecar return true.
final boolean blobSidecarArchived =
getBlobSidecar(key).map(archiveWriter::archive).orElse(Boolean.TRUE);
if (!blobSidecarArchived) {
LOG.warn("Failed to archive BlobSidecar for slot:{}. Stopping pruning", key.getSlot());
break;
}
if (nonCanonicalblobSidecars) {
updater.removeNonCanonicalBlobSidecar(key);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tech.pegasys.teku.storage.api.WeakSubjectivityState;
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveWriter;

public class NoOpDatabase implements Database {

Expand Down Expand Up @@ -325,7 +326,10 @@ public Optional<UInt64> getEarliestBlobSidecarSlot() {
}

@Override
public boolean pruneOldestBlobSidecars(final UInt64 lastSlotToPrune, final int pruneLimit) {
public boolean pruneOldestBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.server.DatabaseArchiveWriter;
import tech.pegasys.teku.storage.server.ShuttingDownException;

public class BlobSidecarPruner extends Service {
Expand All @@ -55,6 +58,7 @@ public class BlobSidecarPruner extends Service {
private final AtomicLong blobColumnSize = new AtomicLong(0);
private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1);
private final boolean storeNonCanonicalBlobSidecars;
private final DatabaseArchiveWriter<BlobSidecar> archiveWriter;

public BlobSidecarPruner(
final Spec spec,
Expand All @@ -81,6 +85,9 @@ public BlobSidecarPruner(
this.pruningActiveLabelledGauge = pruningActiveLabelledGauge;
this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars;

// To be updated with other implementations. e.g. filesystem or s3
this.archiveWriter = DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE;

if (blobSidecarsStorageCountersEnabled) {
LabelledGauge labelledGauge =
metricsSystem.createLabelledGauge(
Expand Down Expand Up @@ -148,7 +155,7 @@ private void pruneBlobsPriorToAvailabilityWindow() {
try {
final long blobsPruningStart = System.currentTimeMillis();
final boolean blobsPruningLimitReached =
database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit);
database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter);
logPruningResult(
"Blobs pruning finished in {} ms. Limit reached: {}",
blobsPruningStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;

public class BlobSidecarPrunerTest {
public static final Duration PRUNE_INTERVAL = Duration.ofSeconds(5);
Expand Down Expand Up @@ -83,7 +84,7 @@ void shouldNotPruneWhenGenesisNotAvailable() {
asyncRunner.executeDueActions();

verify(database).getGenesisTime();
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}

Expand All @@ -92,7 +93,7 @@ void shouldNotPrunePriorGenesis() {
asyncRunner.executeDueActions();

verify(database).getGenesisTime();
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}

Expand All @@ -108,7 +109,7 @@ void shouldNotPruneWhenLatestPrunableIncludeGenesis() {
timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue()));

asyncRunner.executeDueActions();
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}

Expand All @@ -126,7 +127,11 @@ void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() {
timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue()));

asyncRunner.executeDueActions();
verify(database).pruneOldestBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
verify(database)
.pruneOldestBlobSidecars(
UInt64.valueOf((slotsPerEpoch / 2) - 1),
PRUNE_LIMIT,
DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE);
verify(database)
.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
}
Expand Down Expand Up @@ -175,7 +180,7 @@ void shouldUseEpochsStoreBlobs() {
timeProvider.advanceTimeBy(Duration.ofSeconds(timeForSlotOne.longValue()));

asyncRunner.executeDueActions();
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any());

// move more to open pruning zone near genesis
final UInt64 slotDelta =
Expand All @@ -186,7 +191,10 @@ void shouldUseEpochsStoreBlobs() {

asyncRunner.executeDueActions();
verify(databaseOverride)
.pruneOldestBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
.pruneOldestBlobSidecars(
UInt64.valueOf((slotsPerEpoch / 2) - 1),
PRUNE_LIMIT,
DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE);
verify(databaseOverride)
.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
}
Expand Down Expand Up @@ -230,7 +238,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() {
timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue()));

asyncRunner.executeDueActions();
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(databaseOverride, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import tech.pegasys.teku.spec.generator.ChainBuilder;
import tech.pegasys.teku.storage.api.StubStorageUpdateChannel;
import tech.pegasys.teku.storage.api.StubStorageUpdateChannelWithDelays;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
import tech.pegasys.teku.storage.storageSystem.StorageSystem;
import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction;
Expand Down Expand Up @@ -392,7 +393,10 @@ public void retrieveEarliestBlobSidecarSlot_shouldReturnUpdatedValue() {
maybeEarliestBlobSidecarSlot.isPresent()
&& maybeEarliestBlobSidecarSlot.get().equals(UInt64.ZERO));

storageSystem.database().pruneOldestBlobSidecars(UInt64.valueOf(5), 5);
storageSystem
.database()
.pruneOldestBlobSidecars(
UInt64.valueOf(5), 5, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE);

assertThat(store.retrieveEarliestBlobSidecarSlot())
.isCompletedWithValueMatching(
Expand Down

0 comments on commit a4e4fcc

Please sign in to comment.