Skip to content

Commit

Permalink
Add blob sidecars cache in Store + usage
Browse files Browse the repository at this point in the history
  • Loading branch information
zilm13 committed Oct 13, 2023
1 parent 2842a85 commit 28213de
Show file tree
Hide file tree
Showing 24 changed files with 108 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void shouldGetBlobSidecars() throws Exception {
final SignedBlockAndState lastBlock = chainUpdater.advanceChainUntil(targetSlot);
chainUpdater.updateBestBlock(lastBlock);
final List<BlobSidecar> expected =
recentChainData.retrieveBlobSidecars(lastBlock.getSlotAndBlockRoot()).get();
recentChainData.getBlobSidecars(lastBlock.getSlotAndBlockRoot()).get();

final Response responseAll = get("head");
assertThat(responseAll.code()).isEqualTo(SC_OK);
Expand Down Expand Up @@ -116,7 +116,7 @@ public void shouldGetBlobSidecarsAsSsz() throws Exception {
final SignedBlockAndState lastBlock = chainUpdater.advanceChainUntil(targetSlot);
chainUpdater.updateBestBlock(lastBlock);
final List<BlobSidecar> expected =
recentChainData.retrieveBlobSidecars(lastBlock.getSlotAndBlockRoot()).get();
recentChainData.getBlobSidecars(lastBlock.getSlotAndBlockRoot()).get();

final Response response = get("head", OCTET_STREAM);
assertThat(response.code()).isEqualTo(SC_OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package tech.pegasys.teku.spec.datastructures.forkchoice;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -43,7 +42,7 @@ void putBlockAndState(
SignedBeaconBlock block,
BeaconState state,
BlockCheckpoints checkpoints,
List<BlobSidecar> blobSidecars,
Optional<List<BlobSidecar>> blobSidecars,
Optional<UInt64> earliestBlobSidecarSlot);

default void putBlockAndState(
Expand All @@ -52,7 +51,7 @@ default void putBlockAndState(
blockAndState.getBlock(),
blockAndState.getState(),
checkpoints,
Collections.emptyList(),
Optional.empty(),
Optional.empty());
}

Expand All @@ -64,7 +63,7 @@ default void putBlockAndState(
blockAndState.getBlock(),
blockAndState.getState(),
checkpoints,
blobSidecars,
Optional.of(blobSidecars),
Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ default UInt64 getGenesisTimeMillis() {
*/
Optional<SignedBeaconBlock> getBlockIfAvailable(final Bytes32 blockRoot);

Optional<List<BlobSidecar>> getBlobSidecarsIfAvailable(SlotAndBlockRoot slotAndBlockRoot);

default SafeFuture<Optional<BeaconBlock>> retrieveBlock(Bytes32 blockRoot) {
return retrieveSignedBlock(blockRoot).thenApply(res -> res.map(SignedBeaconBlock::getMessage));
}
Expand All @@ -124,8 +126,6 @@ default SafeFuture<Optional<BeaconBlock>> retrieveBlock(Bytes32 blockRoot) {

SafeFuture<Optional<BeaconState>> retrieveStateAtSlot(SlotAndBlockRoot checkpoint);

SafeFuture<List<BlobSidecar>> retrieveBlobSidecars(SlotAndBlockRoot slotAndBlockRoot);

SafeFuture<Optional<UInt64>> retrieveEarliestBlobSidecarSlot();

SafeFuture<CheckpointState> retrieveFinalizedCheckpointAndState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void applyBlockToStore(
final SignedBeaconBlock signedBlock,
final BeaconState postState,
final boolean isBlockOptimistic,
final List<BlobSidecar> blobSidecars,
final Optional<List<BlobSidecar>> blobSidecars,
final Optional<UInt64> earliestBlobSidecarsSlot) {

BlockCheckpoints blockCheckpoints = epochProcessor.calculateBlockCheckpoints(postState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -248,10 +247,9 @@ public SafeFuture<Optional<BeaconState>> retrieveCheckpointState(
}

@Override
public SafeFuture<List<BlobSidecar>> retrieveBlobSidecars(
public Optional<List<BlobSidecar>> getBlobSidecarsIfAvailable(
final SlotAndBlockRoot slotAndBlockRoot) {
return SafeFuture.completedFuture(
Optional.ofNullable(blobSidecars.get(slotAndBlockRoot)).orElse(Collections.emptyList()));
return Optional.ofNullable(blobSidecars.get(slotAndBlockRoot));
}

@Override
Expand All @@ -265,14 +263,13 @@ public void putBlockAndState(
final SignedBeaconBlock block,
final BeaconState state,
final BlockCheckpoints checkpoints,
final List<BlobSidecar> blobSidecars,
final Optional<List<BlobSidecar>> blobSidecars,
final Optional<UInt64> maybeEarliestBlobSidecarSlot) {
blocks.put(block.getRoot(), block);
blockStates.put(block.getRoot(), state);
blockCheckpoints.put(block.getRoot(), checkpoints);
if (!blobSidecars.isEmpty()) {
this.blobSidecars.put(block.getSlotAndBlockRoot(), blobSidecars);
}
blobSidecars.ifPresent(
sidecars -> this.blobSidecars.put(block.getSlotAndBlockRoot(), sidecars));
if (earliestBlobSidecarSlot.isEmpty()) {
earliestBlobSidecarSlot = maybeEarliestBlobSidecarSlot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Throwables;
import java.net.ConnectException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -540,12 +539,12 @@ private BlockImportResult importBlockAndState(
final StoreTransaction transaction = recentChainData.startStoreTransaction();
addParentStateRoots(spec, blockSlotState, transaction);

final List<BlobSidecar> blobSidecars;
final Optional<List<BlobSidecar>> blobSidecars;
if (blobSidecarsAndValidationResult.isNotRequired()) {
// Outside availability window or pre-Deneb
blobSidecars = Collections.emptyList();
blobSidecars = Optional.empty();
} else if (blobSidecarsAndValidationResult.isValid()) {
blobSidecars = blobSidecarsAndValidationResult.getBlobSidecars();
blobSidecars = Optional.of(blobSidecarsAndValidationResult.getBlobSidecars());
} else {
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,8 +941,7 @@ void onDeneb_shouldStoreBlockWhenBlobSidecarsNotRequired() {
verify(blobSidecarsAvailabilityChecker1).getAvailabilityCheckResult();
assertThat(localRecentChainData.retrieveBlockByRoot(block1.getRoot()))
.isCompletedWithValue(Optional.of(block1.getMessage()));
assertThat(localRecentChainData.retrieveBlobSidecars(block1.getSlotAndBlockRoot()))
.isCompletedWithValue(Collections.emptyList());
assertThat(localRecentChainData.getBlobSidecars(block1.getSlotAndBlockRoot())).isEmpty();
assertThat(localRecentChainData.retrieveEarliestBlobSidecarSlot())
.isCompletedWithValueMatching(Optional::isEmpty);
}
Expand Down Expand Up @@ -981,8 +980,7 @@ void onDeneb_shouldNotStoreBlockWhenBlobSidecarsIsInvalid() {
verify(blobSidecarsAvailabilityChecker1).getAvailabilityCheckResult();
assertThat(localRecentChainData.retrieveBlockByRoot(block1.getRoot()))
.isCompletedWithValue(Optional.empty());
assertThat(localRecentChainData.retrieveBlobSidecars(block1.getSlotAndBlockRoot()))
.isCompletedWithValue(Collections.emptyList());
assertThat(localRecentChainData.getBlobSidecars(block1.getSlotAndBlockRoot())).isEmpty();
assertThat(localRecentChainData.retrieveEarliestBlobSidecarSlot())
.isCompletedWithValueMatching(Optional::isEmpty);

Expand Down Expand Up @@ -1022,8 +1020,7 @@ void onDeneb_shouldNotStoreBlockWhenBlobSidecarsIsNotAvailable() {
verify(blobSidecarsAvailabilityChecker1).getAvailabilityCheckResult();
assertThat(localRecentChainData.retrieveBlockByRoot(block1.getRoot()))
.isCompletedWithValue(Optional.empty());
assertThat(localRecentChainData.retrieveBlobSidecars(block1.getSlotAndBlockRoot()))
.isCompletedWithValue(Collections.emptyList());
assertThat(localRecentChainData.getBlobSidecars(block1.getSlotAndBlockRoot())).isEmpty();
assertThat(localRecentChainData.retrieveEarliestBlobSidecarSlot())
.isCompletedWithValueMatching(Optional::isEmpty);
}
Expand Down Expand Up @@ -1053,8 +1050,7 @@ void preDeneb_shouldNotWorryAboutBlobSidecars() {
verify(blobSidecarsAvailabilityChecker1).getAvailabilityCheckResult();
assertThat(localRecentChainData.retrieveBlockByRoot(block1.getRoot()))
.isCompletedWithValue(Optional.of(block1.getMessage()));
assertThat(localRecentChainData.retrieveBlobSidecars(block1.getSlotAndBlockRoot()))
.isCompletedWithValue(Collections.emptyList());
assertThat(localRecentChainData.getBlobSidecars(block1.getSlotAndBlockRoot())).isEmpty();
assertThat(localRecentChainData.retrieveEarliestBlobSidecarSlot())
.isCompletedWithValueMatching(Optional::isEmpty);
}
Expand Down Expand Up @@ -1115,15 +1111,14 @@ private void assertThatStored(
final BeaconBlock beaconBlock, final List<BlobSidecar> blobSidecars) {
assertThat(localRecentChainData.retrieveBlockByRoot(beaconBlock.getRoot()))
.isCompletedWithValue(Optional.of(beaconBlock));
assertThat(localRecentChainData.retrieveBlobSidecars(beaconBlock.getSlotAndBlockRoot()))
.isCompletedWithValue(blobSidecars);
assertThat(localRecentChainData.getBlobSidecars(beaconBlock.getSlotAndBlockRoot()))
.contains(blobSidecars);
}

private void assertThatNothingStoredForSlotRoot(final SlotAndBlockRoot slotAndBlockRoot) {
assertThat(localRecentChainData.retrieveBlockByRoot(slotAndBlockRoot.getBlockRoot()))
.isCompletedWithValueMatching(Optional::isEmpty);
assertThat(localRecentChainData.retrieveBlobSidecars(slotAndBlockRoot))
.isCompletedWithValueMatching(List::isEmpty);
assertThat(localRecentChainData.getBlobSidecars(slotAndBlockRoot)).isEmpty();
}

private void assertImportBlockWithResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -261,12 +260,13 @@ protected List<BlobSidecar> retrieveCanonicalBlobSidecarsFromPeerStorage(
.flatMap(Optional::stream)
.map(this::safeRetrieveBlobSidecars)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableList());
.toList();
}

private List<BlobSidecar> safeRetrieveBlobSidecars(final SlotAndBlockRoot slotAndBlockRoot) {
try {
return Waiter.waitFor(peerStorage.recentChainData().retrieveBlobSidecars(slotAndBlockRoot));
return Waiter.waitFor(
peerStorage.chainStorage().getBlobSidecarsBySlotAndBlockRoot(slotAndBlockRoot));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ public SafeFuture<List<BlobSidecar>> getBlobSidecars(

public SafeFuture<List<BlobSidecar>> getBlobSidecars(
final SlotAndBlockRoot slotAndBlockRoot, final List<UInt64> indices) {
final Optional<List<BlobSidecar>> maybeBlobSidecars =
recentChainData.getBlobSidecars(slotAndBlockRoot);
if (maybeBlobSidecars.isPresent()) {
return SafeFuture.completedFuture(filterBlobSidecars(maybeBlobSidecars.get(), indices));
}
return historicalChainData
.getBlobSidecarKeys(slotAndBlockRoot)
.thenApply(keys -> filterBlobSidecarKeys(keys, indices))
Expand All @@ -192,6 +197,14 @@ public SafeFuture<List<BlobSidecar>> getAllBlobSidecars(
.thenCompose(this::getAllBlobSidecars);
}

private List<BlobSidecar> filterBlobSidecars(
final List<BlobSidecar> blobSidecars, final List<UInt64> indices) {
if (indices.isEmpty()) {
return blobSidecars;
}
return blobSidecars.stream().filter(key -> indices.contains(key.getIndex())).toList();
}

private Stream<SlotAndBlockRootAndBlobIndex> filterBlobSidecarKeys(
final List<SlotAndBlockRootAndBlobIndex> keys, final List<UInt64> indices) {
if (indices.isEmpty()) {
Expand Down Expand Up @@ -569,6 +582,14 @@ public SafeFuture<Optional<BlobSidecar>> getBlobSidecarByBlockRootAndIndex(

public SafeFuture<Optional<BlobSidecar>> getBlobSidecarByKey(
final SlotAndBlockRootAndBlobIndex key) {
final Optional<List<BlobSidecar>> maybeBlobSidecars =
recentChainData.getBlobSidecars(key.getSlotAndBlockRoot());
if (maybeBlobSidecars.isPresent()) {
return key.getBlobIndex().isLessThan(maybeBlobSidecars.get().size())
? SafeFuture.completedFuture(
Optional.of(maybeBlobSidecars.get().get(key.getBlobIndex().intValue())))
: SafeFuture.completedFuture(Optional.empty());
}
return historicalChainData.getBlobSidecar(key);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.tuweni.bytes.Bytes32;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import tech.pegasys.teku.dataproviders.lookup.BlobSidecarsProvider;
import tech.pegasys.teku.dataproviders.lookup.BlockProvider;
import tech.pegasys.teku.dataproviders.lookup.EarliestBlobSidecarSlotProvider;
import tech.pegasys.teku.dataproviders.lookup.StateAndBlockSummaryProvider;
Expand Down Expand Up @@ -77,7 +76,6 @@ public abstract class RecentChainData implements StoreUpdateHandler {

private final BlockProvider blockProvider;
private final StateAndBlockSummaryProvider stateProvider;
private final BlobSidecarsProvider blobSidecarsProvider;
private final EarliestBlobSidecarSlotProvider earliestBlobSidecarSlotProvider;
protected final FinalizedCheckpointChannel finalizedCheckpointChannel;
protected final StorageUpdateChannel storageUpdateChannel;
Expand Down Expand Up @@ -106,7 +104,6 @@ public abstract class RecentChainData implements StoreUpdateHandler {
final StoreConfig storeConfig,
final BlockProvider blockProvider,
final StateAndBlockSummaryProvider stateProvider,
final BlobSidecarsProvider blobSidecarsProvider,
final EarliestBlobSidecarSlotProvider earliestBlobSidecarSlotProvider,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -118,7 +115,6 @@ public abstract class RecentChainData implements StoreUpdateHandler {
this.storeConfig = storeConfig;
this.blockProvider = blockProvider;
this.stateProvider = stateProvider;
this.blobSidecarsProvider = blobSidecarsProvider;
this.earliestBlobSidecarSlotProvider = earliestBlobSidecarSlotProvider;
this.voteUpdateChannel = voteUpdateChannel;
this.chainHeadChannel = chainHeadChannel;
Expand Down Expand Up @@ -154,7 +150,6 @@ public void initializeFromAnchorPoint(final AnchorPoint anchorPoint, final UInt6
.specProvider(spec)
.blockProvider(blockProvider)
.stateProvider(stateProvider)
.blobSidecarsProvider(blobSidecarsProvider)
.earliestBlobSidecarSlotProvider(earliestBlobSidecarSlotProvider)
.storeConfig(storeConfig)
.build();
Expand Down Expand Up @@ -495,6 +490,11 @@ public Optional<Bytes32> getExecutionBlockHashForBlockRoot(final Bytes32 root) {
return getForkChoiceStrategy().flatMap(forkChoice -> forkChoice.executionBlockHash(root));
}

public Optional<List<BlobSidecar>> getBlobSidecars(final SlotAndBlockRoot slotAndBlockRoot) {
return Optional.ofNullable(store)
.flatMap(s -> store.getBlobSidecarsIfAvailable(slotAndBlockRoot));
}

public SafeFuture<Optional<BeaconBlock>> retrieveBlockByRoot(final Bytes32 root) {
if (store == null) {
return EmptyStoreResults.EMPTY_BLOCK_FUTURE;
Expand Down Expand Up @@ -532,14 +532,6 @@ public SafeFuture<Optional<BeaconState>> retrieveStateInEffectAtSlot(final UInt6
return store.retrieveBlockState(rootAtSlot.get());
}

public SafeFuture<List<BlobSidecar>> retrieveBlobSidecars(
final SlotAndBlockRoot slotAndBlockRoot) {
if (store == null) {
return EmptyStoreResults.NO_BLOB_SIDECARS_FUTURE;
}
return store.retrieveBlobSidecars(slotAndBlockRoot);
}

public SafeFuture<Optional<UInt64>> retrieveEarliestBlobSidecarSlot() {
if (store == null) {
return EmptyStoreResults.NO_EARLIEST_BLOB_SIDECAR_SLOT_FUTURE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.dataproviders.lookup.BlobSidecarsProvider;
import tech.pegasys.teku.dataproviders.lookup.BlockProvider;
import tech.pegasys.teku.dataproviders.lookup.StateAndBlockSummaryProvider;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
Expand All @@ -43,7 +42,6 @@ public class StorageBackedRecentChainData extends RecentChainData {
private static final Logger LOG = LogManager.getLogger();
private final BlockProvider blockProvider;
private final StateAndBlockSummaryProvider stateProvider;
private final BlobSidecarsProvider blobSidecarsProvider;
private final StorageQueryChannel storageQueryChannel;
private final StoreConfig storeConfig;

Expand All @@ -63,7 +61,6 @@ public StorageBackedRecentChainData(
storeConfig,
storageQueryChannel::getHotBlocksByRoot,
storageQueryChannel::getHotStateAndBlockSummaryByBlockRoot,
storageQueryChannel::getBlobSidecarsBySlotAndBlockRoot,
storageQueryChannel::getEarliestAvailableBlobSidecarSlot,
storageUpdateChannel,
voteUpdateChannel,
Expand All @@ -74,7 +71,6 @@ public StorageBackedRecentChainData(
this.storageQueryChannel = storageQueryChannel;
this.blockProvider = storageQueryChannel::getHotBlocksByRoot;
this.stateProvider = storageQueryChannel::getHotStateAndBlockSummaryByBlockRoot;
this.blobSidecarsProvider = storageQueryChannel::getBlobSidecarsBySlotAndBlockRoot;
}

public static SafeFuture<RecentChainData> create(
Expand Down Expand Up @@ -158,7 +154,6 @@ private SafeFuture<RecentChainData> processStoreFuture(
.asyncRunner(asyncRunner)
.blockProvider(blockProvider)
.stateProvider(stateProvider)
.blobSidecarsProvider(blobSidecarsProvider)
.storeConfig(storeConfig)
.build();
setStore(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package tech.pegasys.teku.storage.store;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BlockAndCheckpoints;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary;
Expand All @@ -38,6 +40,8 @@ public abstract class CacheableStore implements UpdatableStore {

abstract void cacheStates(Map<Bytes32, StateAndBlockSummary> stateAndBlockSummaries);

abstract void cacheBlobSidecars(Map<SlotAndBlockRoot, List<BlobSidecar>> blobSidecarsMap);

abstract void cacheFinalizedOptimisticTransitionPayload(
Optional<SlotAndExecutionPayloadSummary> finalizedOptimisticTransitionPayload);

Expand Down
Loading

0 comments on commit 28213de

Please sign in to comment.