diff --git a/CHANGELOG.md b/CHANGELOG.md index f9bffda3d31..e1e20bc9d74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,5 +9,6 @@ ### Breaking Changes ### Additions and Improvements +- Clean up old beacon states when switching from ARCHIVE to PRUNE or MINIMAL data storage mode ### Bug Fixes diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blobs/versions/deneb/BlobSidecar.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blobs/versions/deneb/BlobSidecar.java index 4c0598462ca..812cfeb2c63 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blobs/versions/deneb/BlobSidecar.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/blobs/versions/deneb/BlobSidecar.java @@ -138,4 +138,9 @@ public String toLogString() { getKZGCommitment().toAbbreviatedString(), getKZGProof().toAbbreviatedString()); } + + @Override + public BlobSidecarSchema getSchema() { + return (BlobSidecarSchema) super.getSchema(); + } } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 1fce18dd2aa..8b90393da57 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -499,7 +499,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( .finish( error -> LOG.error( - "An occurred while attempting to fetch blobs via local EL")); + "An error occurred while attempting to fetch blobs via local EL")); } } }); @@ -576,7 +576,7 @@ private void onFirstSeen(final SlotAndBlockRoot slotAndBlockRoot) { error -> LOG.warn( "Local EL blobs lookup failed: {}", - ExceptionUtils.getMessage(error))) + ExceptionUtils.getRootCauseMessage(error))) .thenRun(() -> this.fetchMissingContentFromRemotePeers(slotAndBlockRoot)), fetchDelay) .finish( diff --git a/services/chainstorage/build.gradle b/services/chainstorage/build.gradle index c338497010e..d147836fa60 100644 --- a/services/chainstorage/build.gradle +++ b/services/chainstorage/build.gradle @@ -12,4 +12,7 @@ dependencies { implementation project(':infrastructure:events') implementation 'org.hyperledger.besu:plugin-api' + + testImplementation testFixtures(project(':infrastructure:async')) + testImplementation testFixtures(project(':ethereum:execution-types')) } diff --git a/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java b/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java index 878c90f0023..329211b237b 100644 --- a/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java +++ b/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java @@ -16,6 +16,8 @@ import static tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory.DEFAULT_MAX_QUEUE_SIZE; import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM; +import com.google.common.annotations.VisibleForTesting; +import java.nio.file.Path; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,6 +35,9 @@ import tech.pegasys.teku.storage.api.CombinedStorageChannel; import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel; import tech.pegasys.teku.storage.api.VoteUpdateChannel; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive; +import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive; import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel; import tech.pegasys.teku.storage.server.ChainStorage; import tech.pegasys.teku.storage.server.CombinedStorageChannelSplitter; @@ -126,35 +131,32 @@ protected SafeFuture doStart() { } if (config.getDataStorageMode().storesFinalizedStates() && config.getRetainedSlots() > 0) { - if (config.getDataStorageCreateDbVersion() == DatabaseVersion.LEVELDB_TREE) { - throw new InvalidConfigurationException( - "State pruning is not supported with leveldb_tree database."); - } else { - LOG.info( - "State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution. ", - config.getStatePruningInterval().toMinutes(), - config.getRetainedSlots(), - config.getStatePruningLimit()); - statePruner = - Optional.of( - new StatePruner( - config.getSpec(), - database, - storagePrunerAsyncRunner, - config.getStatePruningInterval(), - config.getRetainedSlots(), - config.getStatePruningLimit(), - "state", - pruningTimingsLabelledGauge, - pruningActiveLabelledGauge)); - } + configureStatePruner( + config.getRetainedSlots(), + storagePrunerAsyncRunner, + pruningTimingsLabelledGauge, + pruningActiveLabelledGauge); + } else if (!config.getDataStorageMode().storesFinalizedStates()) { + configureStatePruner( + StorageConfiguration.DEFAULT_STORAGE_RETAINED_SLOTS, + storagePrunerAsyncRunner, + pruningTimingsLabelledGauge, + pruningActiveLabelledGauge); } + + final DataArchive dataArchive = + config + .getBlobsArchivePath() + .map(path -> new FileSystemArchive(Path.of(path))) + .orElse(new NoopDataArchive()); + if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) { blobsPruner = Optional.of( new BlobSidecarPruner( config.getSpec(), database, + dataArchive, serviceConfig.getMetricsSystem(), storagePrunerAsyncRunner, serviceConfig.getTimeProvider(), @@ -216,6 +218,41 @@ protected SafeFuture doStart() { .orElseGet(() -> SafeFuture.completedFuture(null))); } + void configureStatePruner( + final long slotsToRetain, + final AsyncRunner storagePrunerAsyncRunner, + final SettableLabelledGauge pruningTimingsLabelledGauge, + final SettableLabelledGauge pruningActiveLabelledGauge) { + if (config.getDataStorageCreateDbVersion() == DatabaseVersion.LEVELDB_TREE) { + throw new InvalidConfigurationException( + "State pruning is not supported with leveldb_tree database."); + } + + LOG.info( + "State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution.", + config.getStatePruningInterval().toMinutes(), + slotsToRetain, + config.getStatePruningLimit()); + + statePruner = + Optional.of( + new StatePruner( + config.getSpec(), + database, + storagePrunerAsyncRunner, + config.getStatePruningInterval(), + slotsToRetain, + config.getStatePruningLimit(), + "state", + pruningTimingsLabelledGauge, + pruningActiveLabelledGauge)); + } + + @VisibleForTesting + public Optional getStatePruner() { + return statePruner; + } + @Override protected SafeFuture doStop() { return blockPruner diff --git a/services/chainstorage/src/test/java/tech/pegasys/teku/services/chainstorage/StorageServiceTest.java b/services/chainstorage/src/test/java/tech/pegasys/teku/services/chainstorage/StorageServiceTest.java new file mode 100644 index 00000000000..0d19a70753e --- /dev/null +++ b/services/chainstorage/src/test/java/tech/pegasys/teku/services/chainstorage/StorageServiceTest.java @@ -0,0 +1,118 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.services.chainstorage; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.file.Path; +import java.util.Optional; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import tech.pegasys.teku.ethereum.execution.types.Eth1Address; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; +import tech.pegasys.teku.infrastructure.async.StubAsyncRunnerFactory; +import tech.pegasys.teku.infrastructure.events.EventChannels; +import tech.pegasys.teku.service.serviceutils.ServiceConfig; +import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.storage.server.DatabaseVersion; +import tech.pegasys.teku.storage.server.StateStorageMode; +import tech.pegasys.teku.storage.server.StorageConfiguration; +import tech.pegasys.teku.storage.server.pruner.StatePruner; + +class StorageServiceTest { + + private final ServiceConfig serviceConfig = mock(ServiceConfig.class); + private final StorageConfiguration storageConfiguration = mock(StorageConfiguration.class); + private final MetricsSystem metricsSystem = mock(MetricsSystem.class); + private final DataDirLayout dataDirLayout = mock(DataDirLayout.class); + private final Eth1Address eth1DepositContract = mock(Eth1Address.class); + private final Spec spec = mock(Spec.class); + private final EventChannels eventChannels = mock(EventChannels.class); + private StorageService storageService; + + @BeforeEach + void setUp(@TempDir final Path tempDir) { + when(serviceConfig.getMetricsSystem()).thenReturn(metricsSystem); + when(dataDirLayout.getBeaconDataDirectory()).thenReturn(tempDir); + when(serviceConfig.getDataDirLayout()).thenReturn(dataDirLayout); + when(storageConfiguration.getDataStorageCreateDbVersion()).thenReturn(DatabaseVersion.NOOP); + when(storageConfiguration.getMaxKnownNodeCacheSize()) + .thenReturn(StorageConfiguration.DEFAULT_MAX_KNOWN_NODE_CACHE_SIZE); + when(storageConfiguration.getDataStorageFrequency()) + .thenReturn(StorageConfiguration.DEFAULT_STORAGE_FREQUENCY); + when(storageConfiguration.getEth1DepositContract()).thenReturn(eth1DepositContract); + when(storageConfiguration.isStoreNonCanonicalBlocksEnabled()).thenReturn(false); + when(storageConfiguration.getSpec()).thenReturn(spec); + + when(eventChannels.subscribe(any(), any())).thenReturn(eventChannels); + when(serviceConfig.getEventChannels()).thenReturn(eventChannels); + + final StubAsyncRunnerFactory asyncRunnerFactory = new StubAsyncRunnerFactory(); + when(serviceConfig.getAsyncRunnerFactory()).thenReturn(asyncRunnerFactory); + + final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner(); + when(serviceConfig.createAsyncRunner(any(), anyInt(), anyInt(), anyInt())) + .thenReturn(stubAsyncRunner); + + storageService = new StorageService(serviceConfig, storageConfiguration, false, false); + } + + @Test + void shouldNotSetupStatePrunerWhenArchiveMode() { + when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.ARCHIVE); + final SafeFuture future = storageService.doStart(); + final Optional statePruner = storageService.getStatePruner(); + assertThat(future).isCompleted(); + assertThat(statePruner).isEmpty(); + } + + @Test + void shouldSetupStatePrunerWhenArchiveModeAndRetentionSlotsEnabled() { + when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.ARCHIVE); + when(storageConfiguration.getRetainedSlots()).thenReturn(5L); + final SafeFuture future = storageService.doStart(); + final Optional statePruner = storageService.getStatePruner(); + assertThat(future).isCompleted(); + assertThat(statePruner).isPresent(); + assertThat(storageService.getStatePruner().get().isRunning()).isTrue(); + } + + @Test + void shouldSetupStatePrunerWhenPruneMode() { + when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.PRUNE); + final SafeFuture future = storageService.doStart(); + final Optional statePruner = storageService.getStatePruner(); + assertThat(future).isCompleted(); + assertThat(statePruner).isPresent(); + assertThat(storageService.getStatePruner().get().isRunning()).isTrue(); + } + + @Test + void shouldSetupStatePrunerWhenMinimalMode() { + when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.MINIMAL); + final SafeFuture future = storageService.doStart(); + final Optional statePruner = storageService.getStatePruner(); + assertThat(future).isCompleted(); + assertThat(statePruner).isPresent(); + assertThat(storageService.getStatePruner().get().isRunning()).isTrue(); + } +} diff --git a/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java b/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java index 86a39a90f21..89a7e8c40c9 100644 --- a/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java +++ b/storage/src/integration-test/java/tech/pegasys/teku/storage/server/kvstore/DatabaseTest.java @@ -82,9 +82,9 @@ import tech.pegasys.teku.storage.api.OnDiskStoreData; import tech.pegasys.teku.storage.api.StorageUpdate; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; +import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.server.Database; -import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; import tech.pegasys.teku.storage.server.DatabaseContext; import tech.pegasys.teku.storage.server.ShuttingDownException; import tech.pegasys.teku.storage.server.StateStorageMode; @@ -122,20 +122,24 @@ public class DatabaseTest { private StateStorageMode storageMode; private StorageSystem storageSystem; private Database database; + private FileSystemArchive fileSystemDataArchive; private RecentChainData recentChainData; private UpdatableStore store; private final List storageSystems = new ArrayList<>(); @BeforeEach - public void setup() { + public void setup() throws IOException { setupWithSpec(TestSpecFactory.createMinimalDeneb()); } - private void setupWithSpec(final Spec spec) { + private void setupWithSpec(final Spec spec) throws IOException { this.spec = spec; this.dataStructureUtil = new DataStructureUtil(spec); this.chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS); this.chainProperties = new ChainProperties(spec); + final Path blobsArchive = Files.createTempDirectory("blobs"); + tmpDirectories.add(blobsArchive.toFile()); + this.fileSystemDataArchive = new FileSystemArchive(blobsArchive); genesisBlockAndState = chainBuilder.generateGenesis(genesisTime, true); genesisCheckpoint = getCheckpointForBlock(genesisBlockAndState.getBlock()); genesisAnchor = AnchorPoint.fromGenesisState(spec, genesisBlockAndState.getState()); @@ -296,7 +300,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti // let's prune with limit to 1 assertThat( database.pruneOldestBlobSidecars( - UInt64.MAX_VALUE, 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + UInt64.MAX_VALUE, 1, fileSystemDataArchive.getBlobSidecarWriter())) .isTrue(); assertBlobSidecarKeys( blobSidecar2_0.getSlot(), @@ -313,10 +317,13 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(2)); assertThat(database.getBlobSidecarColumnCount()).isEqualTo(4L); + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar1_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist(); + // let's prune up to slot 1 (nothing will be pruned) assertThat( - database.pruneOldestBlobSidecars( - ONE, 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + database.pruneOldestBlobSidecars(ONE, 10, fileSystemDataArchive.getBlobSidecarWriter())) .isFalse(); assertBlobSidecarKeys( blobSidecar2_0.getSlot(), @@ -336,7 +343,7 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti // let's prune all from slot 4 excluded assertThat( database.pruneOldestBlobSidecars( - UInt64.valueOf(3), 10, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + UInt64.valueOf(3), 10, fileSystemDataArchive.getBlobSidecarWriter())) .isFalse(); assertBlobSidecarKeys( blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0)); @@ -344,15 +351,27 @@ public void verifyBlobsLifecycle(final DatabaseContext context) throws IOExcepti assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(4)); assertThat(database.getBlobSidecarColumnCount()).isEqualTo(1L); + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar3_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).doesNotExist(); + // let's prune all assertThat( database.pruneOldestBlobSidecars( - UInt64.valueOf(5), 1, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE)) + UInt64.valueOf(5), 1, fileSystemDataArchive.getBlobSidecarWriter())) .isTrue(); // all empty now assertBlobSidecarKeys(ZERO, UInt64.valueOf(10)); assertThat(database.getEarliestBlobSidecarSlot()).contains(UInt64.valueOf(6)); assertThat(database.getBlobSidecarColumnCount()).isEqualTo(0L); + + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).exists(); + } + + private File getSlotBlobsArchiveFile(final BlobSidecar blobSidecar) { + return fileSystemDataArchive.resolve(blobSidecar.getSlotAndBlockRoot()); } @TestTemplate @@ -449,7 +468,10 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro List.of(blobSidecar5_0))); // Pruning with a prune limit set to 1: Only blobSidecar1 will be pruned - assertThat(database.pruneOldestNonCanonicalBlobSidecars(UInt64.MAX_VALUE, 1)).isTrue(); + assertThat( + database.pruneOldestNonCanonicalBlobSidecars( + UInt64.MAX_VALUE, 1, fileSystemDataArchive.getBlobSidecarWriter())) + .isTrue(); assertNonCanonicalBlobSidecarKeys( blobSidecar2_0.getSlot(), blobSidecar5_0.getSlot(), @@ -464,8 +486,15 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro blobSidecar5_0.getSlot(), List.of(blobSidecar5_0))); assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(4L); + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar1_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist(); + // Pruning up to slot 1: No blobs pruned - assertThat(database.pruneOldestNonCanonicalBlobSidecars(ONE, 10)).isFalse(); + assertThat( + database.pruneOldestNonCanonicalBlobSidecars( + ONE, 10, fileSystemDataArchive.getBlobSidecarWriter())) + .isFalse(); assertNonCanonicalBlobSidecarKeys( blobSidecar2_0.getSlot(), blobSidecar5_0.getSlot(), @@ -480,18 +509,36 @@ public void verifyNonCanonicalBlobsLifecycle(final DatabaseContext context) thro blobSidecar5_0.getSlot(), List.of(blobSidecar5_0))); assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(4L); + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar1_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).doesNotExist(); + // Prune blobs up to slot 3 - assertThat(database.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf(3), 10)).isFalse(); + assertThat( + database.pruneOldestNonCanonicalBlobSidecars( + UInt64.valueOf(3), 10, fileSystemDataArchive.getBlobSidecarWriter())) + .isFalse(); assertNonCanonicalBlobSidecarKeys( blobSidecar1_0.getSlot(), blobSidecar5_0.getSlot(), blobSidecarToKey(blobSidecar5_0)); assertNonCanonicalBlobSidecars(Map.of(blobSidecar5_0.getSlot(), List.of(blobSidecar5_0))); assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(1L); + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar2_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar3_0)).exists(); + assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).doesNotExist(); + // Pruning all blobs - assertThat(database.pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf(5), 1)).isTrue(); + assertThat( + database.pruneOldestNonCanonicalBlobSidecars( + UInt64.valueOf(5), 1, fileSystemDataArchive.getBlobSidecarWriter())) + .isTrue(); // No blobs should be left assertNonCanonicalBlobSidecarKeys(ZERO, UInt64.valueOf(10)); assertThat(database.getNonCanonicalBlobSidecarColumnCount()).isEqualTo(0L); + + // check if the pruned blob was written to disk. Not validating contents here. + assertThat(getSlotBlobsArchiveFile(blobSidecar5_0)).exists(); } @TestTemplate diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java new file mode 100644 index 00000000000..2cad69228e3 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java @@ -0,0 +1,35 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive; + +import java.io.IOException; +import java.util.List; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; + +/** + * Interface for a data archive which stores prunable BlobSidecars outside the data availability + * window and could be extended later to include other data types. It is expected that the + * DataArchive is on disk or externally stored with slow write and recovery times. Initial interface + * is write only, but may be expanded to include read operations later. + */ +public interface DataArchive { + + /** + * Returns the archive writer capable of storing BlobSidecars. + * + * @return a closeable DataArchiveWriter for writing BlobSidecars + * @throws IOException throw exception if it fails to get a writer. + */ + DataArchiveWriter> getBlobSidecarWriter() throws IOException; +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchiveWriter.java similarity index 63% rename from storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java rename to storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchiveWriter.java index 479a5e4d866..1598c9b8d76 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveWriter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchiveWriter.java @@ -11,16 +11,17 @@ * specific language governing permissions and limitations under the License. */ -package tech.pegasys.teku.storage.server; +package tech.pegasys.teku.storage.archive; + +import java.io.Closeable; /** - * A functional interface to allow storing data that is to be pruned from the Database. If the store - * function is successful it returns true, signalling the data can be pruned. If the store function - * fails, the data was not stored and the data should not be pruned. + * An interface to allow storing data that is to be pruned from the Database. If the store function + * is successful it returns true, signalling the data can be pruned. If the store function fails, + * the data was not stored and the data should not be pruned. * * @param the data to be stored. */ -@FunctionalInterface -public interface DatabaseArchiveWriter { +public interface DataArchiveWriter extends Closeable { boolean archive(final T data); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriter.java new file mode 100644 index 00000000000..ee361c2b1bc --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriter.java @@ -0,0 +1,44 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.fsarchive; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static tech.pegasys.teku.infrastructure.json.types.DeserializableTypeDefinition.listOf; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.Objects; +import tech.pegasys.teku.infrastructure.json.JsonUtil; +import tech.pegasys.teku.infrastructure.json.types.SerializableTypeDefinition; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; + +public class BlobSidecarJsonWriter { + + public void writeSlotBlobSidecars(final OutputStream out, final List blobSidecars) + throws IOException { + Objects.requireNonNull(out); + Objects.requireNonNull(blobSidecars); + + // Technically not possible as pruner prunes sidecars and not slots. + if (blobSidecars.isEmpty()) { + out.write("[]".getBytes(UTF_8)); + return; + } + + final SerializableTypeDefinition> type = + listOf(blobSidecars.getFirst().getSchema().getJsonTypeDefinition()); + JsonUtil.serializeToBytes(blobSidecars, type, out); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java new file mode 100644 index 00000000000..f731a1c081e --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java @@ -0,0 +1,137 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.fsarchive; + +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; + +/** + * A file system based implementations of the DataArchive. Writes to a directory using the + * PathResolver method to decide where to write the files. + */ +public class FileSystemArchive implements DataArchive { + static final String INDEX_FILE = "index.dat"; + private static final Logger LOG = LogManager.getLogger(); + + private final Path baseDirectory; + private final BlobSidecarJsonWriter jsonWriter; + + public FileSystemArchive(final Path baseDirectory) { + this.baseDirectory = baseDirectory; + this.jsonWriter = new BlobSidecarJsonWriter(); + } + + @Override + public DataArchiveWriter> getBlobSidecarWriter() throws IOException { + + try { + final File indexFile = baseDirectory.resolve(INDEX_FILE).toFile(); + return new FileSystemBlobSidecarWriter(indexFile); + } catch (IOException e) { + LOG.warn("Unable to create BlobSidecar archive writer", e); + throw e; + } + } + + private class FileSystemBlobSidecarWriter + implements DataArchiveWriter>, Closeable { + final BufferedWriter indexWriter; + + public FileSystemBlobSidecarWriter(final File indexFile) throws IOException { + indexWriter = + new BufferedWriter( + new OutputStreamWriter( + new FileOutputStream(indexFile, true), StandardCharsets.UTF_8)); + } + + @Override + public boolean archive(final List blobSidecars) { + if (blobSidecars == null || blobSidecars.isEmpty()) { + return true; + } + + final SlotAndBlockRoot slotAndBlockRoot = blobSidecars.getFirst().getSlotAndBlockRoot(); + final File file = resolve(slotAndBlockRoot); + if (file.exists()) { + LOG.error("Failed to write BlobSidecar. File exists: {}", file.toString()); + return false; + } + + try { + Files.createDirectories(file.toPath().getParent()); + } catch (IOException e) { + LOG.error( + "Failed to write BlobSidecar. Could not make directories to: {}", + file.getParentFile().toString()); + return false; + } + + try (FileOutputStream output = new FileOutputStream(file)) { + jsonWriter.writeSlotBlobSidecars(output, blobSidecars); + indexWriter.write(formatIndexOutput(slotAndBlockRoot)); + indexWriter.newLine(); + return true; + } catch (IOException | NullPointerException e) { + LOG.error("Failed to write BlobSidecar.", e); + return false; + } + } + + private String formatIndexOutput(final SlotAndBlockRoot slotAndBlockRoot) { + return slotAndBlockRoot.getSlot() + + " " + + slotAndBlockRoot.getBlockRoot().toUnprefixedHexString(); + } + + @Override + public void close() throws IOException { + indexWriter.flush(); + indexWriter.close(); + } + } + + /** + * Given a basePath, slot and block root, return where to store/find the BlobSidecar. Initial + * implementation uses blockRoot as a hex string in the directory of the first two characters. + * + * @param slotAndBlockRoot The slot and block root. + * @return a path of where to store or find the BlobSidecar + */ + public File resolve(final SlotAndBlockRoot slotAndBlockRoot) { + // For blockroot 0x1a2bcd... the directory is basePath/1a/2b/1a2bcd... + // 256 * 256 directories = 65,536. + // Assume 8000 to 10000 blobs per day. With perfect hash distribution, + // all directories have one file after a week. After 1 year, expect 50 files in each directory. + String blockRootString = slotAndBlockRoot.getBlockRoot().toUnprefixedHexString(); + final String dir1 = blockRootString.substring(0, 2); + final String dir2 = blockRootString.substring(2, 4); + final String blobSidecarFilename = + dir1 + File.separator + dir2 + File.separator + blockRootString; + return baseDirectory.resolve(blobSidecarFilename).toFile(); + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/DataArchiveNoopWriter.java similarity index 67% rename from storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java rename to storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/DataArchiveNoopWriter.java index 6a59598bc99..0e05ec7b7e5 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/DatabaseArchiveNoopWriter.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/DataArchiveNoopWriter.java @@ -11,17 +11,18 @@ * specific language governing permissions and limitations under the License. */ -package tech.pegasys.teku.storage.server; +package tech.pegasys.teku.storage.archive.nooparchive; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import java.io.IOException; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; -public class DatabaseArchiveNoopWriter implements DatabaseArchiveWriter { - - public static final DatabaseArchiveNoopWriter NOOP_BLOBSIDECAR_STORE = - new DatabaseArchiveNoopWriter<>(); +public class DataArchiveNoopWriter implements DataArchiveWriter { @Override public boolean archive(final T data) { return true; } + + @Override + public void close() throws IOException {} } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java new file mode 100644 index 00000000000..11efeccdcb1 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java @@ -0,0 +1,30 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.nooparchive; + +import java.util.List; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; + +public class NoopDataArchive implements DataArchive { + + private static final DataArchiveWriter> BLOB_SIDECAR_WRITER = + new DataArchiveNoopWriter<>(); + + @Override + public DataArchiveWriter> getBlobSidecarWriter() { + return BLOB_SIDECAR_WRITER; + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java b/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java index 32e3bfd4671..b4a06ee6004 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/Database.java @@ -40,6 +40,7 @@ import tech.pegasys.teku.storage.api.UpdateResult; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; public interface Database extends AutoCloseable { @@ -72,16 +73,17 @@ void storeFinalizedBlocks( * of pruneLimit is to softly cap DB operation time. * * @param lastSlotToPrune inclusive, not reached if limit happens first - * @param pruneLimit soft BlobSidecars (not slots) limit + * @param pruneLimit maximum number of slots to prune. * @param archiveWriter write BlobSidecars to archive when pruning. * @return true if number of pruned blobs reached the pruneLimit, false otherwise */ boolean pruneOldestBlobSidecars( UInt64 lastSlotToPrune, int pruneLimit, - final DatabaseArchiveWriter archiveWriter); + final DataArchiveWriter> archiveWriter); - boolean pruneOldestNonCanonicalBlobSidecars(UInt64 lastSlotToPrune, int pruneLimit); + boolean pruneOldestNonCanonicalBlobSidecars( + UInt64 lastSlotToPrune, int pruneLimit, DataArchiveWriter> archiveWriter); @MustBeClosed Stream streamBlobSidecarKeys(UInt64 startSlot, UInt64 endSlot); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java b/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java index a413e6e66c7..3a28619081b 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java @@ -19,6 +19,7 @@ import static tech.pegasys.teku.storage.server.StateStorageMode.PRUNE; import static tech.pegasys.teku.storage.server.VersionedDatabaseFactory.STORAGE_MODE_PATH; +import java.io.File; import java.nio.file.Path; import java.time.Duration; import java.util.Optional; @@ -42,9 +43,9 @@ public class StorageConfiguration { public static final long DEFAULT_STORAGE_RETAINED_SLOTS = 0; public static final int DEFAULT_STATE_PRUNING_LIMIT = 1; - // 60/12 = 5 blocks per minute * 6 max blobs per block = 30 blobs per minute at maximum, 15 as - // target. Let's configure 48 pruning per minute, so we have some room for catching up. - public static final int DEFAULT_BLOBS_PRUNING_LIMIT = 48; + // 60/12 = 5 blocks/slots per minute * 6 max blobs per block = 30 blobs per minute at maximum, + // This value prunes blobs by slots, using 12 to allow for catch up. + public static final int DEFAULT_BLOBS_PRUNING_LIMIT = 12; // Max limit we have tested so far without seeing perf degradation public static final int MAX_STATE_PRUNE_LIMIT = 100; @@ -62,6 +63,7 @@ public class StorageConfiguration { private final Duration statePruningInterval; private final Duration blobsPruningInterval; private final int blobsPruningLimit; + private final String blobsArchivePath; private final long retainedSlots; private final int statePruningLimit; @@ -78,6 +80,7 @@ private StorageConfiguration( final int blockPruningLimit, final Duration blobsPruningInterval, final int blobsPruningLimit, + final String blobsArchivePath, final int stateRebuildTimeoutSeconds, final long retainedSlots, final Duration statePruningInterval, @@ -93,6 +96,7 @@ private StorageConfiguration( this.blockPruningLimit = blockPruningLimit; this.blobsPruningInterval = blobsPruningInterval; this.blobsPruningLimit = blobsPruningLimit; + this.blobsArchivePath = blobsArchivePath; this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds; this.retainedSlots = retainedSlots; this.statePruningInterval = statePruningInterval; @@ -148,6 +152,10 @@ public int getBlobsPruningLimit() { return blobsPruningLimit; } + public Optional getBlobsArchivePath() { + return Optional.ofNullable(blobsArchivePath); + } + public long getRetainedSlots() { return retainedSlots; } @@ -178,6 +186,7 @@ public static final class Builder { private int blockPruningLimit = DEFAULT_BLOCK_PRUNING_LIMIT; private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL; private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT; + private String blobsArchivePath = null; private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS; private Duration statePruningInterval = DEFAULT_STATE_PRUNING_INTERVAL; private long retainedSlots = DEFAULT_STORAGE_RETAINED_SLOTS; @@ -274,6 +283,18 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) { return this; } + public Builder blobsArchivePath(final String blobsArchivePath) { + if (blobsArchivePath != null) { + File file = Path.of(blobsArchivePath).toFile(); + if (!file.exists()) { + throw new InvalidConfigurationException( + String.format("Blobs archive path does not exist: '%s'", blobsArchivePath)); + } + } + this.blobsArchivePath = blobsArchivePath; + return this; + } + public Builder retainedSlots(final long retainedSlots) { if (retainedSlots < 0) { throw new InvalidConfigurationException( @@ -319,6 +340,7 @@ public StorageConfiguration build() { blockPruningLimit, blobsPruningInterval, blobsPruningLimit, + blobsArchivePath, stateRebuildTimeoutSeconds, retainedSlots, statePruningInterval, diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java index f3cbf613905..9f3fdf05da0 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/kvstore/KvStoreDatabase.java @@ -14,10 +14,10 @@ package tech.pegasys.teku.storage.server.kvstore; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.stream.Collectors.groupingBy; import static tech.pegasys.teku.infrastructure.logging.DbLogger.DB_LOGGER; import static tech.pegasys.teku.infrastructure.logging.StatusLogger.STATUS_LOG; import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ONE; -import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.MustBeClosed; @@ -71,9 +71,8 @@ import tech.pegasys.teku.storage.api.UpdateResult; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; import tech.pegasys.teku.storage.server.Database; -import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; -import tech.pegasys.teku.storage.server.DatabaseArchiveWriter; import tech.pegasys.teku.storage.server.StateStorageMode; import tech.pegasys.teku.storage.server.kvstore.dataaccess.CombinedKvStoreDao; import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDao; @@ -879,7 +878,7 @@ public Optional getNonCanonicalBlobSidecar(final SlotAndBlockRootAn public boolean pruneOldestBlobSidecars( final UInt64 lastSlotToPrune, final int pruneLimit, - final DatabaseArchiveWriter archiveWriter) { + final DataArchiveWriter> archiveWriter) { try (final Stream prunableBlobKeys = streamBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune); final FinalizedUpdater updater = finalizedUpdater()) { @@ -889,16 +888,14 @@ public boolean pruneOldestBlobSidecars( @Override public boolean pruneOldestNonCanonicalBlobSidecars( - final UInt64 lastSlotToPrune, final int pruneLimit) { + final UInt64 lastSlotToPrune, + final int pruneLimit, + final DataArchiveWriter> archiveWriter) { try (final Stream prunableNoncanonicalBlobKeys = streamNonCanonicalBlobSidecarKeys(UInt64.ZERO, lastSlotToPrune); final FinalizedUpdater updater = finalizedUpdater()) { return pruneBlobSidecars( - pruneLimit, - prunableNoncanonicalBlobKeys, - updater, - DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE, - true); + pruneLimit, prunableNoncanonicalBlobKeys, updater, archiveWriter, true); } } @@ -906,43 +903,64 @@ private boolean pruneBlobSidecars( final int pruneLimit, final Stream prunableBlobKeys, final FinalizedUpdater updater, - final DatabaseArchiveWriter archiveWriter, - final boolean nonCanonicalblobSidecars) { - int remaining = pruneLimit; + final DataArchiveWriter> archiveWriter, + final boolean nonCanonicalBlobSidecars) { + int pruned = 0; Optional earliestBlobSidecarSlot = Optional.empty(); - for (final Iterator it = prunableBlobKeys.iterator(); - it.hasNext(); ) { - --remaining; - final boolean finished = remaining < 0; - final SlotAndBlockRootAndBlobIndex key = it.next(); - // Before we finish we should check that there are no BlobSidecars left in the same slot - if (finished && key.getBlobIndex().equals(ZERO)) { - break; + + // Group the BlobSidecars by slot. Potential for higher memory usage + // if it hasn't been pruned in a while + final Map> prunableMap = + prunableBlobKeys.collect(groupingBy(SlotAndBlockRootAndBlobIndex::getSlot)); + + // pruneLimit is the number of slots to prune, not the number of BlobSidecars + final List slots = prunableMap.keySet().stream().sorted().limit(pruneLimit).toList(); + for (final UInt64 slot : slots) { + final List keys = prunableMap.get(slot); + + // Retrieve the BlobSidecars for archiving. + final List blobSidecars = + keys.stream() + .map( + nonCanonicalBlobSidecars + ? this::getNonCanonicalBlobSidecar + : this::getBlobSidecar) + .filter(Optional::isPresent) + .map(Optional::get) + .toList(); + + // Just warn if we failed to find all the BlobSidecars. + if (keys.size() != blobSidecars.size()) { + LOG.warn("Failed to retrieve BlobSidecars for keys: {}", keys); } - // Attempt to archive the BlobSidecar if present. If there's no BlobSidecar return true. - final boolean blobSidecarArchived = - getBlobSidecar(key).map(archiveWriter::archive).orElse(Boolean.TRUE); + + // Attempt to archive the BlobSidecars. + final boolean blobSidecarArchived = archiveWriter.archive(blobSidecars); if (!blobSidecarArchived) { - LOG.warn("Failed to archive BlobSidecar for slot:{}. Stopping pruning", key.getSlot()); + LOG.error("Failed to archive and prune BlobSidecars. Stopping pruning"); break; } - if (nonCanonicalblobSidecars) { - updater.removeNonCanonicalBlobSidecar(key); - } else { - earliestBlobSidecarSlot = Optional.of(key.getSlot().plus(1)); - updater.removeBlobSidecar(key); + + // Remove the BlobSidecars from the database. + for (final SlotAndBlockRootAndBlobIndex key : keys) { + if (nonCanonicalBlobSidecars) { + updater.removeNonCanonicalBlobSidecar(key); + } else { + updater.removeBlobSidecar(key); + earliestBlobSidecarSlot = Optional.of(slot.plus(1)); + } } + ++pruned; } - if (!nonCanonicalblobSidecars) { + if (!nonCanonicalBlobSidecars) { earliestBlobSidecarSlot.ifPresent(updater::setEarliestBlobSidecarSlot); } updater.commit(); - // `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar - // in a slot + // `pruned` will be greater when we reach pruneLimit not on the latest BlobSidecar in a slot return pruned >= pruneLimit; } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java b/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java index 53986e88fce..abbf50e67d4 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/noop/NoOpDatabase.java @@ -43,8 +43,8 @@ import tech.pegasys.teku.storage.api.UpdateResult; import tech.pegasys.teku.storage.api.WeakSubjectivityState; import tech.pegasys.teku.storage.api.WeakSubjectivityUpdate; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; import tech.pegasys.teku.storage.server.Database; -import tech.pegasys.teku.storage.server.DatabaseArchiveWriter; public class NoOpDatabase implements Database { @@ -326,13 +326,15 @@ public Optional getEarliestBlobSidecarSlot() { public boolean pruneOldestBlobSidecars( final UInt64 lastSlotToPrune, final int pruneLimit, - final DatabaseArchiveWriter archiveWriter) { + final DataArchiveWriter> archiveWriter) { return false; } @Override public boolean pruneOldestNonCanonicalBlobSidecars( - final UInt64 lastSlotToPrune, final int pruneLimit) { + final UInt64 lastSlotToPrune, + final int pruneLimit, + final DataArchiveWriter> archiveWriter) { return false; } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java index f562a2a57e3..c7d54252b42 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java @@ -13,7 +13,9 @@ package tech.pegasys.teku.storage.server.pruner; +import java.io.IOException; import java.time.Duration; +import java.util.List; import java.util.Optional; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; @@ -33,9 +35,9 @@ import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; import tech.pegasys.teku.storage.server.Database; -import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; -import tech.pegasys.teku.storage.server.DatabaseArchiveWriter; import tech.pegasys.teku.storage.server.ShuttingDownException; public class BlobSidecarPruner extends Service { @@ -58,11 +60,12 @@ public class BlobSidecarPruner extends Service { private final AtomicLong blobColumnSize = new AtomicLong(0); private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1); private final boolean storeNonCanonicalBlobSidecars; - private final DatabaseArchiveWriter archiveWriter; + private final DataArchive dataArchive; public BlobSidecarPruner( final Spec spec, final Database database, + final DataArchive dataArchive, final MetricsSystem metricsSystem, final AsyncRunner asyncRunner, final TimeProvider timeProvider, @@ -75,6 +78,7 @@ public BlobSidecarPruner( final boolean storeNonCanonicalBlobSidecars) { this.spec = spec; this.database = database; + this.dataArchive = dataArchive; this.asyncRunner = asyncRunner; this.pruneInterval = pruneInterval; this.pruneLimit = pruneLimit; @@ -85,9 +89,6 @@ public BlobSidecarPruner( this.pruningActiveLabelledGauge = pruningActiveLabelledGauge; this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars; - // To be updated with other implementations. e.g. filesystem or s3 - this.archiveWriter = DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE; - if (blobSidecarsStorageCountersEnabled) { LabelledGauge labelledGauge = metricsSystem.createLabelledGauge( @@ -152,7 +153,7 @@ private void pruneBlobsPriorToAvailabilityWindow() { return; } LOG.debug("Pruning blobs up to slot {}, limit {}", latestPrunableSlot, pruneLimit); - try { + try (DataArchiveWriter> archiveWriter = dataArchive.getBlobSidecarWriter()) { final long blobsPruningStart = System.currentTimeMillis(); final boolean blobsPruningLimitReached = database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter); @@ -164,12 +165,15 @@ private void pruneBlobsPriorToAvailabilityWindow() { if (storeNonCanonicalBlobSidecars) { final long nonCanonicalBlobsPruningStart = System.currentTimeMillis(); final boolean nonCanonicalBlobsLimitReached = - database.pruneOldestNonCanonicalBlobSidecars(latestPrunableSlot, pruneLimit); + database.pruneOldestNonCanonicalBlobSidecars( + latestPrunableSlot, pruneLimit, archiveWriter); logPruningResult( "Non canonical Blobs pruning finished in {} ms. Limit reached: {}", nonCanonicalBlobsPruningStart, nonCanonicalBlobsLimitReached); } + } catch (IOException ex) { + LOG.error("Failed to get the BlobSidecar archive writer", ex); } catch (ShuttingDownException | RejectedExecutionException ex) { LOG.debug("Shutting down", ex); } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriterTest.java b/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriterTest.java new file mode 100644 index 00000000000..3073f9af907 --- /dev/null +++ b/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/BlobSidecarJsonWriterTest.java @@ -0,0 +1,99 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.fsarchive; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.util.DataStructureUtil; + +public class BlobSidecarJsonWriterTest { + private static final Spec SPEC = TestSpecFactory.createMinimalDeneb(); + + BlobSidecarJsonWriter blobSidecarJsonWriter; + private DataStructureUtil dataStructureUtil; + + @BeforeEach + public void test() { + this.blobSidecarJsonWriter = new BlobSidecarJsonWriter(); + this.dataStructureUtil = new DataStructureUtil(SPEC); + } + + @Test + void testWriteSlotBlobSidecarsWithEmptyList() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + List blobSidecars = new ArrayList<>(); + blobSidecarJsonWriter.writeSlotBlobSidecars(out, blobSidecars); + String json = out.toString(StandardCharsets.UTF_8); + assertEquals("[]", json); + } + + @Test + void testWriteSlotBlobSidecarsWithSingleElement() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + List blobSidecars = new ArrayList<>(); + final BlobSidecar blobSidecar = + dataStructureUtil.randomBlobSidecarForBlock( + dataStructureUtil.randomSignedBeaconBlock(1), 0); + blobSidecars.add(blobSidecar); + blobSidecarJsonWriter.writeSlotBlobSidecars(out, blobSidecars); + String json = out.toString(StandardCharsets.UTF_8); + assertTrue(json.contains("index")); + assertTrue(json.contains("blob")); + assertTrue(json.contains("kzg_commitment")); + assertTrue(json.contains("kzg_proof")); + assertTrue(json.contains("signed_block_header")); + assertTrue(json.contains("parent_root")); + assertTrue(json.contains("state_root")); + assertTrue(json.contains("body_root")); + assertTrue(json.contains("signature")); + } + + @Test + void testWriteSlotBlobSidecarsNulls() { + assertThrows( + NullPointerException.class, () -> blobSidecarJsonWriter.writeSlotBlobSidecars(null, null)); + } + + @Test + void testWriteSlotBlobSidecarsNullOut() { + assertThrows( + NullPointerException.class, + () -> { + List blobSidecars = new ArrayList<>(); + blobSidecarJsonWriter.writeSlotBlobSidecars(null, blobSidecars); + }); + } + + @Test + void testWriteSlotBlobSidecarsNullList() { + assertThrows( + NullPointerException.class, + () -> { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + blobSidecarJsonWriter.writeSlotBlobSidecars(out, null); + }); + } +} diff --git a/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchiveTest.java b/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchiveTest.java new file mode 100644 index 00000000000..f7733923a8b --- /dev/null +++ b/storage/src/test/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchiveTest.java @@ -0,0 +1,153 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.storage.archive.fsarchive; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.Blob; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; +import tech.pegasys.teku.spec.datastructures.util.SlotAndBlockRootAndBlobIndex; +import tech.pegasys.teku.spec.logic.common.helpers.Predicates; +import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; +import tech.pegasys.teku.spec.schemas.SchemaDefinitionsDeneb; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; + +public class FileSystemArchiveTest { + private static final Spec SPEC = TestSpecFactory.createMinimalDeneb(); + private final Predicates predicates = new Predicates(SPEC.getGenesisSpecConfig()); + private final SchemaDefinitionsDeneb schemaDefinitionsDeneb = + SchemaDefinitionsDeneb.required(SPEC.getGenesisSchemaDefinitions()); + private final MiscHelpersDeneb miscHelpersDeneb = + new MiscHelpersDeneb( + SPEC.getGenesisSpecConfig().toVersionDeneb().orElseThrow(), + predicates, + schemaDefinitionsDeneb); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(SPEC); + + static Path testTempDir; + static FileSystemArchive dataArchive; + + @BeforeAll + static void beforeEach() throws IOException { + testTempDir = Files.createTempDirectory("blobs"); + dataArchive = new FileSystemArchive(testTempDir); + } + + @AfterEach + public void tearDown() throws IOException { + // Delete the temporary directory after each test + if (Files.exists(testTempDir)) { + try (Stream walk = Files.walk(testTempDir)) { + walk.map(Path::toFile) + .forEach( + file -> { + if (!file.delete()) { + file.deleteOnExit(); + } + }); + } + } + } + + BlobSidecar createBlobSidecar() { + final SignedBeaconBlock signedBeaconBlock = + dataStructureUtil.randomSignedBeaconBlockWithCommitments(1); + final Blob blob = dataStructureUtil.randomBlob(); + final SszKZGProof proof = dataStructureUtil.randomSszKZGProof(); + + return miscHelpersDeneb.constructBlobSidecar(signedBeaconBlock, UInt64.ZERO, blob, proof); + } + + @Test + void testResolve() { + SlotAndBlockRootAndBlobIndex slotAndBlockRootAndBlobIndex = + new SlotAndBlockRootAndBlobIndex( + UInt64.ONE, dataStructureUtil.randomBytes32(), UInt64.ZERO); + File file = dataArchive.resolve(slotAndBlockRootAndBlobIndex.getSlotAndBlockRoot()); + + // Check if the file path is correct. Doesn't check the intermediate directories. + assertTrue(file.toString().startsWith(testTempDir.toString())); + assertTrue( + file.toString() + .endsWith(slotAndBlockRootAndBlobIndex.getBlockRoot().toUnprefixedHexString())); + } + + @Test + void testArchiveWithEmptyList() throws IOException { + DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); + ArrayList list = new ArrayList<>(); + assertTrue(blobWriter.archive(list)); + blobWriter.close(); + } + + @Test + void testArchiveWithNullList() throws IOException { + DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); + assertTrue(blobWriter.archive(null)); + blobWriter.close(); + } + + @Test + void testWriteBlobSidecar() throws IOException { + DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); + ArrayList list = new ArrayList<>(); + BlobSidecar blobSidecar = createBlobSidecar(); + list.add(blobSidecar); + assertTrue(blobWriter.archive(list)); + blobWriter.close(); + + // Check if the file was written + try (FileInputStream fis = + new FileInputStream(testTempDir.resolve(FileSystemArchive.INDEX_FILE).toFile())) { + String content = new String(fis.readAllBytes(), StandardCharsets.UTF_8); + String expected = + blobSidecar.getSlot().toString() + + " " + + blobSidecar.getSlotAndBlockRoot().getBlockRoot().toUnprefixedHexString(); + + // Windows new lines are different, so don't include new lines in the comparison. + assertTrue(content.contains(expected)); + } + } + + @Test + void testFileAlreadyExists() throws IOException { + DataArchiveWriter> blobWriter = dataArchive.getBlobSidecarWriter(); + ArrayList list = new ArrayList<>(); + list.add(createBlobSidecar()); + assertTrue(blobWriter.archive(list)); + // Try to write the same file again + assertFalse(blobWriter.archive(list)); + blobWriter.close(); + } +} diff --git a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java index f9829190599..e4bca263f2f 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; import java.time.Duration; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -37,8 +38,9 @@ import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive; import tech.pegasys.teku.storage.server.Database; -import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; public class BlobSidecarPrunerTest { public static final Duration PRUNE_INTERVAL = Duration.ofSeconds(5); @@ -55,11 +57,13 @@ public class BlobSidecarPrunerTest { private final StubAsyncRunner asyncRunner = new StubAsyncRunner(timeProvider); private final Database database = mock(Database.class); private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem(); + private final DataArchive dataArchive = new NoopDataArchive(); private final BlobSidecarPruner blobsPruner = new BlobSidecarPruner( spec, database, + dataArchive, stubMetricsSystem, asyncRunner, timeProvider, @@ -85,7 +89,7 @@ void shouldNotPruneWhenGenesisNotAvailable() { verify(database).getGenesisTime(); verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); - verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); + verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt(), any()); } @Test @@ -94,7 +98,7 @@ void shouldNotPrunePriorGenesis() { verify(database).getGenesisTime(); verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); - verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); + verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt(), any()); } @Test @@ -110,11 +114,11 @@ void shouldNotPruneWhenLatestPrunableIncludeGenesis() { asyncRunner.executeDueActions(); verify(database, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); - verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); + verify(database, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt(), any()); } @Test - void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() { + void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() throws IOException { final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig(); final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config); // set current slot to MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + 1 epoch + half epoch @@ -131,13 +135,16 @@ void shouldPruneWhenLatestPrunableSlotIsGreaterThanOldestDAEpoch() { .pruneOldestBlobSidecars( UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT, - DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE); + dataArchive.getBlobSidecarWriter()); verify(database) - .pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT); + .pruneOldestNonCanonicalBlobSidecars( + UInt64.valueOf((slotsPerEpoch / 2) - 1), + PRUNE_LIMIT, + dataArchive.getBlobSidecarWriter()); } @Test - void shouldUseEpochsStoreBlobs() { + void shouldUseEpochsStoreBlobs() throws IOException { final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig(); final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config); final int defaultValue = specConfigDeneb.getMinEpochsForBlobSidecarsRequests(); @@ -157,6 +164,7 @@ void shouldUseEpochsStoreBlobs() { new BlobSidecarPruner( specOverride, databaseOverride, + dataArchive, stubMetricsSystem, asyncRunner, timeProvider, @@ -194,9 +202,12 @@ void shouldUseEpochsStoreBlobs() { .pruneOldestBlobSidecars( UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT, - DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE); + dataArchive.getBlobSidecarWriter()); verify(databaseOverride) - .pruneOldestNonCanonicalBlobSidecars(UInt64.valueOf((slotsPerEpoch / 2) - 1), PRUNE_LIMIT); + .pruneOldestNonCanonicalBlobSidecars( + UInt64.valueOf((slotsPerEpoch / 2) - 1), + PRUNE_LIMIT, + dataArchive.getBlobSidecarWriter()); } @Test @@ -217,6 +228,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() { new BlobSidecarPruner( specOverride, databaseOverride, + dataArchive, stubMetricsSystem, asyncRunner, timeProvider, @@ -239,6 +251,6 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() { asyncRunner.executeDueActions(); verify(databaseOverride, never()).pruneOldestBlobSidecars(any(), anyInt(), any()); - verify(databaseOverride, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt()); + verify(databaseOverride, never()).pruneOldestNonCanonicalBlobSidecars(any(), anyInt(), any()); } } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java b/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java index 34f4df0456a..78a7a957dd0 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/store/StoreTest.java @@ -51,7 +51,7 @@ import tech.pegasys.teku.spec.generator.ChainBuilder; import tech.pegasys.teku.storage.api.StubStorageUpdateChannel; import tech.pegasys.teku.storage.api.StubStorageUpdateChannelWithDelays; -import tech.pegasys.teku.storage.server.DatabaseArchiveNoopWriter; +import tech.pegasys.teku.storage.archive.nooparchive.DataArchiveNoopWriter; import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; import tech.pegasys.teku.storage.storageSystem.StorageSystem; import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction; @@ -395,8 +395,7 @@ public void retrieveEarliestBlobSidecarSlot_shouldReturnUpdatedValue() { storageSystem .database() - .pruneOldestBlobSidecars( - UInt64.valueOf(5), 5, DatabaseArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE); + .pruneOldestBlobSidecars(UInt64.valueOf(5), 3, new DataArchiveNoopWriter<>()); assertThat(store.retrieveEarliestBlobSidecarSlot()) .isCompletedWithValueMatching( diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptions.java index a8befd3ef7a..8ce5d876600 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptions.java @@ -164,12 +164,23 @@ public class BeaconNodeDataOptions extends ValidatorClientDataOptions { names = {"--Xdata-storage-blobs-pruning-limit"}, hidden = true, paramLabel = "", - description = "Maximum number of blob sidecars that can be pruned in each pruning session", + description = + "Maximum number of blocks of blob sidecars that can be pruned in each pruning session", fallbackValue = "true", showDefaultValue = Visibility.ALWAYS, arity = "0..1") private int blobsPruningLimit = StorageConfiguration.DEFAULT_BLOBS_PRUNING_LIMIT; + @CommandLine.Option( + names = {"--Xdata-storage-blobs-archive-path"}, + hidden = true, + paramLabel = "", + description = "Path to write pruned blobs", + fallbackValue = "true", + showDefaultValue = Visibility.ALWAYS, + arity = "0..1") + private String blobsArchivePath = null; + @Option( names = {"--Xdata-storage-state-rebuild-timeout-seconds"}, hidden = true, @@ -212,6 +223,7 @@ public void configure(final TekuConfiguration.Builder builder) { .stateRebuildTimeoutSeconds(stateRebuildTimeoutSeconds) .blobsPruningInterval(Duration.ofSeconds(blobsPruningIntervalSeconds)) .blobsPruningLimit(blobsPruningLimit) + .blobsArchivePath(blobsArchivePath) .retainedSlots(dataStorageRetainedSlots) .statePruningInterval(Duration.ofSeconds(statePruningIntervalSeconds)) .statePruningLimit(statePruningLimit)); diff --git a/teku/src/test/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptionsTest.java b/teku/src/test/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptionsTest.java index 7ead08e6080..022e004abfc 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptionsTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/options/BeaconNodeDataOptionsTest.java @@ -22,6 +22,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.function.Supplier; +import org.assertj.core.util.Files; import org.junit.jupiter.api.Test; import tech.pegasys.teku.cli.AbstractBeaconNodeCommandTest; import tech.pegasys.teku.config.TekuConfiguration; @@ -189,6 +190,19 @@ void shouldSetBlobsPruningOptions() { assertThat(config.storageConfiguration().getBlobsPruningLimit()).isEqualTo(10); } + @Test + void shouldSetBlobsPruningArchivePath() { + // path needs to exist. + String someTempPath = Files.temporaryFolderPath(); + final TekuConfiguration config = + getTekuConfigurationFromArguments("--Xdata-storage-blobs-archive-path=" + someTempPath); + + assertThat(config.storageConfiguration().getBlobsArchivePath()) + .isPresent() + .get() + .isEqualTo(someTempPath); + } + @Test void shouldNotAllowPruningBlocksAndReconstructingStates() { assertThatThrownBy(