diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d7628c3eec..a6cc88b16f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,5 +10,6 @@ ### Additions and Improvements - Optimized blobs validation pipeline +- Remove delay when fetching blobs from the local EL on block arrival ### Bug Fixes \ No newline at end of file diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java index ebf9e188b45..87953843804 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java @@ -37,8 +37,12 @@ import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBody; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodySchemaDeneb; +import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment; import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers; @@ -266,6 +270,32 @@ public BlobSidecar constructBlobSidecar( index, blob, commitment, proof, signedBeaconBlock.asHeader(), kzgCommitmentInclusionProof); } + public BlobSidecar constructBlobSidecarFromBlobAndProof( + final BlobIdentifier blobIdentifier, + final BlobAndProof blobAndProof, + final BeaconBlockBodyDeneb beaconBlockBodyDeneb, + final SignedBeaconBlockHeader signedBeaconBlockHeader) { + + final SszKZGCommitment sszKZGCommitment = + beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue()); + + final BlobSidecar blobSidecar = + blobSidecarSchema.create( + blobIdentifier.getIndex(), + blobAndProof.blob(), + sszKZGCommitment, + new SszKZGProof(blobAndProof.proof()), + signedBeaconBlockHeader, + computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb)); + + blobSidecar.markSignatureAsValidated(); + blobSidecar.markKzgCommitmentInclusionProofAsValidated(); + // assume kzg validation done by local EL + blobSidecar.markKzgAsValidated(); + + return blobSidecar; + } + public boolean verifyBlobKzgCommitmentInclusionProof(final BlobSidecar blobSidecar) { if (blobSidecar.isKzgCommitmentInclusionProofValidated()) { return true; diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java index fc20d0c60e8..22609e00b69 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java @@ -66,9 +66,7 @@ public BlobSidecarManagerImpl( invalidBlobSidecarRoots, (tracker) -> new ForkChoiceBlobSidecarsAvailabilityChecker(spec, recentChainData, tracker, kzg), - // we don't care to set maxBlobsPerBlock since it isn't used with this immediate validation - // flow - (block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot(), UInt64.ZERO)); + (block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot())); } @VisibleForTesting diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index 64c89d91ba1..83faba90209 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -40,13 +40,15 @@ public class BlockBlobSidecarsTracker { private static final Logger LOG = LogManager.getLogger(); + private static final UInt64 CREATION_TIMING_IDX = UInt64.MAX_VALUE; private static final UInt64 BLOCK_ARRIVAL_TIMING_IDX = CREATION_TIMING_IDX.decrement(); - private static final UInt64 RPC_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement(); - private static final UInt64 LOCAL_EL_FETCH_TIMING_IDX = RPC_FETCH_TIMING_IDX.decrement(); + private static final UInt64 RPC_BLOCK_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement(); + private static final UInt64 RPC_BLOBS_FETCH_TIMING_IDX = RPC_BLOCK_FETCH_TIMING_IDX.decrement(); + private static final UInt64 LOCAL_EL_BLOBS_FETCH_TIMING_IDX = + RPC_BLOBS_FETCH_TIMING_IDX.decrement(); private final SlotAndBlockRoot slotAndBlockRoot; - private final UInt64 maxBlobsPerBlock; private final AtomicReference> block = new AtomicReference<>(Optional.empty()); @@ -56,8 +58,9 @@ public class BlockBlobSidecarsTracker { private final NavigableMap blobSidecars = new ConcurrentSkipListMap<>(); private final SafeFuture blobSidecarsComplete = new SafeFuture<>(); - private volatile boolean rpcFetchTriggered = false; - private volatile boolean localElFetchTriggered = false; + private volatile boolean localElBlobsFetchTriggered = false; + private volatile boolean rpcBlockFetchTriggered = false; + private volatile boolean rpcBlobsFetchTriggered = false; private final Optional> maybeDebugTimings; @@ -69,12 +72,9 @@ public class BlockBlobSidecarsTracker { * tracker instance will be used, so no synchronization is required * * @param slotAndBlockRoot slot and block root to create tracker for - * @param maxBlobsPerBlock max number of blobs per block for the slot */ - public BlockBlobSidecarsTracker( - final SlotAndBlockRoot slotAndBlockRoot, final UInt64 maxBlobsPerBlock) { + public BlockBlobSidecarsTracker(final SlotAndBlockRoot slotAndBlockRoot) { this.slotAndBlockRoot = slotAndBlockRoot; - this.maxBlobsPerBlock = maxBlobsPerBlock; if (LOG.isDebugEnabled()) { // don't need a concurrent hashmap since we'll interact with it from synchronized BlobSidecar // pool methods @@ -112,31 +112,13 @@ public Optional getBlobSidecar(final UInt64 index) { public Stream getMissingBlobSidecars() { final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); - if (blockCommitmentsCount.isPresent()) { - return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get())) - .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); - } + checkState(blockCommitmentsCount.isPresent(), "Block must be known to call this method"); - if (blobSidecars.isEmpty()) { - return Stream.of(); - } - - // We may return maxBlobsPerBlock because we don't know the block - return UInt64.range(UInt64.ZERO, maxBlobsPerBlock) + return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get())) .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); } - public Stream getUnusedBlobSidecarsForBlock() { - final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); - checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method"); - - final UInt64 firstUnusedIndex = UInt64.valueOf(blockCommitmentsCount.get()); - return UInt64.range(firstUnusedIndex, maxBlobsPerBlock) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); - } - public boolean add(final BlobSidecar blobSidecar) { checkArgument( blobSidecar.getBlockRoot().equals(slotAndBlockRoot.getBlockRoot()), @@ -247,24 +229,35 @@ public boolean isComplete() { return blobSidecarsComplete.isDone(); } - public boolean isRpcFetchTriggered() { - return rpcFetchTriggered; + public boolean isLocalElBlobsFetchTriggered() { + return localElBlobsFetchTriggered; } - public void setRpcFetchTriggered() { - this.rpcFetchTriggered = true; + public void setLocalElBlobsFetchTriggered() { + this.localElBlobsFetchTriggered = true; maybeDebugTimings.ifPresent( - debugTimings -> debugTimings.put(RPC_FETCH_TIMING_IDX, System.currentTimeMillis())); + debugTimings -> + debugTimings.put(LOCAL_EL_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis())); } - public boolean isLocalElFetchTriggered() { - return localElFetchTriggered; + public boolean isRpcBlockFetchTriggered() { + return rpcBlockFetchTriggered; } - public void setLocalElFetchTriggered() { - this.localElFetchTriggered = true; + public void setRpcBlockFetchTriggered() { + this.rpcBlockFetchTriggered = true; maybeDebugTimings.ifPresent( - debugTimings -> debugTimings.put(LOCAL_EL_FETCH_TIMING_IDX, System.currentTimeMillis())); + debugTimings -> debugTimings.put(RPC_BLOCK_FETCH_TIMING_IDX, System.currentTimeMillis())); + } + + public boolean isRpcBlobsFetchTriggered() { + return rpcBlobsFetchTriggered; + } + + public void setRpcBlobsFetchTriggered() { + this.rpcBlobsFetchTriggered = true; + maybeDebugTimings.ifPresent( + debugTimings -> debugTimings.put(RPC_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis())); } private boolean areBlobsComplete() { @@ -315,22 +308,31 @@ private void printDebugTimings(final Map debugTimings) { .append(debugTimings.getOrDefault(BLOCK_ARRIVAL_TIMING_IDX, 0L) - creationTime) .append("ms - "); - if (debugTimings.containsKey(LOCAL_EL_FETCH_TIMING_IDX)) { + if (debugTimings.containsKey(LOCAL_EL_BLOBS_FETCH_TIMING_IDX)) { + timingsReport + .append("Local EL blobs fetch delay ") + .append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime) + .append("ms - "); + } else { + timingsReport.append("Local EL blobs fetch wasn't required - "); + } + + if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) { timingsReport - .append("Local EL fetch delay ") - .append(debugTimings.get(LOCAL_EL_FETCH_TIMING_IDX) - creationTime) + .append("RPC block fetch delay ") + .append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime) .append("ms - "); } else { - timingsReport.append("Local EL fetch wasn't required - "); + timingsReport.append("RPC block fetch wasn't required - "); } - if (debugTimings.containsKey(RPC_FETCH_TIMING_IDX)) { + if (debugTimings.containsKey(RPC_BLOBS_FETCH_TIMING_IDX)) { timingsReport - .append("RPC fetch delay ") - .append(debugTimings.get(RPC_FETCH_TIMING_IDX) - creationTime) + .append("RPC blobs fetch delay ") + .append(debugTimings.get(RPC_BLOBS_FETCH_TIMING_IDX) - creationTime) .append("ms"); } else { - timingsReport.append("RPC fetch wasn't required"); + timingsReport.append("RPC blobs fetch wasn't required"); } LOG.debug(timingsReport.toString()); @@ -342,8 +344,9 @@ public String toString() { .add("slotAndBlockRoot", slotAndBlockRoot) .add("isBlockPresent", block.get().isPresent()) .add("isComplete", isComplete()) - .add("rpcFetchTriggered", rpcFetchTriggered) - .add("localElFetchTriggered", localElFetchTriggered) + .add("localElBlobsFetchTriggered", localElBlobsFetchTriggered) + .add("rpcBlockFetchTriggered", rpcBlockFetchTriggered) + .add("rpcBlobsFetchTriggered", rpcBlobsFetchTriggered) .add("blockImportOnCompletionEnabled", blockImportOnCompletionEnabled.get()) .add( "blobSidecars", diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 0bfb291d0e2..652e5fc9607 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -48,9 +48,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecVersion; -import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; @@ -58,7 +56,6 @@ import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment; -import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; import tech.pegasys.teku.spec.logic.versions.deneb.types.VersionedHash; @@ -92,6 +89,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars"; static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers"; + // RPC fetching delay timings static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500); static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500); static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000); @@ -154,7 +152,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis this.maxTrackers = maxTrackers; this.sizeGauge = sizeGauge; this.poolStatsCounters = poolStatsCounters; - this.trackerFactory = (slotAndBlockRoot) -> createTracker(spec, slotAndBlockRoot); + this.trackerFactory = BlockBlobSidecarsTracker::new; initMetrics(sizeGauge, poolStatsCounters); } @@ -209,15 +207,6 @@ private static void initMetrics( }); } - private static BlockBlobSidecarsTracker createTracker( - final Spec spec, final SlotAndBlockRoot slotAndBlockRoot) { - return new BlockBlobSidecarsTracker( - slotAndBlockRoot, - UInt64.valueOf( - SpecConfigDeneb.required(spec.atSlot(slotAndBlockRoot.getSlot()).getConfig()) - .getMaxBlobsPerBlock())); - } - @Override public synchronized void onNewBlobSidecar( final BlobSidecar blobSidecar, final RemoteOrigin remoteOrigin) { @@ -230,12 +219,25 @@ public synchronized void onNewBlobSidecar( final SlotAndBlockRoot slotAndBlockRoot = blobSidecar.getSlotAndBlockRoot(); - final BlockBlobSidecarsTracker blobSidecarsTracker = - getOrCreateBlobSidecarsTracker( - slotAndBlockRoot, - newTracker -> onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)), - existingTracker -> {}); + getOrCreateBlobSidecarsTracker( + slotAndBlockRoot, + newTracker -> { + addBlobSidecarToTracker(newTracker, slotAndBlockRoot, blobSidecar, remoteOrigin); + onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)); + }, + existingTracker -> + addBlobSidecarToTracker(existingTracker, slotAndBlockRoot, blobSidecar, remoteOrigin)); + + if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { + sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL); + } + } + private void addBlobSidecarToTracker( + final BlockBlobSidecarsTracker blobSidecarsTracker, + final SlotAndBlockRoot slotAndBlockRoot, + final BlobSidecar blobSidecar, + final RemoteOrigin remoteOrigin) { if (blobSidecarsTracker.add(blobSidecar)) { sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL); countBlobSidecar(remoteOrigin); @@ -246,10 +248,6 @@ public synchronized void onNewBlobSidecar( } else { countDuplicateBlobSidecar(remoteOrigin); } - - if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { - sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL); - } } private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) { @@ -326,7 +324,7 @@ public synchronized void onCompletedBlockAndBlobSidecars( long addedBlobs = blobSidecars.stream() - .map( + .filter( blobSidecar -> { final boolean isNew = blobSidecarsTracker.add(blobSidecar); if (isNew) { @@ -335,7 +333,6 @@ public synchronized void onCompletedBlockAndBlobSidecars( } return isNew; }) - .filter(Boolean::booleanValue) .count(); totalBlobSidecars += (int) addedBlobs; sizeGauge.set(totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL); @@ -427,7 +424,13 @@ public synchronized Optional getBlock(final Bytes32 blockRoot @Override public synchronized Set getAllRequiredBlobSidecars() { return blockBlobSidecarsTrackers.values().stream() - .flatMap(BlockBlobSidecarsTracker::getMissingBlobSidecars) + .flatMap( + tracker -> { + if (tracker.getBlock().isEmpty()) { + return Stream.empty(); + } + return tracker.getMissingBlobSidecars(); + }) .collect(Collectors.toSet()); } @@ -474,6 +477,7 @@ public synchronized int getTotalBlobSidecarsTrackers() { return blockBlobSidecarsTrackers.size(); } + @SuppressWarnings("FutureReturnValueIgnored") private BlockBlobSidecarsTracker internalOnNewBlock( final SignedBeaconBlock block, final Optional remoteOrigin) { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); @@ -495,25 +499,22 @@ private BlockBlobSidecarsTracker internalOnNewBlock( countBlock(remoteOrigin); - if (existingTracker.isRpcFetchTriggered()) { - // block has been set for the first time and we previously triggered fetching of - // missing blobSidecars. So we may have requested to fetch more sidecars - // than the block actually requires. Let's drop them. - existingTracker - .getUnusedBlobSidecarsForBlock() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); - - // if we attempted to fetch via RPC, we missed the opportunity to complete the - // tracker via local EL (local El fetch requires the block to be known) - // Let's try now - if (!existingTracker.isLocalElFetchTriggered() && !existingTracker.isComplete()) { - fetchMissingContentFromLocalEL(slotAndBlockRoot) - .finish(this::logLocalElBlobsLookupFailure); - } + if (!existingTracker.isComplete()) { + // we missed the opportunity to complete the blob sidecars via local EL and RPC + // (since the block is required to be known) Let's try now + asyncRunner.runAsync( + () -> + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenRun( + () -> { + // only run if RPC block fetch has happened ( no blobs RPC fetch + // has occurred) + if (existingTracker.isRpcBlockFetchTriggered()) { + fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot); + } + }) + .finish(this::logBlockOrBlobsRPCFailure)); } }); @@ -575,7 +576,7 @@ private BlockBlobSidecarsTracker getOrCreateBlobSidecarsTracker( } private void makeRoomForNewTracker() { - while (blockBlobSidecarsTrackers.size() > (maxTrackers - 1)) { + while (blockBlobSidecarsTrackers.size() > maxTrackers - 1) { final SlotAndBlockRoot toRemove = orderedBlobSidecarsTrackers.pollFirst(); if (toRemove == null) { break; @@ -584,6 +585,7 @@ private void makeRoomForNewTracker() { } } + @SuppressWarnings("FutureReturnValueIgnored") private void onFirstSeen( final SlotAndBlockRoot slotAndBlockRoot, final Optional remoteOrigin) { final boolean isLocalBlockProduction = @@ -591,27 +593,22 @@ private void onFirstSeen( if (isLocalBlockProduction) { return; } + // delay RPC fetching + final SafeFuture rpcFetchDelay = + asyncRunner.getDelayedFuture(calculateRpcFetchDelay(slotAndBlockRoot)); - final Duration fetchDelay = calculateFetchDelay(slotAndBlockRoot); - - asyncRunner - .runAfterDelay( - () -> - this.fetchMissingContentFromLocalEL(slotAndBlockRoot) - .handleException(this::logLocalElBlobsLookupFailure) - .thenRun(() -> this.fetchMissingContentFromRemotePeers(slotAndBlockRoot)), - fetchDelay) - .finish( - error -> - LOG.error("An error occurred while attempting to fetch missing blobs.", error)); - } - - private void logLocalElBlobsLookupFailure(final Throwable error) { - LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); + asyncRunner.runAsync( + () -> + // fetch blobs from EL with no delay + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenCompose(__ -> rpcFetchDelay) + .thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot)) + .finish(this::logBlockOrBlobsRPCFailure)); } @VisibleForTesting - Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { + Duration calculateRpcFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { final UInt64 slot = slotAndBlockRoot.getSlot(); if (slot.isLessThan(getCurrentSlot())) { @@ -641,28 +638,21 @@ Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } - private synchronized SafeFuture fetchMissingContentFromLocalEL( + private synchronized SafeFuture fetchMissingBlobsFromLocalEL( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot()); - if (blockBlobSidecarsTracker == null) { - return SafeFuture.COMPLETE; - } - - if (blockBlobSidecarsTracker.isComplete()) { - return SafeFuture.COMPLETE; - } - - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { + if (blockBlobSidecarsTracker == null + || blockBlobSidecarsTracker.isComplete() + || blockBlobSidecarsTracker.getBlock().isEmpty()) { return SafeFuture.COMPLETE; } - blockBlobSidecarsTracker.setLocalElFetchTriggered(); + final List missingBlobsIdentifiers = + blockBlobSidecarsTracker.getMissingBlobSidecars().toList(); final SpecVersion specVersion = spec.atSlot(slotAndBlockRoot.getSlot()); - final BlobSidecarSchema blobSidecarSchema = - specVersion.getSchemaDefinitions().toVersionDeneb().orElseThrow().getBlobSidecarSchema(); final MiscHelpersDeneb miscHelpersDeneb = specVersion.miscHelpers().toVersionDeneb().orElseThrow(); final SignedBeaconBlockHeader signedBeaconBlockHeader = @@ -679,9 +669,6 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( final SszList sszKZGCommitments = beaconBlockBodyDeneb.getBlobKzgCommitments(); - final List missingBlobsIdentifiers = - blockBlobSidecarsTracker.getMissingBlobSidecars().toList(); - final List versionedHashes = missingBlobsIdentifiers.stream() .map( @@ -692,6 +679,8 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( .getKZGCommitment())) .toList(); + blockBlobSidecarsTracker.setLocalElBlobsFetchTriggered(); + poolStatsCounters .labels(COUNTER_SIDECAR_TYPE, COUNTER_LOCAL_EL_FETCH_SUBTYPE) .inc(versionedHashes.size()); @@ -708,16 +697,15 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( for (int index = 0; index < blobAndProofs.size(); index++) { final Optional blobAndProof = blobAndProofs.get(index); + final BlobIdentifier blobIdentifier = missingBlobsIdentifiers.get(index); if (blobAndProof.isEmpty()) { - LOG.trace("Blob not found on local EL: {}", missingBlobsIdentifiers.get(index)); + LOG.trace("Blob not found on local EL: {}", blobIdentifier); continue; } final BlobSidecar blobSidecar = - createBlobSidecarFromBlobAndProof( - blobSidecarSchema, - miscHelpersDeneb, - missingBlobsIdentifiers.get(index), + miscHelpersDeneb.constructBlobSidecarFromBlobAndProof( + blobIdentifier, blobAndProof.get(), beaconBlockBodyDeneb, signedBeaconBlockHeader); @@ -726,57 +714,32 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( }); } - private BlobSidecar createBlobSidecarFromBlobAndProof( - final BlobSidecarSchema blobSidecarSchema, - final MiscHelpersDeneb miscHelpersDeneb, - final BlobIdentifier blobIdentifier, - final BlobAndProof blobAndProof, - final BeaconBlockBodyDeneb beaconBlockBodyDeneb, - final SignedBeaconBlockHeader signedBeaconBlockHeader) { - - final SszKZGCommitment sszKZGCommitment = - beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue()); - - final BlobSidecar blobSidecar = - blobSidecarSchema.create( - blobIdentifier.getIndex(), - blobAndProof.blob(), - sszKZGCommitment, - new SszKZGProof(blobAndProof.proof()), - signedBeaconBlockHeader, - miscHelpersDeneb.computeKzgCommitmentInclusionProof( - blobIdentifier.getIndex(), beaconBlockBodyDeneb)); - - blobSidecar.markSignatureAsValidated(); - blobSidecar.markKzgCommitmentInclusionProofAsValidated(); - // assume kzg validation done by local EL - blobSidecar.markKzgAsValidated(); - - return blobSidecar; + private void logLocalElBlobsLookupFailure(final Throwable error) { + LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); } - private synchronized void fetchMissingContentFromRemotePeers( + private synchronized void fetchMissingBlockOrBlobsFromRPC( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot()); - if (blockBlobSidecarsTracker == null) { + if (blockBlobSidecarsTracker == null || blockBlobSidecarsTracker.isComplete()) { return; } - if (blockBlobSidecarsTracker.isComplete()) { - return; - } + if (blockBlobSidecarsTracker.getBlock().isEmpty()) { - blockBlobSidecarsTracker.setRpcFetchTriggered(); + blockBlobSidecarsTracker.setRpcBlockFetchTriggered(); - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); requiredBlockRootSubscribers.deliver( RequiredBlockRootSubscriber::onRequiredBlockRoot, blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); + return; } + blockBlobSidecarsTracker.setRpcBlobsFetchTriggered(); + blockBlobSidecarsTracker .getMissingBlobSidecars() .forEach( @@ -787,24 +750,27 @@ private synchronized void fetchMissingContentFromRemotePeers( }); } - private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { + private void logBlockOrBlobsRPCFailure(final Throwable error) { + LOG.error("An error occurred while attempting to fetch block or blobs via RPC.", error); + } - if (!blockBlobSidecarsTracker.isRpcFetchTriggered()) { - return; - } + private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { + if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered() + && blockBlobSidecarsTracker.getBlock().isEmpty()) { requiredBlockRootDroppedSubscribers.deliver( RequiredBlockRootDroppedSubscriber::onRequiredBlockRootDropped, blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); } - blockBlobSidecarsTracker - .getMissingBlobSidecars() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); + if (blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { + blockBlobSidecarsTracker + .getMissingBlobSidecars() + .forEach( + blobIdentifier -> + requiredBlobSidecarDroppedSubscribers.deliver( + RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, + blobIdentifier)); + } } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java index 275fb2fcada..662c97d818d 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; @@ -65,11 +64,10 @@ public class BlockBlobSidecarsTrackerTest { @Test void isNotCompletedJustAfterCreation() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()) .isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlock()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); assertThat(blockBlobSidecarsTracker.getSlotAndBlockRoot()).isEqualTo(slotAndBlockRoot); @@ -81,7 +79,7 @@ void isNotCompletedJustAfterCreation() { @Test void setBlock_shouldAcceptCorrectBlock() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()) @@ -95,7 +93,7 @@ void setBlock_shouldAcceptCorrectBlock() { @Test void setBlock_shouldThrowWithWrongBlock() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot(), maxBlobsPerBlock); + new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot()); assertThatThrownBy(() -> blockBlobSidecarsTracker.setBlock(block)) .isInstanceOf(IllegalArgumentException.class); } @@ -103,7 +101,7 @@ void setBlock_shouldThrowWithWrongBlock() { @Test void setBlock_shouldAcceptBlockTwice() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); blockBlobSidecarsTracker.setBlock(block); assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block)); @@ -115,7 +113,7 @@ void setBlock_immediatelyCompletesWithBlockWithoutBlobs() { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); @@ -135,7 +133,7 @@ void getCompletionFuture_returnsIndependentFutures() { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final SafeFuture completionFuture1 = blockBlobSidecarsTracker.getCompletionFuture(); final SafeFuture completionFuture2 = blockBlobSidecarsTracker.getCompletionFuture(); final SafeFuture completionFuture3 = blockBlobSidecarsTracker.getCompletionFuture(); @@ -168,23 +166,18 @@ void getCompletionFuture_returnsIndependentFutures() { @Test void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(1)); - final BlobSidecar toAdd = blobSidecarsForBlock.get(0); + new BlockBlobSidecarsTracker(slotAndBlockRoot); + final BlobSidecar toAdd = blobSidecarsForBlock.getFirst(); final Map added = new HashMap<>(); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); added.put(toAdd.getIndex(), toAdd); blockBlobSidecarsTracker.add(toAdd); - // we don't know the block, missing blobs are max blobs minus the blob we already have - final Set potentialMissingBlobs = - UInt64.range(UInt64.valueOf(1), maxBlobsPerBlock.plus(1)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) - .containsExactlyInAnyOrderElementsOf(potentialMissingBlobs); + assertThatThrownBy(blockBlobSidecarsTracker::getMissingBlobSidecars) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Block must be known to call this method"); assertThat(blockBlobSidecarsTracker.getBlobSidecars()) .containsExactlyInAnyOrderEntriesOf(added); @@ -224,7 +217,7 @@ void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { @Test void add_shouldWorkWhenBlockIsSetFirst() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); blockBlobSidecarsTracker.setBlock(block); @@ -249,7 +242,7 @@ void add_shouldWorkWhenBlockIsSetFirst() { @Test void add_shouldThrowWhenAddingInconsistentBlobSidecar() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot(), maxBlobsPerBlock); + new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot()); assertThatThrownBy(() -> blockBlobSidecarsTracker.add(dataStructureUtil.randomBlobSidecar())) .isInstanceOf(IllegalArgumentException.class); } @@ -257,108 +250,29 @@ void add_shouldThrowWhenAddingInconsistentBlobSidecar() { @Test void add_shouldAcceptAcceptSameBlobSidecarTwice() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); blockBlobSidecarsTracker.setBlock(block); assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block)); } @Test - void getMissingBlobSidecars_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown() { + void getMissingBlobSidecars_shouldThrowWhenBlockIsUnknown() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar toAdd = blobSidecarsForBlock.get(2); blockBlobSidecarsTracker.add(toAdd); - final List knownMissing = - blobIdentifiersForBlock.stream() - .filter(blobIdentifier -> !blobIdentifier.getIndex().equals(UInt64.valueOf(2))) - .collect(Collectors.toList()); - - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) - .containsExactlyInAnyOrderElementsOf(knownMissing); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnShouldFailIfBlockIsUnknown() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - - assertThatThrownBy(blockBlobSidecarsTracker::getUnusedBlobSidecarsForBlock) - .isInstanceOf(IllegalStateException.class); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnEmptySetIfBlockIsFull() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - - blockBlobSidecarsTracker.setBlock(block); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()).isEmpty(); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnUnusedBlobSpace() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(2)); - - blockBlobSidecarsTracker.setBlock(block); - - final Set expectedUnusedBlobs = - UInt64.range(maxBlobsPerBlock, maxBlobsPerBlock.plus(2)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(expectedUnusedBlobs); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnAllMaxBlobsPerBlockIfBlockIsEmpty() { - - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithEmptyCommitments(); - final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); - - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(2)); - - blockBlobSidecarsTracker.setBlock(block); - - final Set expectedUnusedBlobs = - UInt64.range(UInt64.ZERO, maxBlobsPerBlock.plus(2)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(expectedUnusedBlobs); - } - - @Test - void getMissingBlobSidecars_shouldRespectMaxBlobsPerBlock() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - final BlobSidecar toAdd = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(100)) - .build(); - - blockBlobSidecarsTracker.add(toAdd); - - final List knownMissing = - blobIdentifiersForBlock.subList(0, maxBlobsPerBlock.intValue()); - - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) - .containsExactlyInAnyOrderElementsOf(knownMissing); + assertThatThrownBy(blockBlobSidecarsTracker::getMissingBlobSidecars) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Block must be known to call this method"); } @Test void shouldNotIgnoreExcessiveBlobSidecarWhenBlockIsUnknownAndWePruneItLater() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar legitBlobSidecar = createBlobSidecar(UInt64.valueOf(2)); final BlobSidecar excessiveBlobSidecar1 = createBlobSidecar(maxBlobsPerBlock); @@ -382,7 +296,7 @@ void shouldNotIgnoreExcessiveBlobSidecarWhenBlockIsUnknownAndWePruneItLater() { @Test void add_shouldIgnoreExcessiveBlobSidecarWhenBlockIsKnown() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); @@ -400,7 +314,7 @@ void add_shouldIgnoreExcessiveBlobSidecarWhenBlockIsKnown() { @Test void enableBlockImportOnCompletion_shouldImportOnlyOnceWhenCalled() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 2dad121a9eb..65b0c20c6d6 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -123,6 +124,8 @@ public void setup() { blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped( requiredBlobSidecarDroppedEvents::add); blockBlobSidecarsTrackersPool.subscribeNewBlobSidecar(newBlobSidecarEvents::add); + when(executionLayer.engineGetBlobs(any(), eq(currentSlot))) + .thenReturn(SafeFuture.completedFuture(List.of())); when(blobSidecarPublisher.apply(any())).thenReturn(SafeFuture.COMPLETE); setSlot(currentSlot); } @@ -463,7 +466,7 @@ public void onCompletedBlockAndBlobSidecars_shouldCreateTrackerIgnoringHistorica assertThat(blockBlobSidecarsTracker.getBlobSidecars().values()) .containsExactlyInAnyOrderElementsOf(blobSidecars); assertThat(blockBlobSidecarsTracker.getBlock()).isPresent(); - assertThat(blockBlobSidecarsTracker.isRpcFetchTriggered()).isFalse(); + assertThat(blockBlobSidecarsTracker.isRpcBlockFetchTriggered()).isFalse(); assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()).isCompleted(); assertBlobSidecarsCount(blobSidecars.size()); @@ -493,7 +496,7 @@ public void onCompletedBlockAndBlobSidecars_shouldNotTriggerFetch() { assertThat(blockBlobSidecarsTracker.getBlobSidecars().values()) .containsExactlyInAnyOrderElementsOf(blobSidecars); assertThat(blockBlobSidecarsTracker.getBlock()).isPresent(); - assertThat(blockBlobSidecarsTracker.isRpcFetchTriggered()).isFalse(); + assertThat(blockBlobSidecarsTracker.isRpcBlockFetchTriggered()).isFalse(); assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()).isNotCompleted(); assertBlobSidecarsCount(0); @@ -511,7 +514,7 @@ public void onCompletedBlockAndBlobSidecars_shouldNotTriggerFetch() { blockBlobSidecarsTrackersPool.getOrCreateBlockBlobSidecarsTracker(block); assertThat(blockBlobSidecarsTracker.getBlock()).isPresent(); - assertThat(blockBlobSidecarsTracker.isRpcFetchTriggered()).isFalse(); + assertThat(blockBlobSidecarsTracker.isRpcBlockFetchTriggered()).isFalse(); assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()).isNotCompleted(); assertBlobSidecarsCount(0); @@ -671,7 +674,8 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { (slotAndRoot) -> { when(tracker.add(any())).thenReturn(true); when(tracker.getMissingBlobSidecars()) - .thenAnswer(__ -> missingBlobIdentifiers.stream()); + .thenAnswer(__ -> missingBlobIdentifiers.stream()) + .thenAnswer(__ -> Stream.empty()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -694,7 +698,7 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); // local el fetch triggered - verify(tracker).setLocalElFetchTriggered(); + verify(tracker).setLocalElBlobsFetchTriggered(); // prepare partial response of 3 blobAndProofs final List> blobAndProofsFromEL = @@ -794,7 +798,7 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { } @Test - void shouldFetchMissingBlockAndBlobSidecars() { + void shouldFetchMissingBlock() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); final BlobSidecar blobSidecar = dataStructureUtil @@ -803,14 +807,8 @@ void shouldFetchMissingBlockAndBlobSidecars() { .index(UInt64.valueOf(2)) .build(); - final Set missingBlobs = - Set.of( - new BlobIdentifier(block.getRoot(), UInt64.ONE), - new BlobIdentifier(block.getRoot(), UInt64.ZERO)); - final BlockBlobSidecarsTracker mockedTracker = mock(BlockBlobSidecarsTracker.class); when(mockedTracker.getBlock()).thenReturn(Optional.empty()); - when(mockedTracker.getMissingBlobSidecars()).thenReturn(missingBlobs.stream()); when(mockedTracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); mockedTrackersFactory = Optional.of((__) -> mockedTracker); @@ -821,91 +819,14 @@ void shouldFetchMissingBlockAndBlobSidecars() { asyncRunner.executeQueuedActions(); - verify(mockedTracker).setRpcFetchTriggered(); - verify(mockedTracker, never()).setLocalElFetchTriggered(); + verify(mockedTracker).setRpcBlockFetchTriggered(); + verify(mockedTracker, never()).setLocalElBlobsFetchTriggered(); assertThat(requiredBlockRootEvents).containsExactly(block.getRoot()); - assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs); assertStats("block", "rpc_fetch", 1); - assertStats("blob_sidecar", "rpc_fetch", missingBlobs.size()); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); - } - - @Test - void shouldDropBlobSidecarsThatHasBeenFetchedButNotPresentInBlock() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - - final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot()); - final BlobSidecar blobSidecar = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(2)) - .build(); - - final Set blobsNotPresentInBlock = - Set.of( - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)), - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3))); - - mockedTrackersFactory = - Optional.of( - (slotAndRoot) -> { - BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getBlock()).thenReturn(Optional.empty()); - when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); - when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcFetchTriggered()).thenReturn(true); - when(tracker.getUnusedBlobSidecarsForBlock()) - .thenReturn(blobsNotPresentInBlock.stream()); - return tracker; - }); - - blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - - blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - - assertThat(requiredBlobSidecarDroppedEvents).containsExactlyElementsOf(blobsNotPresentInBlock); - } - - @Test - void shouldNotDropUnusedBlobSidecarsIfFetchingHasNotOccurred() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - - final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot()); - final BlobSidecar blobSidecar = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(2)) - .build(); - - final Set blobsNotUserInBlock = - Set.of( - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)), - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3))); - - mockedTrackersFactory = - Optional.of( - (slotAndRoot) -> { - BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getBlock()).thenReturn(Optional.empty()); - when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); - when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcFetchTriggered()).thenReturn(false); - when(tracker.getUnusedBlobSidecarsForBlock()) - .thenReturn(blobsNotUserInBlock.stream()); - return tracker; - }); - - blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - - blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); } @Test @@ -945,10 +866,10 @@ void shouldDropPossiblyFetchedBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(true); + when(tracker.isRpcBlobsFetchTriggered()).thenReturn(true); return tracker; }); @@ -967,7 +888,7 @@ void shouldDropPossiblyFetchedBlobSidecars() { } @Test - void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { + void shouldTryToFetchBlobSidecarsWhenBlockArrivesAfterRPCFetch() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); final Set missingBlobs = @@ -991,8 +912,8 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.setBlock(any())).thenReturn(true); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(true); - when(tracker.isLocalElFetchTriggered()).thenReturn(false); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); + when(tracker.isLocalElBlobsFetchTriggered()).thenReturn(true); return tracker; }); @@ -1001,7 +922,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { assertThat(asyncRunner.hasDelayedActions()).isTrue(); asyncRunner.executeQueuedActions(); - verify(tracker, never()).setLocalElFetchTriggered(); + verify(tracker, never()).setLocalElBlobsFetchTriggered(); when(tracker.getBlock()).thenReturn(Optional.of(block)); @@ -1011,7 +932,10 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - verify(tracker).setLocalElFetchTriggered(); + assertThat(asyncRunner.hasDelayedActions()).isTrue(); + asyncRunner.executeQueuedActions(); + + verify(tracker).setLocalElBlobsFetchTriggered(); verify(executionLayer).engineGetBlobs(any(), any()); } @@ -1032,7 +956,7 @@ void shouldDropPossiblyFetchedBlock() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.getSlotAndBlockRoot()) .thenReturn(signedBeaconBlock.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(true); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); return tracker; }); @@ -1067,7 +991,7 @@ void shouldNotDropPossiblyFetchedBlockIfFetchHasNotOccurred() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.getSlotAndBlockRoot()) .thenReturn(signedBeaconBlock.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(false); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(false); return tracker; }); @@ -1093,14 +1017,15 @@ void shouldRespectTargetWhenBlockIsEarly() { // blocks arrives at slot start timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() { + void calculateBlockFetchDelay_shouldRespectMinimumWhenRpcIsLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1112,14 +1037,15 @@ void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() { // blocks arrives 200ms before attestation due timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { + void calculateBlockFetchDelay_shouldRespectTargetWhenRpcIsVeryLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1130,14 +1056,15 @@ void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { // blocks arrives 1s after attestation due timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectAttestationDueLimit() { + void calculateRpcFetchDelay_shouldRespectAttestationDueLimit() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1157,7 +1084,8 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() { timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can only wait 200ms less than target assertThat(fetchDelay) @@ -1166,11 +1094,12 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() { } @Test - void calculateFetchDelay_shouldReturnZeroIfSlotIsOld() { + void calculateRpcFetchDelay_shouldReturnZeroIfSlotIsOld() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); assertThat(fetchDelay).isEqualTo(Duration.ZERO); } @@ -1400,8 +1329,6 @@ private BlockBlobSidecarsTracker trackerFactory(final SlotAndBlockRoot slotAndBl if (mockedTrackersFactory.isPresent()) { return mockedTrackersFactory.get().apply(slotAndBlockRoot); } - return new BlockBlobSidecarsTracker( - slotAndBlockRoot, - UInt64.valueOf(spec.getMaxBlobsPerBlockForHighestMilestone().orElseThrow())); + return new BlockBlobSidecarsTracker(slotAndBlockRoot); } }