From a4e4fcc1d30be3efb6e54a7505492d5ea1a83634 Mon Sep 17 00:00:00 2001 From: David Ryan Date: Wed, 25 Sep 2024 10:35:40 +1000 Subject: [PATCH] Implement a DatabaseArchiveWriter and add to pruneOldestBlobSidecars (#8640) * Implement a DatabaseArchiveWriter and add to pruneOldestBlobSidecars. Currently not configurable and only implements a NoopWriter. --- .../storage/server/kvstore/DatabaseTest.java | 21 ++++++++++++--- .../pegasys/teku/storage/server/Database.java | 6 ++++- .../server/DatabaseArchiveNoopWriter.java | 27 +++++++++++++++++++ .../storage/server/DatabaseArchiveWriter.java | 26 ++++++++++++++++++ .../server/kvstore/KvStoreDatabase.java | 24 ++++++++++++++--- .../storage/server/noop/NoOpDatabase.java | 6 ++++- .../server/pruner/BlobSidecarPruner.java | 9 ++++++- .../server/pruner/BlobSidecarPrunerTest.java | 22 ++++++++++----- .../pegasys/teku/storage/store/StoreTest.java | 6 ++++- 9 files changed, 129 insertions(+), 18 deletions(-) create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java diff --git a/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java b/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java index 3db2ad542a3..86a39a90f21 100644 --- a/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java +++ b/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java @@ -84,6 +84,7 @@ import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.server.Database; +import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; import tech.pegasys.teku.storage.server.DatabaseContext; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.StateStorageMode; @@ -293,7 +294,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti List.of(blobSidecar5_0))); // let's prune with limit to 1 - assertThat(database.pruneOldestBlobSidecars(UInt64.MAX_VALUE, 1)).isTrue(); + assertThat( + database.pruneOldestBlobSidecars( + UInt64.MAX_VALUE, 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + .isTrue(); assertBlobSidecarKeys( blobSidecar2_0.getSlot(), blobSidecar5_0.getSlot(), @@ -310,7 +314,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L); // let's prune up to slot 1 (nothing will be pruned) - assertThat(database.pruneOldestBlobSidecars(ONE, 10)).isFalse(); + assertThat( + database.pruneOldestBlobSidecars( + ONE, 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + .isFalse(); assertBlobSidecarKeys( blobSidecar2_0.getSlot(), blobSidecar5_0.getSlot(), @@ -327,7 +334,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L); // let's prune all from slot 4 excluded - assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(3), 10)).isFalse(); + assertThat( + database.pruneOldestBlobSidecars( + UInt64.valueOf(3), 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + .isFalse(); assertBlobSidecarKeys( blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0)); assertBlobSidecars(Map.of(blobSidecar5_0.getSlot(), List.of(blobSidecar5_0))); @@ -335,7 +345,10 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(database.getBlobSidecarColumnCount()).isEqualTo(1L); // let's prune all - assertThat(database.pruneOldestBlobSidecars(UInt64.valueOf(5), 1)).isTrue(); + assertThat( + database.pruneOldestBlobSidecars( + UInt64.valueOf(5), 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + .isTrue(); // all empty now assertBlobSidecarKeys(ZERO, UInt64.valueOf(10)); assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(6)); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java b/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java index a0f0cf71d61..06e9e6a8ea9 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java @@ -73,9 +73,13 @@ void storeFinalizedBlocks( * * @param lastSlotToPrune inclusive, not reached if limit happens first * @param pruneLimit soft BlobSidecars (not slots) limit + * @param archiveWriter write BlobSidecars to archive when pruning. * @return true if number of pruned blobs reached the pruneLimit, false otherwise */ - boolean pruneOldestBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit); + boolean pruneOldestBlobSidecars( + UInt64 lastSlotToPrune, + int pruneLimit, + final DatabaseArchiveWriter archiveWriter); boolean pruneOldestNonCanonicalBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java new file mode 100644 index 00000000000..6a59598bc99 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java @@ -0,0 +1,27 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server; + +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; + +public class DatabaseArchiveNoopWriter implements DatabaseArchiveWriter { + + public static final DatabaseArchiveNoopWriter NOOP_BLOBSIDECAR_STORE = + new DatabaseArchiveNoopWriter<>(); + + @Override + public boolean archive(final T data) { + return true; + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java new file mode 100644 index 00000000000..479a5e4d866 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java @@ -0,0 +1,26 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.server; + +/** + * A functional interface to allow storing data that is to be pruned from the Database. If the store + * function is successful it returns true, signalling the data can be pruned. If the store function + * fails, the data was not stored and the data should not be pruned. + * + * @param the data to be stored. + */ +@FunctionalInterface +public interface DatabaseArchiveWriter { + boolean archive(final T data); +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java index cc6f30bc6ec..e590f08b405 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java @@ -72,6 +72,8 @@ import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; import tech.pegasys.teku.storage.server.Database; +import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; +import tech.pegasys.teku.storage.server.DatabaseArchiveWriter; import tech.pegasys.teku.storage.server.StateStorageMode; import tech.pegasys.teku.storage.server.kvstore.dataaccess.CombinedKvStoreDao; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao; @@ -877,11 +879,14 @@ public Optional getNonCanonicalBlobSidecar(final SlotAndBlockRootAn } @Override - public boolean pruneOldestBlobSidecars(final UInt64 lastSlotToPrune, final int pruneLimit) { + public boolean pruneOldestBlobSidecars( + final UInt64 lastSlotToPrune, + final int pruneLimit, + final DatabaseArchiveWriter archiveWriter) { try (final Stream prunableBlobKeys = streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune); final FinalizedUpdater updater = finalizedUpdater()) { - return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, false); + return pruneBlobSidecars(pruneLimit, prunableBlobKeys, updater, archiveWriter, false); } } @@ -891,7 +896,12 @@ public boolean pruneOldestNonCanonicalBlobSidecars( try (final Stream prunableNoncanonicalBlobKeys = streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune); final FinalizedUpdater updater = finalizedUpdater()) { - return pruneBlobSidecars(pruneLimit, prunableNoncanonicalBlobKeys, updater, true); + return pruneBlobSidecars( + pruneLimit, + prunableNoncanonicalBlobKeys, + updater, + DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE, + true); } } @@ -899,6 +909,7 @@ private boolean pruneBlobSidecars( final int pruneLimit, final Stream prunableBlobKeys, final FinalizedUpdater updater, + final DatabaseArchiveWriter archiveWriter, final boolean nonCanonicalblobSidecars) { int remaining = pruneLimit; int pruned = 0; @@ -912,6 +923,13 @@ private boolean pruneBlobSidecars( if (finished && key.getBlobIndex().equals(ZERO)) { break; } + // Attempt to archive the BlobSidecar if present. If there's no BlobSidecar return true. + final boolean blobSidecarArchived = + getBlobSidecar(key).map(archiveWriter::archive).orElse(Boolean.TRUE); + if (!blobSidecarArchived) { + LOG.warn("Failed to archive BlobSidecar for slot:{}. Stopping pruning", key.getSlot()); + break; + } if (nonCanonicalblobSidecars) { updater.removeNonCanonicalBlobSidecar(key); } else { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java index 2cc8c3ed8d5..b08a1156c60 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java @@ -44,6 +44,7 @@ import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; import tech.pegasys.teku.storage.server.Database; +import tech.pegasys.teku.storage.server.DatabaseArchiveWriter; public class NoOpDatabase implements Database { @@ -325,7 +326,10 @@ public Optional getEarliestBlobSidecarSlot() { } @Override - public boolean pruneOldestBlobSidecars(final UInt64 lastSlotToPrune, final int pruneLimit) { + public boolean pruneOldestBlobSidecars( + final UInt64 lastSlotToPrune, + final int pruneLimit, + final DatabaseArchiveWriter archiveWriter) { return false; } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java index 6c0bf8ee126..f562a2a57e3 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java @@ -32,7 +32,10 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.storage.server.Database; +import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; +import tech.pegasys.teku.storage.server.DatabaseArchiveWriter; import tech.pegasys.teku.storage.server.ShuttingDownException; public class BlobSidecarPruner extends Service { @@ -55,6 +58,7 @@ public class BlobSidecarPruner extends Service { private final AtomicLong blobColumnSize = new AtomicLong(0); private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1); private final boolean storeNonCanonicalBlobSidecars; + private final DatabaseArchiveWriter archiveWriter; public BlobSidecarPruner( final Spec spec, @@ -81,6 +85,9 @@ public BlobSidecarPruner( this.pruningActiveLabelledGauge = pruningActiveLabelledGauge; this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars; + // To be updated with other implementations. e.g. filesystem or s3 + this.archiveWriter = DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE; + if (blobSidecarsStorageCountersEnabled) { LabelledGauge labelledGauge = metricsSystem.createLabelledGauge( @@ -148,7 +155,7 @@ private void pruneBlobsPriorToAvailabilityWindow() { try { final long blobsPruningStart = System.currentTimeMillis(); final boolean blobsPruningLimitReached = - database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit); + database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter); logPruningResult( "Blobs pruning finished in {} ms. Limit reached: {}", blobsPruningStart, diff --git a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java index 17e7182d385..f9829190599 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java @@ -38,6 +38,7 @@ import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.storage.server.Database; +import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; public class BlobSidecarPrunerTest { public static final Duration PRUNE_INTERVAL = Duration.ofSeconds(5); @@ -83,7 +84,7 @@ void shouldNotPruneWhenGenesisNotAvailable() { asyncRunner.executeDueActions(); verify(database).getGenesisTime(); - verify(database, never()).pruneOldestBlobSidecars(any(), anyInt()); + verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); } @@ -92,7 +93,7 @@ void shouldNotPrunePriorGenesis() { asyncRunner.executeDueActions(); verify(database).getGenesisTime(); - verify(database, never()).pruneOldestBlobSidecars(any(), anyInt()); + verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); } @@ -108,7 +109,7 @@ void shouldNotPruneWhenLatestPrunableIncludeGenesis() { timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue())); asyncRunner.executeDueActions(); - verify(database, never()).pruneOldestBlobSidecars(any(), anyInt()); + verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); } @@ -126,7 +127,11 @@ void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() { timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue())); asyncRunner.executeDueActions(); - verify(database).pruneOldestBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT); + verify(database) + .pruneOldestBlobSidecars( + UInt64.valueOf((slotsPerEpoch / 2) - 1), + PRUNE_LIMIT, + DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE); verify(database) .pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT); } @@ -175,7 +180,7 @@ void shouldUseEpochsStoreBlobs() { timeProvider.advanceTimeBy(Duration.ofSeconds(timeForSlotOne.longValue())); asyncRunner.executeDueActions(); - verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt()); + verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); // move more to open pruning zone near genesis final UInt64 slotDelta = @@ -186,7 +191,10 @@ void shouldUseEpochsStoreBlobs() { asyncRunner.executeDueActions(); verify(databaseOverride) - .pruneOldestBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT); + .pruneOldestBlobSidecars( + UInt64.valueOf((slotsPerEpoch / 2) - 1), + PRUNE_LIMIT, + DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE); verify(databaseOverride) .pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT); } @@ -230,7 +238,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() { timeProvider.advanceTimeBy(Duration.ofSeconds(currentTime.longValue())); asyncRunner.executeDueActions(); - verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt()); + verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); verify(databaseOverride, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); } } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java b/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java index fcb83a42b35..34f4df0456a 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java @@ -51,6 +51,7 @@ import tech.pegasys.teku.spec.generator.ChainBuilder; import tech.pegasys.teku.storage.api.StubStorageUpdateChannel; import tech.pegasys.teku.storage.api.StubStorageUpdateChannelWithDelays; +import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; import tech.pegasys.teku.storage.storageSystem.StorageSystem; import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction; @@ -392,7 +393,10 @@ public void retrieveEarliestBlobSidecarSlot_shouldReturnUpdatedValue() { maybeEarliestBlobSidecarSlot.isPresent() && maybeEarliestBlobSidecarSlot.get().equals(UInt64.ZERO)); - storageSystem.database().pruneOldestBlobSidecars(UInt64.valueOf(5), 5); + storageSystem + .database() + .pruneOldestBlobSidecars( + UInt64.valueOf(5), 5, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE); assertThat(store.retrieveEarliestBlobSidecarSlot()) .isCompletedWithValueMatching(