Skip to content

Commit

Permalink
Merge branch 'master' into fix/gossip-start-subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Oct 16, 2024
2 parents 95708f9 + 398be12 commit a5b531b
Show file tree
Hide file tree
Showing 24 changed files with 909 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
### Breaking Changes

### Additions and Improvements
- Clean up old beacon states when switching from ARCHIVE to PRUNE or MINIMAL data storage mode

### Bug Fixes
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,9 @@ public String toLogString() {
getKZGCommitment().toAbbreviatedString(),
getKZGProof().toAbbreviatedString());
}

@Override
public BlobSidecarSchema getSchema() {
return (BlobSidecarSchema) super.getSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock(
.finish(
error ->
LOG.error(
"An occurred while attempting to fetch blobs via local EL"));
"An error occurred while attempting to fetch blobs via local EL"));
}
}
});
Expand Down Expand Up @@ -576,7 +576,7 @@ private void onFirstSeen(final SlotAndBlockRoot slotAndBlockRoot) {
error ->
LOG.warn(
"Local EL blobs lookup failed: {}",
ExceptionUtils.getMessage(error)))
ExceptionUtils.getRootCauseMessage(error)))
.thenRun(() -> this.fetchMissingContentFromRemotePeers(slotAndBlockRoot)),
fetchDelay)
.finish(
Expand Down
3 changes: 3 additions & 0 deletions services/chainstorage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ dependencies {
implementation project(':infrastructure:events')

implementation 'org.hyperledger.besu:plugin-api'

testImplementation testFixtures(project(':infrastructure:async'))
testImplementation testFixtures(project(':ethereum:execution-types'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory.DEFAULT_MAX_QUEUE_SIZE;
import static tech.pegasys.teku.spec.config.Constants.STORAGE_QUERY_CHANNEL_PARALLELISM;

import com.google.common.annotations.VisibleForTesting;
import java.nio.file.Path;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -33,6 +35,9 @@
import tech.pegasys.teku.storage.api.CombinedStorageChannel;
import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel;
import tech.pegasys.teku.storage.api.VoteUpdateChannel;
import tech.pegasys.teku.storage.archive.DataArchive;
import tech.pegasys.teku.storage.archive.fsarchive.FileSystemArchive;
import tech.pegasys.teku.storage.archive.nooparchive.NoopDataArchive;
import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel;
import tech.pegasys.teku.storage.server.ChainStorage;
import tech.pegasys.teku.storage.server.CombinedStorageChannelSplitter;
Expand Down Expand Up @@ -126,35 +131,32 @@ protected SafeFuture<?> doStart() {
}
if (config.getDataStorageMode().storesFinalizedStates()
&& config.getRetainedSlots() > 0) {
if (config.getDataStorageCreateDbVersion() == DatabaseVersion.LEVELDB_TREE) {
throw new InvalidConfigurationException(
"State pruning is not supported with leveldb_tree database.");
} else {
LOG.info(
"State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution. ",
config.getStatePruningInterval().toMinutes(),
config.getRetainedSlots(),
config.getStatePruningLimit());
statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getStatePruningInterval(),
config.getRetainedSlots(),
config.getStatePruningLimit(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}
configureStatePruner(
config.getRetainedSlots(),
storagePrunerAsyncRunner,
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge);
} else if (!config.getDataStorageMode().storesFinalizedStates()) {
configureStatePruner(
StorageConfiguration.DEFAULT_STORAGE_RETAINED_SLOTS,
storagePrunerAsyncRunner,
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge);
}

final DataArchive dataArchive =
config
.getBlobsArchivePath()
.<DataArchive>map(path -> new FileSystemArchive(Path.of(path)))
.orElse(new NoopDataArchive());

if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
new BlobSidecarPruner(
config.getSpec(),
database,
dataArchive,
serviceConfig.getMetricsSystem(),
storagePrunerAsyncRunner,
serviceConfig.getTimeProvider(),
Expand Down Expand Up @@ -216,6 +218,41 @@ protected SafeFuture<?> doStart() {
.orElseGet(() -> SafeFuture.completedFuture(null)));
}

void configureStatePruner(
final long slotsToRetain,
final AsyncRunner storagePrunerAsyncRunner,
final SettableLabelledGauge pruningTimingsLabelledGauge,
final SettableLabelledGauge pruningActiveLabelledGauge) {
if (config.getDataStorageCreateDbVersion() == DatabaseVersion.LEVELDB_TREE) {
throw new InvalidConfigurationException(
"State pruning is not supported with leveldb_tree database.");
}

LOG.info(
"State pruner will run every: {} minute(s), retaining states for the last {} finalized slots. Limited to {} state prune per execution.",
config.getStatePruningInterval().toMinutes(),
slotsToRetain,
config.getStatePruningLimit());

statePruner =
Optional.of(
new StatePruner(
config.getSpec(),
database,
storagePrunerAsyncRunner,
config.getStatePruningInterval(),
slotsToRetain,
config.getStatePruningLimit(),
"state",
pruningTimingsLabelledGauge,
pruningActiveLabelledGauge));
}

@VisibleForTesting
public Optional<StatePruner> getStatePruner() {
return statePruner;
}

@Override
protected SafeFuture<?> doStop() {
return blockPruner
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.services.chainstorage;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.file.Path;
import java.util.Optional;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunnerFactory;
import tech.pegasys.teku.infrastructure.events.EventChannels;
import tech.pegasys.teku.service.serviceutils.ServiceConfig;
import tech.pegasys.teku.service.serviceutils.layout.DataDirLayout;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.storage.server.DatabaseVersion;
import tech.pegasys.teku.storage.server.StateStorageMode;
import tech.pegasys.teku.storage.server.StorageConfiguration;
import tech.pegasys.teku.storage.server.pruner.StatePruner;

class StorageServiceTest {

private final ServiceConfig serviceConfig = mock(ServiceConfig.class);
private final StorageConfiguration storageConfiguration = mock(StorageConfiguration.class);
private final MetricsSystem metricsSystem = mock(MetricsSystem.class);
private final DataDirLayout dataDirLayout = mock(DataDirLayout.class);
private final Eth1Address eth1DepositContract = mock(Eth1Address.class);
private final Spec spec = mock(Spec.class);
private final EventChannels eventChannels = mock(EventChannels.class);
private StorageService storageService;

@BeforeEach
void setUp(@TempDir final Path tempDir) {
when(serviceConfig.getMetricsSystem()).thenReturn(metricsSystem);
when(dataDirLayout.getBeaconDataDirectory()).thenReturn(tempDir);
when(serviceConfig.getDataDirLayout()).thenReturn(dataDirLayout);
when(storageConfiguration.getDataStorageCreateDbVersion()).thenReturn(DatabaseVersion.NOOP);
when(storageConfiguration.getMaxKnownNodeCacheSize())
.thenReturn(StorageConfiguration.DEFAULT_MAX_KNOWN_NODE_CACHE_SIZE);
when(storageConfiguration.getDataStorageFrequency())
.thenReturn(StorageConfiguration.DEFAULT_STORAGE_FREQUENCY);
when(storageConfiguration.getEth1DepositContract()).thenReturn(eth1DepositContract);
when(storageConfiguration.isStoreNonCanonicalBlocksEnabled()).thenReturn(false);
when(storageConfiguration.getSpec()).thenReturn(spec);

when(eventChannels.subscribe(any(), any())).thenReturn(eventChannels);
when(serviceConfig.getEventChannels()).thenReturn(eventChannels);

final StubAsyncRunnerFactory asyncRunnerFactory = new StubAsyncRunnerFactory();
when(serviceConfig.getAsyncRunnerFactory()).thenReturn(asyncRunnerFactory);

final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner();
when(serviceConfig.createAsyncRunner(any(), anyInt(), anyInt(), anyInt()))
.thenReturn(stubAsyncRunner);

storageService = new StorageService(serviceConfig, storageConfiguration, false, false);
}

@Test
void shouldNotSetupStatePrunerWhenArchiveMode() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.ARCHIVE);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isEmpty();
}

@Test
void shouldSetupStatePrunerWhenArchiveModeAndRetentionSlotsEnabled() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.ARCHIVE);
when(storageConfiguration.getRetainedSlots()).thenReturn(5L);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isPresent();
assertThat(storageService.getStatePruner().get().isRunning()).isTrue();
}

@Test
void shouldSetupStatePrunerWhenPruneMode() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.PRUNE);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isPresent();
assertThat(storageService.getStatePruner().get().isRunning()).isTrue();
}

@Test
void shouldSetupStatePrunerWhenMinimalMode() {
when(storageConfiguration.getDataStorageMode()).thenReturn(StateStorageMode.MINIMAL);
final SafeFuture<?> future = storageService.doStart();
final Optional<StatePruner> statePruner = storageService.getStatePruner();
assertThat(future).isCompleted();
assertThat(statePruner).isPresent();
assertThat(storageService.getStatePruner().get().isRunning()).isTrue();
}
}
Loading

0 comments on commit a5b531b

Please sign in to comment.