Skip to content

Commit

Permalink
WIP. Setup configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
david-ry4n committed Oct 3, 2024
1 parent 0d21bfa commit 7f9c7d1
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory.DEFAULT_MAX_QUEUE_SIZE;
import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM;

import java.nio.file.Path;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -33,6 +34,9 @@
import tech.pegasys.teku.storage.api.CombinedStorageChannel;
import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel;
import tech.pegasys.teku.storage.api.VoteUpdateChannel;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive;
import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive;
import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel;
import tech.pegasys.teku.storage.server.ChainStorage;
import tech.pegasys.teku.storage.server.CombinedStorageChannelSplitter;
Expand Down Expand Up @@ -143,12 +147,18 @@ protected SafeFuture<?> doStart() {
pruningActiveLabelledGauge));
}
}

final DataArchive dataArchive = config.getBlobsArchivePath()
.map( path->(DataArchive) new FileSystemArchive(Path.of(path)))
.orElse(new NoopDataArchive());

if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
new BlobSidecarPruner(
config.getSpec(),
database,
dataArchive,
serviceConfig.getMetricsSystem(),
storagePrunerAsyncRunner,
serviceConfig.getTimeProvider(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package tech.pegasys.teku.storage.archive;

import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

import java.io.IOException;

/**
* Interface for a data archive which stores non-current BlobSidecars and could be extended later to
* include other data types. It is expected that the DataArchive is on disk or externally stored
Expand All @@ -21,5 +25,12 @@
*/
public interface DataArchive {

// public boolean archiveBlobSidecar(final BlobSidecar blobSidecar);
/**
* Returns the archive writer capable of storing BlobSidecars.
*
* @return a closeable DataArchiveWriter for writing BlobSidecars
* @throws IOException
*/
DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
* A file system based implementations of the DataArchive. Writes to a directory using the
* PathResolver method to decide where to write the files.
*/
public class FileSystemArchive implements DataArchive, DataArchiveWriterFactory {
private static final String INDEX_FILE = "index.txt";
public class FileSystemArchive implements DataArchive {
private static final String INDEX_FILE = "index.dat";

private static final Logger LOG = LogManager.getLogger();

Expand All @@ -46,6 +46,7 @@ public FileSystemArchive(final Path baseDirectory) {
this.jsonWriter = new BlobSidecarJsonWriter();
}

@Override
public DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException {

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package tech.pegasys.teku.storage.archive.nooparchive;

import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.DataArchiveNoopWriter;
import tech.pegasys.teku.storage.archive.DataArchiveWriter;

import java.io.IOException;

public class NoopDataArchive implements DataArchive {

@Override
public DataArchiveWriter<BlobSidecar> getBlobSidecarWriter() throws IOException {
return DataArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static tech.pegasys.teku.storage.server.StateStorageMode.PRUNE;
import static tech.pegasys.teku.storage.server.VersionedDatabaseFactory.STORAGE_MODE_PATH;

import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class StorageConfiguration {
private final Duration statePruningInterval;
private final Duration blobsPruningInterval;
private final int blobsPruningLimit;
private final String blobsArchivePath;
private final long retainedSlots;
private final int statePruningLimit;

Expand All @@ -78,6 +80,7 @@ private StorageConfiguration(
final int blockPruningLimit,
final Duration blobsPruningInterval,
final int blobsPruningLimit,
final String blobsArchivePath,
final int stateRebuildTimeoutSeconds,
final long retainedSlots,
final Duration statePruningInterval,
Expand All @@ -93,6 +96,7 @@ private StorageConfiguration(
this.blockPruningLimit = blockPruningLimit;
this.blobsPruningInterval = blobsPruningInterval;
this.blobsPruningLimit = blobsPruningLimit;
this.blobsArchivePath = blobsArchivePath;
this.stateRebuildTimeoutSeconds = stateRebuildTimeoutSeconds;
this.retainedSlots = retainedSlots;
this.statePruningInterval = statePruningInterval;
Expand Down Expand Up @@ -148,6 +152,8 @@ public int getBlobsPruningLimit() {
return blobsPruningLimit;
}

public Optional<String> getBlobsArchivePath() { return Optional.ofNullable(blobsArchivePath); }

public long getRetainedSlots() {
return retainedSlots;
}
Expand Down Expand Up @@ -178,6 +184,7 @@ public static final class Builder {
private int blockPruningLimit = DEFAULT_BLOCK_PRUNING_LIMIT;
private Duration blobsPruningInterval = DEFAULT_BLOBS_PRUNING_INTERVAL;
private int blobsPruningLimit = DEFAULT_BLOBS_PRUNING_LIMIT;
private String blobsArchivePath = null;
private int stateRebuildTimeoutSeconds = DEFAULT_STATE_REBUILD_TIMEOUT_SECONDS;
private Duration statePruningInterval = DEFAULT_STATE_PRUNING_INTERVAL;
private long retainedSlots = DEFAULT_STORAGE_RETAINED_SLOTS;
Expand Down Expand Up @@ -274,6 +281,18 @@ public Builder blobsPruningLimit(final int blobsPruningLimit) {
return this;
}

public Builder blobsArchivePath(final String blobsArchivePath) {
if (blobsArchivePath != null) {
File file = Path.of(blobsArchivePath).toFile();
if (!file.exists()) {
throw new InvalidConfigurationException(
String.format("blobsArchivePath does not exist: '%s'", blobsArchivePath));
}
}
this.blobsArchivePath = blobsArchivePath;
return this;
}

public Builder retainedSlots(final long retainedSlots) {
if (retainedSlots < 0) {
throw new InvalidConfigurationException(
Expand Down Expand Up @@ -319,6 +338,7 @@ public StorageConfiguration build() {
blockPruningLimit,
blobsPruningInterval,
blobsPruningLimit,
blobsArchivePath,
stateRebuildTimeoutSeconds,
retainedSlots,
statePruningInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.storage.server.pruner;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -33,7 +34,7 @@
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.storage.archive.DataArchiveNoopWriter;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.DataArchiveWriter;
import tech.pegasys.teku.storage.server.Database;
import tech.pegasys.teku.storage.server.ShuttingDownException;
Expand All @@ -58,11 +59,12 @@ public class BlobSidecarPruner extends Service {
private final AtomicLong blobColumnSize = new AtomicLong(0);
private final AtomicLong earliestBlobSidecarSlot = new AtomicLong(-1);
private final boolean storeNonCanonicalBlobSidecars;
private final DataArchiveWriter<BlobSidecar> archiveWriter;
private final DataArchive dataArchive;

public BlobSidecarPruner(
final Spec spec,
final Database database,
final DataArchive dataArchive,
final MetricsSystem metricsSystem,
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
Expand All @@ -75,6 +77,7 @@ public BlobSidecarPruner(
final boolean storeNonCanonicalBlobSidecars) {
this.spec = spec;
this.database = database;
this.dataArchive = dataArchive;
this.asyncRunner = asyncRunner;
this.pruneInterval = pruneInterval;
this.pruneLimit = pruneLimit;
Expand All @@ -85,9 +88,6 @@ public BlobSidecarPruner(
this.pruningActiveLabelledGauge = pruningActiveLabelledGauge;
this.storeNonCanonicalBlobSidecars = storeNonCanonicalBlobSidecars;

// TODO: To be updated with other implementations. e.g. filesystem or s3
this.archiveWriter = DataArchiveNoopWriter.NOOP_BLOBSIDECAR_STORE;

if (blobSidecarsStorageCountersEnabled) {
LabelledGauge labelledGauge =
metricsSystem.createLabelledGauge(
Expand Down Expand Up @@ -152,7 +152,7 @@ private void pruneBlobsPriorToAvailabilityWindow() {
return;
}
LOG.debug("Pruning blobs up to slot {}, limit {}", latestPrunableSlot, pruneLimit);
try {
try (DataArchiveWriter<BlobSidecar> archiveWriter = dataArchive.getBlobSidecarWriter()) {
final long blobsPruningStart = System.currentTimeMillis();
final boolean blobsPruningLimitReached =
database.pruneOldestBlobSidecars(latestPrunableSlot, pruneLimit, archiveWriter);
Expand All @@ -170,7 +170,7 @@ private void pruneBlobsPriorToAvailabilityWindow() {
nonCanonicalBlobsPruningStart,
nonCanonicalBlobsLimitReached);
}
} catch (ShuttingDownException | RejectedExecutionException ex) {
} catch (ShuttingDownException | RejectedExecutionException | IOException ex) {
LOG.debug("Shutting down", ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.DataArchiveNoopWriter;
import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive;
import tech.pegasys.teku.storage.server.Database;

public class BlobSidecarPrunerTest {
Expand All @@ -55,11 +57,13 @@ public class BlobSidecarPrunerTest {
private final StubAsyncRunner asyncRunner = new StubAsyncRunner(timeProvider);
private final Database database = mock(Database.class);
private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem();
private final DataArchive dataArchive = new NoopDataArchive();

private final BlobSidecarPruner blobsPruner =
new BlobSidecarPruner(
spec,
database,
dataArchive,
stubMetricsSystem,
asyncRunner,
timeProvider,
Expand Down Expand Up @@ -157,6 +161,7 @@ void shouldUseEpochsStoreBlobs() {
new BlobSidecarPruner(
specOverride,
databaseOverride,
dataArchive,
stubMetricsSystem,
asyncRunner,
timeProvider,
Expand Down Expand Up @@ -217,6 +222,7 @@ void shouldNotPruneWhenEpochsStoreBlobsIsMax() {
new BlobSidecarPruner(
specOverride,
databaseOverride,
dataArchive,
stubMetricsSystem,
asyncRunner,
timeProvider,
Expand Down

0 comments on commit 7f9c7d1

Please sign in to comment.