Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a DatabaseArchiveWriter and add to pruneOldestBlobSidecars #8640

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.server.DatabaseContext;
import tech.pegasys.teku.storage.server.ShuttingDownException;
import tech.pegasys.teku.storage.server.StateStorageMode;
Expand Down Expand Up @@ -293,7 +294,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
List.of(blobSidecar5_0)));

// let's prune with limit to 1
assertThat(database.pruneOldestBlobSidecars(UInt64.MAX_VALUE, 1)).isTrue();
assertThat(
database.pruneOldestBlobSidecars(
UInt64.MAX_VALUE, 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isTrue();
assertBlobSidecarKeys(
blobSidecar2_0.getSlot(),
blobSidecar5_0.getSlot(),
Expand All @@ -310,7 +314,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L);

// let's prune up to slot 1 (nothing will be pruned)
assertThat(database.pruneOldestBlobSidecars(ONE, 10)).isFalse();
assertThat(
database.pruneOldestBlobSidecars(
ONE, 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isFalse();
assertBlobSidecarKeys(
blobSidecar2_0.getSlot(),
blobSidecar5_0.getSlot(),
Expand All @@ -327,15 +334,21 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L);

// let's prune all from slot 4 excluded
assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(3), 10)).isFalse();
assertThat(
database.pruneOldestBlobSidecars(
UInt64.valueOf(3), 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isFalse();
assertBlobSidecarKeys(
blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0));
assertBlobSidecars(Map.of(blobSidecar5_0.getSlot(), List.of(blobSidecar5_0)));
assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(4));
assertThat(database.getBlobSidecarColumnCount()).isEqualTo(1L);

// let's prune all
assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(5), 1)).isTrue();
assertThat(
database.pruneOldestBlobSidecars(
UInt64.valueOf(5), 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE))
.isTrue();
// all empty now
assertBlobSidecarKeys(ZERO, UInt64.valueOf(10));
assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(6));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ void storeFinalizedBlocks(
*
* @param lastSlotToPrune inclusive, not reached if limit happens first
* @param pruneLimit soft BlobSidecars (not slots) limit
* @param archiveWriter write BlobSidecars to archive when pruning.
* @return true if number of pruned blobs reached the pruneLimit, false otherwise
*/
boolean pruneOldestBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);
boolean pruneOldestBlobSidecars(
UInt64 lastSlotToPrune,
int pruneLimit,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter);

boolean pruneOldestNonCanonicalBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server;

import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

public class DatabaseArchiveNoopWriter<T> implements DatabaseArchiveWriter<T> {

public static final DatabaseArchiveNoopWriter<BlobSidecar> NOOP_BLOBSIDECAR_STORE =
new DatabaseArchiveNoopWriter<>();

@Override
public boolean archive(final T data) {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server;

/**
* A functional interface to allow storing data that is to be pruned from the Database. If the store
* function is successful it returns true, signalling the data can be pruned. If the store function
* fails, the data was not stored and the data should not be pruned.
*
* @param <T> the data to be stored.
*/
@FunctionalInterface
public interface DatabaseArchiveWriter<T> {
boolean archive(final T data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import tech.pegasys.teku.storage.api.WeakSubjectivityState;
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.server.DatabaseArchiveWriter;
import tech.pegasys.teku.storage.server.StateStorageMode;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.CombinedKvStoreDao;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao;
Expand Down Expand Up @@ -877,11 +879,14 @@ public Optional<BlobSidecar> getNonCanonicalBlobSidecar(final SlotAndBlockRootAn
}

@Override
public boolean pruneOldestBlobSidecars(final UInt64 lastSlotToPrune, final int pruneLimit) {
public boolean pruneOldestBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter) {
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys =
streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune);
final FinalizedUpdater updater = finalizedUpdater()) {
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, false);
return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, archiveWriter, false);
}
}

Expand All @@ -891,14 +896,20 @@ public boolean pruneOldestNonCanonicalBlobSidecars(
try (final Stream<SlotAndBlockRootAndBlobIndex> prunableNoncanonicalBlobKeys =
streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune);
final FinalizedUpdater updater = finalizedUpdater()) {
return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, updater, true);
return pruneBlobSidecars(
pruneLimit,
prunableNoncanonicalBlobKeys,
updater,
DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE,
true);
}
}

