From 7f9c7d1d56d432860988b4a054156bcb4ec6d76c Mon Sep 17 00:00:00 2001 From: David Ryan Date: Thu, 3 Oct 2024 12:13:52 +1000 Subject: [PATCH] WIP. Setup configuration. --- .../services/chainstorage/StorageService.java | 10 ++++++++++ .../teku/storage/archive/DataArchive.java | 13 +++++++++++- .../archive/fsarchive/FileSystemArchive.java | 5 +++-- .../archive/nooparchive/NoopDataArchive.java | 16 +++++++++++++++ .../storage/server/StorageConfiguration.java | 20 +++++++++++++++++++ .../server/pruner/BlobSidecarPruner.java | 14 ++++++------- .../server/pruner/BlobSidecarPrunerTest.java | 6 ++++++ 7 files changed, 74 insertions(+), 10 deletions(-) create mode 100644 storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java diff --git a/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java b/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java index 6a1e4e9bb39..237d3e5f18a 100644 --- a/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java +++ b/services/chainstorage/src/main/java/tech/pegasys/teku/services/chainstorage/StorageService.java @@ -16,6 +16,7 @@ import static tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory.DEFAULT_MAX_QUEUE_SIZE; import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM; +import java.nio.file.Path; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -33,6 +34,9 @@ import tech.pegasys.teku.storage.api.CombinedStorageChannel; import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel; import tech.pegasys.teku.storage.api.VoteUpdateChannel; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive; +import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive; import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel; import tech.pegasys.teku.storage.server.ChainStorage; import tech.pegasys.teku.storage.server.CombinedStorageChannelSplitter; @@ -143,12 +147,18 @@ protected SafeFuture doStart() { pruningActiveLabelledGauge)); } } + + final DataArchive dataArchive = config.getBlobsArchivePath() + .map( path->(DataArchive) new FileSystemArchive(Path.of(path))) + .orElse(new NoopDataArchive()); + if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) { blobsPruner = Optional.of( new BlobSidecarPruner( config.getSpec(), database, + dataArchive, serviceConfig.getMetricsSystem(), storagePrunerAsyncRunner, serviceConfig.getTimeProvider(), diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java index e73870e89bd..651fc938562 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/DataArchive.java @@ -13,6 +13,10 @@ package tech.pegasys.teku.storage.archive; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; + +import java.io.IOException; + /** * Interface for a data archive which stores non-current BlobSidecars and could be extended later to * include other data types. It is expected that the DataArchive is on disk or externally stored @@ -21,5 +25,12 @@ */ public interface DataArchive { - // public boolean archiveBlobSidecar(final BlobSidecar blobSidecar); + /** + * Returns the archive writer capable of storing BlobSidecars. + * + * @return a closeable DataArchiveWriter for writing BlobSidecars + * @throws IOException + */ + DataArchiveWriter getBlobSidecarWriter() throws IOException; + } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java index 7258ef78f99..2efbcdd635f 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/fsarchive/FileSystemArchive.java @@ -33,8 +33,8 @@ * A file system based implementations of the DataArchive. Writes to a directory using the * PathResolver method to decide where to write the files. */ -public class FileSystemArchive implements DataArchive, DataArchiveWriterFactory { - private static final String INDEX_FILE = "index.txt"; +public class FileSystemArchive implements DataArchive { + private static final String INDEX_FILE = "index.dat"; private static final Logger LOG = LogManager.getLogger(); @@ -46,6 +46,7 @@ public FileSystemArchive(final Path baseDirectory) { this.jsonWriter = new BlobSidecarJsonWriter(); } + @Override public DataArchiveWriter getBlobSidecarWriter() throws IOException { try { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java new file mode 100644 index 00000000000..d4ec0c4dfe0 --- /dev/null +++ b/storage/src/main/java/tech/pegasys/teku/storage/archive/nooparchive/NoopDataArchive.java @@ -0,0 +1,16 @@ +package tech.pegasys.teku.storage.archive.nooparchive; + +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.storage.archive.DataArchive; +import tech.pegasys.teku.storage.archive.DataArchiveNoopWriter; +import tech.pegasys.teku.storage.archive.DataArchiveWriter; + +import java.io.IOException; + +public class NoopDataArchive implements DataArchive { + + @Override + public DataArchiveWriter getBlobSidecarWriter() throws IOException { + return DataArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE; + } +} diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java b/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java index a413e6e66c7..a00e0344a65 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/StorageConfiguration.java @@ -19,6 +19,7 @@ import static tech.pegasys.teku.storage.server.StateStorageMode.PRUNE; import static tech.pegasys.teku.storage.server.VersionedDatabaseFactory.STORAGE_MODE_PATH; +import java.io.File; import java.nio.file.Path; import java.time.Duration; import java.util.Optional; @@ -62,6 +63,7 @@ public class StorageConfiguration { private final Duration statePruningInterval; private final Duration blobsPruningInterval; private final int blobsPruningLimit; + private final String blobsArchivePath; private final long retainedSlots; private final int statePruningLimit; @@ -78,6 +80,7 @@ private StorageConfiguration( final int blockPruningLimit, final Duration blobsPruningInterval, final int blobsPruningLimit, + final String blobsArchivePath, final int stateRebuildTimeoutSeconds, final long retainedSlots, final Duration statePruningInterval, @@ -93,6 +96,7 @@ private StorageConfiguration( this.blockPruningLimit = blockPruningLimit; this.blobsPruningInterval = blobsPruningInterval; this.blobsPruningLimit = blobsPruningLimit; + this.blobsArchivePath = blobsArchivePath; this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds; this.retainedSlots = retainedSlots; this.statePruningInterval = statePruningInterval; @@ -148,6 +152,8 @@ public int getBlobsPruningLimit() { return blobsPruningLimit; } + public Optional getBlobsArchivePath() { return Optional.ofNullable(blobsArchivePath); } + public long getRetainedSlots() { return retainedSlots; } @@ -178,6 +184,7 @@ public static final class Builder { private int blockPruningLimit = DEFAULT_BLOCK_PRUNING_LIMIT; private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL; private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT; + private String blobsArchivePath = null; private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS; private Duration statePruningInterval = DEFAULT_STATE_PRUNING_INTERVAL; private long retainedSlots = DEFAULT_STORAGE_RETAINED_SLOTS; @@ -274,6 +281,18 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) { return this; } + public Builder blobsArchivePath(final String blobsArchivePath) { + if (blobsArchivePath != null) { + File file = Path.of(blobsArchivePath).toFile(); + if (!file.exists()) { + throw new InvalidConfigurationException( + String.format("blobsArchivePath does not exist: '%s'", blobsArchivePath)); + } + } + this.blobsArchivePath = blobsArchivePath; + return this; + } + public Builder retainedSlots(final long retainedSlots) { if (retainedSlots < 0) { throw new InvalidConfigurationException( @@ -319,6 +338,7 @@ public StorageConfiguration build() { blockPruningLimit, blobsPruningInterval, blobsPruningLimit, + blobsArchivePath, stateRebuildTimeoutSeconds, retainedSlots, statePruningInterval, diff --git a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java index 63c0fbad402..1b315f995e1 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPruner.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.storage.server.pruner; +import java.io.IOException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.RejectedExecutionException; @@ -33,7 +34,7 @@ import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.storage.archive.DataArchiveNoopWriter; +import tech.pegasys.teku.storage.archive.DataArchive; import tech.pegasys.teku.storage.archive.DataArchiveWriter; import tech.pegasys.teku.storage.server.Database; import tech.pegasys.teku.storage.server.ShuttingDownException; @@ -58,11 +59,12 @@ public class BlobSidecarPruner extends Service { private final AtomicLong blobColumnSize = new AtomicLong(0); private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1); private final boolean storeNonCanonicalBlobSidecars; - private final DataArchiveWriter archiveWriter; + private final DataArchive dataArchive; public BlobSidecarPruner( final Spec spec, final Database database, + final DataArchive dataArchive, final MetricsSystem metricsSystem, final AsyncRunner asyncRunner, final TimeProvider timeProvider, @@ -75,6 +77,7 @@ public BlobSidecarPruner( final boolean storeNonCanonicalBlobSidecars) { this.spec = spec; this.database = database; + this.dataArchive = dataArchive; this.asyncRunner = asyncRunner; this.pruneInterval = pruneInterval; this.pruneLimit = pruneLimit; @@ -85,9 +88,6 @@ public BlobSidecarPruner( this.pruningActiveLabelledGauge = pruningActiveLabelledGauge; this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars; - // TODO: To be updated with other implementations. e.g. filesystem or s3 - this.archiveWriter = DataArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE; - if (blobSidecarsStorageCountersEnabled) { LabelledGauge labelledGauge = metricsSystem.createLabelledGauge( @@ -152,7 +152,7 @@ private void pruneBlobsPriorToAvailabilityWindow() { return; } LOG.debug("Pruning blobs up to slot {}, limit {}", latestPrunableSlot, pruneLimit); - try { + try (DataArchiveWriter archiveWriter = dataArchive.getBlobSidecarWriter()) { final long blobsPruningStart = System.currentTimeMillis(); final boolean blobsPruningLimitReached = database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter); @@ -170,7 +170,7 @@ private void pruneBlobsPriorToAvailabilityWindow() { nonCanonicalBlobsPruningStart, nonCanonicalBlobsLimitReached); } - } catch (ShuttingDownException | RejectedExecutionException ex) { + } catch (ShuttingDownException | RejectedExecutionException | IOException ex) { LOG.debug("Shutting down", ex); } } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java index f9cd4d9cafa..885c154737a 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/server/pruner/BlobSidecarPrunerTest.java @@ -37,7 +37,9 @@ import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.config.SpecConfigDeneb; +import tech.pegasys.teku.storage.archive.DataArchive; import tech.pegasys.teku.storage.archive.DataArchiveNoopWriter; +import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive; import tech.pegasys.teku.storage.server.Database; public class BlobSidecarPrunerTest { @@ -55,11 +57,13 @@ public class BlobSidecarPrunerTest { private final StubAsyncRunner asyncRunner = new StubAsyncRunner(timeProvider); private final Database database = mock(Database.class); private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem(); + private final DataArchive dataArchive = new NoopDataArchive(); private final BlobSidecarPruner blobsPruner = new BlobSidecarPruner( spec, database, + dataArchive, stubMetricsSystem, asyncRunner, timeProvider, @@ -157,6 +161,7 @@ void shouldUseEpochsStoreBlobs() { new BlobSidecarPruner( specOverride, databaseOverride, + dataArchive, stubMetricsSystem, asyncRunner, timeProvider, @@ -217,6 +222,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() { new BlobSidecarPruner( specOverride, databaseOverride, + dataArchive, stubMetricsSystem, asyncRunner, timeProvider,