private boolean pruneBlobSidecars(
final int pruneLimit,
final Stream<SlotAndBlockRootAndBlobIndex> prunableBlobKeys,
final FinalizedUpdater updater,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter,
final boolean nonCanonicalblobSidecars) {
int remaining = pruneLimit;
int pruned = 0;
Expand All @@ -912,6 +923,13 @@ private boolean pruneBlobSidecars(
if (finished && key.getBlobIndex().equals(ZERO)) {
break;
}
// Attempt to archive the BlobSidecar if present. If there's no BlobSidecar return true.
final boolean blobSidecarArchived =
getBlobSidecar(key).map(archiveWriter::archive).orElse(Boolean.TRUE);
if (!blobSidecarArchived) {
LOG.warn("Failed to archive BlobSidecar for slot:{}. Stopping pruning", key.getSlot());
break;
}
if (nonCanonicalblobSidecars) {
updater.removeNonCanonicalBlobSidecar(key);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tech.pegasys.teku.storage.api.WeakSubjectivityState;
import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveWriter;

public class NoOpDatabase implements Database {

Expand Down Expand Up @@ -325,7 +326,10 @@ public Optional<UInt64> getEarliestBlobSidecarSlot() {
}

@Override
public boolean pruneOldestBlobSidecars(final UInt64 lastSlotToPrune, final int pruneLimit) {
public boolean pruneOldestBlobSidecars(
final UInt64 lastSlotToPrune,
final int pruneLimit,
final DatabaseArchiveWriter<BlobSidecar> archiveWriter) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.server.DatabaseArchiveWriter;
import tech.pegasys.teku.storage.server.ShuttingDownException;

public class BlobSidecarPruner extends Service {
Expand All @@ -55,6 +58,7 @@ public class BlobSidecarPruner extends Service {
private final AtomicLong blobColumnSize = new AtomicLong(0);
private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1);
private final boolean storeNonCanonicalBlobSidecars;
private final DatabaseArchiveWriter<BlobSidecar> archiveWriter;

public BlobSidecarPruner(
final Spec spec,
Expand All @@ -81,6 +85,9 @@ public BlobSidecarPruner(
this.pruningActiveLabelledGauge = pruningActiveLabelledGauge;
this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars;

// To be updated with other implementations. e.g. filesystem or s3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use // TODO: prefix so it will be captured nicely by intellij, or simply remove since we have a (set of) issues on the topic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix that up in the next PR. Thanks for feedback.

this.archiveWriter = DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE;

if (blobSidecarsStorageCountersEnabled) {
LabelledGauge labelledGauge =
metricsSystem.createLabelledGauge(
Expand Down Expand Up @@ -148,7 +155,7 @@ private void pruneBlobsPriorToAvailabilityWindow() {
try {
final long blobsPruningStart = System.currentTimeMillis();
final boolean blobsPruningLimitReached =
database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit);
database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter);
logPruningResult(
"Blobs pruning finished in {} ms. Limit reached: {}",
blobsPruningStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;

public class BlobSidecarPrunerTest {
public static final Duration PRUNE_INTERVAL = Duration.ofSeconds(5);
Expand Down Expand Up @@ -83,7 +84,7 @@ void shouldNotPruneWhenGenesisNotAvailable() {
asyncRunner.executeDueActions();

verify(database).getGenesisTime();
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}

Expand All @@ -92,7 +93,7 @@ void shouldNotPrunePriorGenesis() {
asyncRunner.executeDueActions();

verify(database).getGenesisTime();
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}

Expand All @@ -108,7 +109,7 @@ void shouldNotPruneWhenLatestPrunableIncludeGenesis() {
timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue()));

asyncRunner.executeDueActions();
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}

Expand All @@ -126,7 +127,11 @@ void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() {
timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue()));

asyncRunner.executeDueActions();
verify(database).pruneOldestBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
verify(database)
.pruneOldestBlobSidecars(
UInt64.valueOf((slotsPerEpoch / 2) - 1),
PRUNE_LIMIT,
DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE);
verify(database)
.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
}
Expand Down Expand Up @@ -175,7 +180,7 @@ void shouldUseEpochsStoreBlobs() {
timeProvider.advanceTimeBy(Duration.ofSeconds(timeForSlotOne.longValue()));

asyncRunner.executeDueActions();
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any());

// move more to open pruning zone near genesis
final UInt64 slotDelta =
Expand All @@ -186,7 +191,10 @@ void shouldUseEpochsStoreBlobs() {

asyncRunner.executeDueActions();
verify(databaseOverride)
.pruneOldestBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
.pruneOldestBlobSidecars(
UInt64.valueOf((slotsPerEpoch / 2) - 1),
PRUNE_LIMIT,
DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE);
verify(databaseOverride)
.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT);
}
Expand Down Expand Up @@ -230,7 +238,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() {
timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue()));

asyncRunner.executeDueActions();
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt());
verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any());
verify(databaseOverride, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import tech.pegasys.teku.spec.generator.ChainBuilder;
import tech.pegasys.teku.storage.api.StubStorageUpdateChannel;
import tech.pegasys.teku.storage.api.StubStorageUpdateChannelWithDelays;
import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter;
import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder;
import tech.pegasys.teku.storage.storageSystem.StorageSystem;
import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction;
Expand Down Expand Up @@ -392,7 +393,10 @@ public void retrieveEarliestBlobSidecarSlot_shouldReturnUpdatedValue() {
maybeEarliestBlobSidecarSlot.isPresent()
&& maybeEarliestBlobSidecarSlot.get().equals(UInt64.ZERO));

storageSystem.database().pruneOldestBlobSidecars(UInt64.valueOf(5), 5);
storageSystem
.database()
.pruneOldestBlobSidecars(
UInt64.valueOf(5), 5, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE);

assertThat(store.retrieveEarliestBlobSidecarSlot())
.isCompletedWithValueMatching(
Expand Down