From 639bf146d3bdb7f21285e2f8cac6f1fd3ce7aba3 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Mon, 16 Dec 2024 16:21:00 +0000 Subject: [PATCH 01/14] Remove attempt to recover blobs when we don't have block --- .../deneb/helpers/MiscHelpersDeneb.java | 24 +++ .../blobs/BlockBlobSidecarsTracker.java | 101 ++++++------ .../BlockBlobSidecarsTrackersPoolImpl.java | 146 ++++++------------ .../blobs/BlockBlobSidecarsTrackerTest.java | 105 +++---------- ...BlockBlobSidecarsTrackersPoolImplTest.java | 57 ++++--- 5 files changed, 175 insertions(+), 258 deletions(-) diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java index ebf9e188b45..bea7c222df7 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java @@ -37,8 +37,12 @@ import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBody; +import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodySchemaDeneb; +import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment; import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers; @@ -266,6 +270,26 @@ public BlobSidecar constructBlobSidecar( index, blob, commitment, proof, signedBeaconBlock.asHeader(), kzgCommitmentInclusionProof); } + public BlobSidecar constructBlobSidecarFromBlobAndProof( + final BlobIdentifier blobIdentifier, + final BlobAndProof blobAndProof, + final BeaconBlockBodyDeneb beaconBlockBodyDeneb, + final SignedBeaconBlockHeader signedBeaconBlockHeader) { + + final SszKZGCommitment sszKZGCommitment = + beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue()); + + return blobSidecarSchema.create( + blobIdentifier.getIndex(), + blobAndProof.blob(), + sszKZGCommitment, + new SszKZGProof(blobAndProof.proof()), + signedBeaconBlockHeader, + computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb)); + + + } + public boolean verifyBlobKzgCommitmentInclusionProof(final BlobSidecar blobSidecar) { if (blobSidecar.isKzgCommitmentInclusionProofValidated()) { return true; diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index 64c89d91ba1..6d3c9f95d21 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -40,13 +40,15 @@ public class BlockBlobSidecarsTracker { private static final Logger LOG = LogManager.getLogger(); + private static final UInt64 CREATION_TIMING_IDX = UInt64.MAX_VALUE; private static final UInt64 BLOCK_ARRIVAL_TIMING_IDX = CREATION_TIMING_IDX.decrement(); - private static final UInt64 RPC_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement(); - private static final UInt64 LOCAL_EL_FETCH_TIMING_IDX = RPC_FETCH_TIMING_IDX.decrement(); + private static final UInt64 RPC_BLOCK_FETCH_TIMING_IDX = BLOCK_ARRIVAL_TIMING_IDX.decrement(); + private static final UInt64 RPC_BLOBS_FETCH_TIMING_IDX = RPC_BLOCK_FETCH_TIMING_IDX.decrement(); + private static final UInt64 LOCAL_EL_BLOBS_FETCH_TIMING_IDX = + RPC_BLOBS_FETCH_TIMING_IDX.decrement(); private final SlotAndBlockRoot slotAndBlockRoot; - private final UInt64 maxBlobsPerBlock; private final AtomicReference> block = new AtomicReference<>(Optional.empty()); @@ -56,8 +58,9 @@ public class BlockBlobSidecarsTracker { private final NavigableMap blobSidecars = new ConcurrentSkipListMap<>(); private final SafeFuture blobSidecarsComplete = new SafeFuture<>(); - private volatile boolean rpcFetchTriggered = false; - private volatile boolean localElFetchTriggered = false; + private volatile boolean localElBlobsFetchTriggered = false; + private volatile boolean rpcBlockFetchTriggered = false; + private volatile boolean rpcBlobsFetchTriggered = false; private final Optional> maybeDebugTimings; @@ -69,12 +72,9 @@ public class BlockBlobSidecarsTracker { * tracker instance will be used, so no synchronization is required * * @param slotAndBlockRoot slot and block root to create tracker for - * @param maxBlobsPerBlock max number of blobs per block for the slot */ - public BlockBlobSidecarsTracker( - final SlotAndBlockRoot slotAndBlockRoot, final UInt64 maxBlobsPerBlock) { + public BlockBlobSidecarsTracker(final SlotAndBlockRoot slotAndBlockRoot) { this.slotAndBlockRoot = slotAndBlockRoot; - this.maxBlobsPerBlock = maxBlobsPerBlock; if (LOG.isDebugEnabled()) { // don't need a concurrent hashmap since we'll interact with it from synchronized BlobSidecar // pool methods @@ -110,30 +110,12 @@ public Optional getBlobSidecar(final UInt64 index) { return Optional.ofNullable(blobSidecars.get(index)); } - public Stream getMissingBlobSidecars() { - final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); - if (blockCommitmentsCount.isPresent()) { - return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get())) - .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); - } - - if (blobSidecars.isEmpty()) { - return Stream.of(); - } - - // We may return maxBlobsPerBlock because we don't know the block - return UInt64.range(UInt64.ZERO, maxBlobsPerBlock) - .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); - } - - public Stream getUnusedBlobSidecarsForBlock() { + public Stream getMissingBlobSidecarsForBlock() { final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method"); - final UInt64 firstUnusedIndex = UInt64.valueOf(blockCommitmentsCount.get()); - return UInt64.range(firstUnusedIndex, maxBlobsPerBlock) + return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get())) + .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); } @@ -247,24 +229,35 @@ public boolean isComplete() { return blobSidecarsComplete.isDone(); } - public boolean isRpcFetchTriggered() { - return rpcFetchTriggered; + public boolean isLocalElBlobsFetchTriggered() { + return localElBlobsFetchTriggered; } - public void setRpcFetchTriggered() { - this.rpcFetchTriggered = true; + public void setLocalElBlobsFetchTriggered() { + this.localElBlobsFetchTriggered = true; maybeDebugTimings.ifPresent( - debugTimings -> debugTimings.put(RPC_FETCH_TIMING_IDX, System.currentTimeMillis())); + debugTimings -> + debugTimings.put(LOCAL_EL_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis())); } - public boolean isLocalElFetchTriggered() { - return localElFetchTriggered; + public boolean isRpcBlockFetchTriggered() { + return rpcBlockFetchTriggered; } - public void setLocalElFetchTriggered() { - this.localElFetchTriggered = true; + public void setRpcBlockFetchTriggered() { + this.rpcBlockFetchTriggered = true; maybeDebugTimings.ifPresent( - debugTimings -> debugTimings.put(LOCAL_EL_FETCH_TIMING_IDX, System.currentTimeMillis())); + debugTimings -> debugTimings.put(RPC_BLOCK_FETCH_TIMING_IDX, System.currentTimeMillis())); + } + + public boolean isRpcBlobsFetchTriggered() { + return rpcBlobsFetchTriggered; + } + + public void setRpcBlobsFetchTriggered() { + this.rpcBlobsFetchTriggered = true; + maybeDebugTimings.ifPresent( + debugTimings -> debugTimings.put(RPC_BLOBS_FETCH_TIMING_IDX, System.currentTimeMillis())); } private boolean areBlobsComplete() { @@ -315,22 +308,31 @@ private void printDebugTimings(final Map debugTimings) { .append(debugTimings.getOrDefault(BLOCK_ARRIVAL_TIMING_IDX, 0L) - creationTime) .append("ms - "); - if (debugTimings.containsKey(LOCAL_EL_FETCH_TIMING_IDX)) { + if (debugTimings.containsKey(LOCAL_EL_BLOBS_FETCH_TIMING_IDX)) { timingsReport - .append("Local EL fetch delay ") - .append(debugTimings.get(LOCAL_EL_FETCH_TIMING_IDX) - creationTime) + .append("Local EL blobs fetch delay ") + .append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime) .append("ms - "); } else { timingsReport.append("Local EL fetch wasn't required - "); } - if (debugTimings.containsKey(RPC_FETCH_TIMING_IDX)) { + if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) { + timingsReport + .append("RPC block fetch delay ") + .append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime) + .append("ms"); + } else { + timingsReport.append("RPC block fetch wasn't required"); + } + + if (debugTimings.containsKey(RPC_BLOBS_FETCH_TIMING_IDX)) { timingsReport - .append("RPC fetch delay ") - .append(debugTimings.get(RPC_FETCH_TIMING_IDX) - creationTime) + .append("RPC blobs fetch delay ") + .append(debugTimings.get(RPC_BLOBS_FETCH_TIMING_IDX) - creationTime) .append("ms"); } else { - timingsReport.append("RPC fetch wasn't required"); + timingsReport.append("RPC blobs fetch wasn't required"); } LOG.debug(timingsReport.toString()); @@ -342,8 +344,9 @@ public String toString() { .add("slotAndBlockRoot", slotAndBlockRoot) .add("isBlockPresent", block.get().isPresent()) .add("isComplete", isComplete()) - .add("rpcFetchTriggered", rpcFetchTriggered) - .add("localElFetchTriggered", localElFetchTriggered) + .add("localElBlobsFetchTriggered", localElBlobsFetchTriggered) + .add("rpcBlockFetchTriggered", rpcBlockFetchTriggered) + .add("rpcBlobsFetchTriggered", rpcBlobsFetchTriggered) .add("blockImportOnCompletionEnabled", blockImportOnCompletionEnabled.get()) .add( "blobSidecars", diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 0bfb291d0e2..d4aedb221fd 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -48,9 +48,7 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecVersion; -import tech.pegasys.teku.spec.config.SpecConfigDeneb; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarSchema; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; @@ -58,7 +56,6 @@ import tech.pegasys.teku.spec.datastructures.execution.BlobAndProof; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.spec.datastructures.type.SszKZGCommitment; -import tech.pegasys.teku.spec.datastructures.type.SszKZGProof; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.spec.logic.versions.deneb.helpers.MiscHelpersDeneb; import tech.pegasys.teku.spec.logic.versions.deneb.types.VersionedHash; @@ -154,7 +151,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis this.maxTrackers = maxTrackers; this.sizeGauge = sizeGauge; this.poolStatsCounters = poolStatsCounters; - this.trackerFactory = (slotAndBlockRoot) -> createTracker(spec, slotAndBlockRoot); + this.trackerFactory = BlockBlobSidecarsTracker::new; initMetrics(sizeGauge, poolStatsCounters); } @@ -209,15 +206,6 @@ private static void initMetrics( }); } - private static BlockBlobSidecarsTracker createTracker( - final Spec spec, final SlotAndBlockRoot slotAndBlockRoot) { - return new BlockBlobSidecarsTracker( - slotAndBlockRoot, - UInt64.valueOf( - SpecConfigDeneb.required(spec.atSlot(slotAndBlockRoot.getSlot()).getConfig()) - .getMaxBlobsPerBlock())); - } - @Override public synchronized void onNewBlobSidecar( final BlobSidecar blobSidecar, final RemoteOrigin remoteOrigin) { @@ -344,7 +332,7 @@ public synchronized void onCompletedBlockAndBlobSidecars( LOG.error( "Tracker for block {} is supposed to be completed but it is not. Missing blob sidecars: {}", block.toLogString(), - blobSidecarsTracker.getMissingBlobSidecars().count()); + blobSidecarsTracker.getMissingBlobSidecarsForBlock().count()); } if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { @@ -427,7 +415,13 @@ public synchronized Optional getBlock(final Bytes32 blockRoot @Override public synchronized Set getAllRequiredBlobSidecars() { return blockBlobSidecarsTrackers.values().stream() - .flatMap(BlockBlobSidecarsTracker::getMissingBlobSidecars) + .flatMap( + tracker -> { + if (tracker.getBlock().isEmpty()) { + return Stream.empty(); + } + return tracker.getMissingBlobSidecarsForBlock(); + }) .collect(Collectors.toSet()); } @@ -495,24 +489,12 @@ private BlockBlobSidecarsTracker internalOnNewBlock( countBlock(remoteOrigin); - if (existingTracker.isRpcFetchTriggered()) { - // block has been set for the first time and we previously triggered fetching of - // missing blobSidecars. So we may have requested to fetch more sidecars - // than the block actually requires. Let's drop them. - existingTracker - .getUnusedBlobSidecarsForBlock() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); - - // if we attempted to fetch via RPC, we missed the opportunity to complete the - // tracker via local EL (local El fetch requires the block to be known) - // Let's try now - if (!existingTracker.isLocalElFetchTriggered() && !existingTracker.isComplete()) { - fetchMissingContentFromLocalEL(slotAndBlockRoot) - .finish(this::logLocalElBlobsLookupFailure); + if (existingTracker.isRpcBlockFetchTriggered()) { + // if we attempted to fetch this block via RPC, we missed the opportunity to + // complete the blob sidecars via local EL and RPC (since the block is required to + // be known) Let's try now + if (!existingTracker.isCompleted()) { + fetchMissingContent(slotAndBlockRoot).finish(this::logMissingContentFetchFailure); } } }); @@ -595,21 +577,24 @@ private void onFirstSeen( final Duration fetchDelay = calculateFetchDelay(slotAndBlockRoot); asyncRunner - .runAfterDelay( - () -> - this.fetchMissingContentFromLocalEL(slotAndBlockRoot) - .handleException(this::logLocalElBlobsLookupFailure) - .thenRun(() -> this.fetchMissingContentFromRemotePeers(slotAndBlockRoot)), - fetchDelay) - .finish( - error -> - LOG.error("An error occurred while attempting to fetch missing blobs.", error)); + .runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), fetchDelay) + .finish(this::logMissingContentFetchFailure); + } + + private SafeFuture fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { + return fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)); } private void logLocalElBlobsLookupFailure(final Throwable error) { LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); } + private void logMissingContentFetchFailure(final Throwable error) { + LOG.error("An error occurred while attempting to fetch missing content.", error); + } + @VisibleForTesting Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { final UInt64 slot = slotAndBlockRoot.getSlot(); @@ -641,28 +626,21 @@ Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } - private synchronized SafeFuture fetchMissingContentFromLocalEL( + private synchronized SafeFuture fetchMissingBlobsFromLocalEL( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot()); - if (blockBlobSidecarsTracker == null) { + if (blockBlobSidecarsTracker == null + || blockBlobSidecarsTracker.isComplete() + || blockBlobSidecarsTracker.getBlock().isEmpty()) { return SafeFuture.COMPLETE; } - if (blockBlobSidecarsTracker.isComplete()) { - return SafeFuture.COMPLETE; - } - - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { - return SafeFuture.COMPLETE; - } - - blockBlobSidecarsTracker.setLocalElFetchTriggered(); + final List missingBlobsIdentifiers = + blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock().toList(); final SpecVersion specVersion = spec.atSlot(slotAndBlockRoot.getSlot()); - final BlobSidecarSchema blobSidecarSchema = - specVersion.getSchemaDefinitions().toVersionDeneb().orElseThrow().getBlobSidecarSchema(); final MiscHelpersDeneb miscHelpersDeneb = specVersion.miscHelpers().toVersionDeneb().orElseThrow(); final SignedBeaconBlockHeader signedBeaconBlockHeader = @@ -679,9 +657,6 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( final SszList sszKZGCommitments = beaconBlockBodyDeneb.getBlobKzgCommitments(); - final List missingBlobsIdentifiers = - blockBlobSidecarsTracker.getMissingBlobSidecars().toList(); - final List versionedHashes = missingBlobsIdentifiers.stream() .map( @@ -692,6 +667,8 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( .getKZGCommitment())) .toList(); + blockBlobSidecarsTracker.setLocalElBlobsFetchTriggered(); + poolStatsCounters .labels(COUNTER_SIDECAR_TYPE, COUNTER_LOCAL_EL_FETCH_SUBTYPE) .inc(versionedHashes.size()); @@ -714,9 +691,7 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( } final BlobSidecar blobSidecar = - createBlobSidecarFromBlobAndProof( - blobSidecarSchema, - miscHelpersDeneb, + miscHelpersDeneb.constructBlobSidecarFromBlobAndProof( missingBlobsIdentifiers.get(index), blobAndProof.get(), beaconBlockBodyDeneb, @@ -726,59 +701,30 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( }); } - private BlobSidecar createBlobSidecarFromBlobAndProof( - final BlobSidecarSchema blobSidecarSchema, - final MiscHelpersDeneb miscHelpersDeneb, - final BlobIdentifier blobIdentifier, - final BlobAndProof blobAndProof, - final BeaconBlockBodyDeneb beaconBlockBodyDeneb, - final SignedBeaconBlockHeader signedBeaconBlockHeader) { - - final SszKZGCommitment sszKZGCommitment = - beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue()); - - final BlobSidecar blobSidecar = - blobSidecarSchema.create( - blobIdentifier.getIndex(), - blobAndProof.blob(), - sszKZGCommitment, - new SszKZGProof(blobAndProof.proof()), - signedBeaconBlockHeader, - miscHelpersDeneb.computeKzgCommitmentInclusionProof( - blobIdentifier.getIndex(), beaconBlockBodyDeneb)); - - blobSidecar.markSignatureAsValidated(); - blobSidecar.markKzgCommitmentInclusionProofAsValidated(); - // assume kzg validation done by local EL - blobSidecar.markKzgAsValidated(); - - return blobSidecar; - } - private synchronized void fetchMissingContentFromRemotePeers( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot()); - if (blockBlobSidecarsTracker == null) { + if (blockBlobSidecarsTracker == null || blockBlobSidecarsTracker.isComplete()) { return; } - if (blockBlobSidecarsTracker.isComplete()) { - return; - } + if (blockBlobSidecarsTracker.getBlock().isEmpty()) { - blockBlobSidecarsTracker.setRpcFetchTriggered(); + blockBlobSidecarsTracker.setRpcBlockFetchTriggered(); - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); requiredBlockRootSubscribers.deliver( RequiredBlockRootSubscriber::onRequiredBlockRoot, blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); + return; } + blockBlobSidecarsTracker.setRpcBlobsFetchTriggered(); + blockBlobSidecarsTracker - .getMissingBlobSidecars() + .getMissingBlobSidecarsForBlock() .forEach( blobIdentifier -> { poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); @@ -789,7 +735,8 @@ private synchronized void fetchMissingContentFromRemotePeers( private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { - if (!blockBlobSidecarsTracker.isRpcFetchTriggered()) { + if (!blockBlobSidecarsTracker.isRpcBlockFetchTriggered() + && !blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { return; } @@ -797,10 +744,11 @@ private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecars requiredBlockRootDroppedSubscribers.deliver( RequiredBlockRootDroppedSubscriber::onRequiredBlockRootDropped, blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); + return; } blockBlobSidecarsTracker - .getMissingBlobSidecars() + .getMissingBlobSidecarsForBlock() .forEach( blobIdentifier -> requiredBlobSidecarDroppedSubscribers.deliver( diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java index 275fb2fcada..4e239efc967 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java @@ -65,11 +65,11 @@ public class BlockBlobSidecarsTrackerTest { @Test void isNotCompletedJustAfterCreation() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()) .isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()).isEmpty(); + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlock()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); assertThat(blockBlobSidecarsTracker.getSlotAndBlockRoot()).isEqualTo(slotAndBlockRoot); @@ -81,12 +81,12 @@ void isNotCompletedJustAfterCreation() { @Test void setBlock_shouldAcceptCorrectBlock() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()) .isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) .containsExactlyInAnyOrderElementsOf(blobIdentifiersForBlock); assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block)); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); @@ -95,7 +95,7 @@ void setBlock_shouldAcceptCorrectBlock() { @Test void setBlock_shouldThrowWithWrongBlock() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot(), maxBlobsPerBlock); + new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot()); assertThatThrownBy(() -> blockBlobSidecarsTracker.setBlock(block)) .isInstanceOf(IllegalArgumentException.class); } @@ -103,7 +103,7 @@ void setBlock_shouldThrowWithWrongBlock() { @Test void setBlock_shouldAcceptBlockTwice() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); blockBlobSidecarsTracker.setBlock(block); assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block)); @@ -115,7 +115,7 @@ void setBlock_immediatelyCompletesWithBlockWithoutBlobs() { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); @@ -125,7 +125,7 @@ void setBlock_immediatelyCompletesWithBlockWithoutBlobs() { SafeFutureAssert.assertThatSafeFuture(completionFuture).isCompleted(); assertThat(blockBlobSidecarsTracker.isComplete()).isTrue(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()).isEmpty(); + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); } @@ -135,7 +135,7 @@ void getCompletionFuture_returnsIndependentFutures() { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final SafeFuture completionFuture1 = blockBlobSidecarsTracker.getCompletionFuture(); final SafeFuture completionFuture2 = blockBlobSidecarsTracker.getCompletionFuture(); final SafeFuture completionFuture3 = blockBlobSidecarsTracker.getCompletionFuture(); @@ -168,7 +168,7 @@ void getCompletionFuture_returnsIndependentFutures() { @Test void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(1)); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar toAdd = blobSidecarsForBlock.get(0); final Map added = new HashMap<>(); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); @@ -183,7 +183,7 @@ void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { .collect(Collectors.toSet()); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) .containsExactlyInAnyOrderElementsOf(potentialMissingBlobs); assertThat(blockBlobSidecarsTracker.getBlobSidecars()) .containsExactlyInAnyOrderEntriesOf(added); @@ -194,7 +194,7 @@ void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { // now we know the block and we know about missing blobs final List stillMissing = blobIdentifiersForBlock.subList(1, blobIdentifiersForBlock.size()); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) .containsExactlyInAnyOrderElementsOf(stillMissing); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); @@ -224,7 +224,7 @@ void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { @Test void add_shouldWorkWhenBlockIsSetFirst() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); blockBlobSidecarsTracker.setBlock(block); @@ -238,7 +238,7 @@ void add_shouldWorkWhenBlockIsSetFirst() { added.put(toAdd.getIndex(), toAdd); blockBlobSidecarsTracker.add(toAdd); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) .containsExactlyInAnyOrderElementsOf(stillMissing); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()) @@ -249,7 +249,7 @@ void add_shouldWorkWhenBlockIsSetFirst() { @Test void add_shouldThrowWhenAddingInconsistentBlobSidecar() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot(), maxBlobsPerBlock); + new BlockBlobSidecarsTracker(dataStructureUtil.randomSlotAndBlockRoot()); assertThatThrownBy(() -> blockBlobSidecarsTracker.add(dataStructureUtil.randomBlobSidecar())) .isInstanceOf(IllegalArgumentException.class); } @@ -257,16 +257,16 @@ void add_shouldThrowWhenAddingInconsistentBlobSidecar() { @Test void add_shouldAcceptAcceptSameBlobSidecarTwice() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); blockBlobSidecarsTracker.setBlock(block); assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block)); } @Test - void getMissingBlobSidecars_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown() { + void getMissingBlobSidecars_ForBlock_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar toAdd = blobSidecarsForBlock.get(2); blockBlobSidecarsTracker.add(toAdd); @@ -276,69 +276,14 @@ void getMissingBlobSidecars_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown .filter(blobIdentifier -> !blobIdentifier.getIndex().equals(UInt64.valueOf(2))) .collect(Collectors.toList()); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) .containsExactlyInAnyOrderElementsOf(knownMissing); } @Test - void getUnusedBlobSidecarsForBlock_shouldReturnShouldFailIfBlockIsUnknown() { + void getMissingBlobSidecars_ForBlock_shouldRespectMaxBlobsPerBlock() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - - assertThatThrownBy(blockBlobSidecarsTracker::getUnusedBlobSidecarsForBlock) - .isInstanceOf(IllegalStateException.class); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnEmptySetIfBlockIsFull() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - - blockBlobSidecarsTracker.setBlock(block); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()).isEmpty(); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnUnusedBlobSpace() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(2)); - - blockBlobSidecarsTracker.setBlock(block); - - final Set expectedUnusedBlobs = - UInt64.range(maxBlobsPerBlock, maxBlobsPerBlock.plus(2)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(expectedUnusedBlobs); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnAllMaxBlobsPerBlockIfBlockIsEmpty() { - - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithEmptyCommitments(); - final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); - - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(2)); - - blockBlobSidecarsTracker.setBlock(block); - - final Set expectedUnusedBlobs = - UInt64.range(UInt64.ZERO, maxBlobsPerBlock.plus(2)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(expectedUnusedBlobs); - } - - @Test - void getMissingBlobSidecars_shouldRespectMaxBlobsPerBlock() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar toAdd = dataStructureUtil .createRandomBlobSidecarBuilder() @@ -351,14 +296,14 @@ void getMissingBlobSidecars_shouldRespectMaxBlobsPerBlock() { final List knownMissing = blobIdentifiersForBlock.subList(0, maxBlobsPerBlock.intValue()); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) .containsExactlyInAnyOrderElementsOf(knownMissing); } @Test void shouldNotIgnoreExcessiveBlobSidecarWhenBlockIsUnknownAndWePruneItLater() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar legitBlobSidecar = createBlobSidecar(UInt64.valueOf(2)); final BlobSidecar excessiveBlobSidecar1 = createBlobSidecar(maxBlobsPerBlock); @@ -382,7 +327,7 @@ void shouldNotIgnoreExcessiveBlobSidecarWhenBlockIsUnknownAndWePruneItLater() { @Test void add_shouldIgnoreExcessiveBlobSidecarWhenBlockIsKnown() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); @@ -400,7 +345,7 @@ void add_shouldIgnoreExcessiveBlobSidecarWhenBlockIsKnown() { @Test void enableBlockImportOnCompletion_shouldImportOnlyOnceWhenCalled() { BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); + new BlockBlobSidecarsTracker(slotAndBlockRoot); blockBlobSidecarsTracker.setBlock(block); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 2dad121a9eb..40884b71d4d 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -463,7 +463,7 @@ public void onCompletedBlockAndBlobSidecars_shouldCreateTrackerIgnoringHistorica assertThat(blockBlobSidecarsTracker.getBlobSidecars().values()) .containsExactlyInAnyOrderElementsOf(blobSidecars); assertThat(blockBlobSidecarsTracker.getBlock()).isPresent(); - assertThat(blockBlobSidecarsTracker.isRpcFetchTriggered()).isFalse(); + assertThat(blockBlobSidecarsTracker.isRpcBlockFetchTriggered()).isFalse(); assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()).isCompleted(); assertBlobSidecarsCount(blobSidecars.size()); @@ -493,7 +493,7 @@ public void onCompletedBlockAndBlobSidecars_shouldNotTriggerFetch() { assertThat(blockBlobSidecarsTracker.getBlobSidecars().values()) .containsExactlyInAnyOrderElementsOf(blobSidecars); assertThat(blockBlobSidecarsTracker.getBlock()).isPresent(); - assertThat(blockBlobSidecarsTracker.isRpcFetchTriggered()).isFalse(); + assertThat(blockBlobSidecarsTracker.isRpcBlockFetchTriggered()).isFalse(); assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()).isNotCompleted(); assertBlobSidecarsCount(0); @@ -511,7 +511,7 @@ public void onCompletedBlockAndBlobSidecars_shouldNotTriggerFetch() { blockBlobSidecarsTrackersPool.getOrCreateBlockBlobSidecarsTracker(block); assertThat(blockBlobSidecarsTracker.getBlock()).isPresent(); - assertThat(blockBlobSidecarsTracker.isRpcFetchTriggered()).isFalse(); + assertThat(blockBlobSidecarsTracker.isRpcBlockFetchTriggered()).isFalse(); assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()).isNotCompleted(); assertBlobSidecarsCount(0); @@ -670,7 +670,7 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { Optional.of( (slotAndRoot) -> { when(tracker.add(any())).thenReturn(true); - when(tracker.getMissingBlobSidecars()) + when(tracker.getMissingBlobSidecarsForBlock()) .thenAnswer(__ -> missingBlobIdentifiers.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; @@ -694,7 +694,7 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); // local el fetch triggered - verify(tracker).setLocalElFetchTriggered(); + verify(tracker).setLocalElBlobsFetchTriggered(); // prepare partial response of 3 blobAndProofs final List> blobAndProofsFromEL = @@ -734,7 +734,8 @@ void shouldFetchMissingBlobSidecarsViaRPCAfterLocalEL() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecarsForBlock()) + .thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -770,7 +771,8 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecarsForBlock()) + .thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -810,7 +812,7 @@ void shouldFetchMissingBlockAndBlobSidecars() { final BlockBlobSidecarsTracker mockedTracker = mock(BlockBlobSidecarsTracker.class); when(mockedTracker.getBlock()).thenReturn(Optional.empty()); - when(mockedTracker.getMissingBlobSidecars()).thenReturn(missingBlobs.stream()); + when(mockedTracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs.stream()); when(mockedTracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); mockedTrackersFactory = Optional.of((__) -> mockedTracker); @@ -821,8 +823,8 @@ void shouldFetchMissingBlockAndBlobSidecars() { asyncRunner.executeQueuedActions(); - verify(mockedTracker).setRpcFetchTriggered(); - verify(mockedTracker, never()).setLocalElFetchTriggered(); + verify(mockedTracker).setRpcBlockFetchTriggered(); + verify(mockedTracker, never()).setLocalElBlobsFetchTriggered(); assertThat(requiredBlockRootEvents).containsExactly(block.getRoot()); assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs); @@ -858,9 +860,7 @@ void shouldDropBlobSidecarsThatHasBeenFetchedButNotPresentInBlock() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcFetchTriggered()).thenReturn(true); - when(tracker.getUnusedBlobSidecarsForBlock()) - .thenReturn(blobsNotPresentInBlock.stream()); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); return tracker; }); @@ -895,9 +895,7 @@ void shouldNotDropUnusedBlobSidecarsIfFetchingHasNotOccurred() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcFetchTriggered()).thenReturn(false); - when(tracker.getUnusedBlobSidecarsForBlock()) - .thenReturn(blobsNotUserInBlock.stream()); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(false); return tracker; }); @@ -945,10 +943,10 @@ void shouldDropPossiblyFetchedBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs.stream()); + when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(true); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); return tracker; }); @@ -987,12 +985,13 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { mockedTrackersFactory = Optional.of( (slotAndRoot) -> { - when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecarsForBlock()) + .thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.setBlock(any())).thenReturn(true); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(true); - when(tracker.isLocalElFetchTriggered()).thenReturn(false); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); + when(tracker.isLocalElBlobsFetchTriggered()).thenReturn(false); return tracker; }); @@ -1001,7 +1000,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { assertThat(asyncRunner.hasDelayedActions()).isTrue(); asyncRunner.executeQueuedActions(); - verify(tracker, never()).setLocalElFetchTriggered(); + verify(tracker, never()).setLocalElBlobsFetchTriggered(); when(tracker.getBlock()).thenReturn(Optional.of(block)); @@ -1011,7 +1010,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - verify(tracker).setLocalElFetchTriggered(); + verify(tracker).setLocalElBlobsFetchTriggered(); verify(executionLayer).engineGetBlobs(any(), any()); } @@ -1032,7 +1031,7 @@ void shouldDropPossiblyFetchedBlock() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.getSlotAndBlockRoot()) .thenReturn(signedBeaconBlock.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(true); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); return tracker; }); @@ -1067,7 +1066,7 @@ void shouldNotDropPossiblyFetchedBlockIfFetchHasNotOccurred() { when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.getSlotAndBlockRoot()) .thenReturn(signedBeaconBlock.getSlotAndBlockRoot()); - when(tracker.isRpcFetchTriggered()).thenReturn(false); + when(tracker.isRpcBlockFetchTriggered()).thenReturn(false); return tracker; }); @@ -1188,7 +1187,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs1.stream()); + when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs1.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block1)); return tracker; }); @@ -1206,7 +1205,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs2.stream()); + when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs2.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block2)); return tracker; }); @@ -1400,8 +1399,6 @@ private BlockBlobSidecarsTracker trackerFactory(final SlotAndBlockRoot slotAndBl if (mockedTrackersFactory.isPresent()) { return mockedTrackersFactory.get().apply(slotAndBlockRoot); } - return new BlockBlobSidecarsTracker( - slotAndBlockRoot, - UInt64.valueOf(spec.getMaxBlobsPerBlockForHighestMilestone().orElseThrow())); + return new BlockBlobSidecarsTracker(slotAndBlockRoot); } } From 554693791037e98c0aea7d077308300f5deceaa2 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Mon, 16 Dec 2024 16:42:23 +0000 Subject: [PATCH 02/14] rebase fixes --- .../deneb/helpers/MiscHelpersDeneb.java | 24 ++++++++++++------- .../blobs/BlobSidecarManagerImpl.java | 4 +--- .../BlockBlobSidecarsTrackersPoolImpl.java | 2 +- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java index bea7c222df7..87953843804 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/versions/deneb/helpers/MiscHelpersDeneb.java @@ -279,15 +279,21 @@ public BlobSidecar constructBlobSidecarFromBlobAndProof( final SszKZGCommitment sszKZGCommitment = beaconBlockBodyDeneb.getBlobKzgCommitments().get(blobIdentifier.getIndex().intValue()); - return blobSidecarSchema.create( - blobIdentifier.getIndex(), - blobAndProof.blob(), - sszKZGCommitment, - new SszKZGProof(blobAndProof.proof()), - signedBeaconBlockHeader, - computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb)); - - + final BlobSidecar blobSidecar = + blobSidecarSchema.create( + blobIdentifier.getIndex(), + blobAndProof.blob(), + sszKZGCommitment, + new SszKZGProof(blobAndProof.proof()), + signedBeaconBlockHeader, + computeKzgCommitmentInclusionProof(blobIdentifier.getIndex(), beaconBlockBodyDeneb)); + + blobSidecar.markSignatureAsValidated(); + blobSidecar.markKzgCommitmentInclusionProofAsValidated(); + // assume kzg validation done by local EL + blobSidecar.markKzgAsValidated(); + + return blobSidecar; } public boolean verifyBlobKzgCommitmentInclusionProof(final BlobSidecar blobSidecar) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java index fc20d0c60e8..22609e00b69 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java @@ -66,9 +66,7 @@ public BlobSidecarManagerImpl( invalidBlobSidecarRoots, (tracker) -> new ForkChoiceBlobSidecarsAvailabilityChecker(spec, recentChainData, tracker, kzg), - // we don't care to set maxBlobsPerBlock since it isn't used with this immediate validation - // flow - (block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot(), UInt64.ZERO)); + (block) -> new BlockBlobSidecarsTracker(block.getSlotAndBlockRoot())); } @VisibleForTesting diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index d4aedb221fd..87000cc5da0 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -493,7 +493,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( // if we attempted to fetch this block via RPC, we missed the opportunity to // complete the blob sidecars via local EL and RPC (since the block is required to // be known) Let's try now - if (!existingTracker.isCompleted()) { + if (!existingTracker.isComplete()) { fetchMissingContent(slotAndBlockRoot).finish(this::logMissingContentFetchFailure); } } From 7101d48b229ea078ee9c68d0efb7a594c7df4412 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 17 Dec 2024 11:29:14 +0000 Subject: [PATCH 03/14] remove delay when fetching blobs and block is known --- .../blobs/BlockBlobSidecarsTracker.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 110 ++++++++++-------- ...BlockBlobSidecarsTrackersPoolImplTest.java | 23 ++-- 3 files changed, 77 insertions(+), 58 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index 6d3c9f95d21..314be521041 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -314,7 +314,7 @@ private void printDebugTimings(final Map debugTimings) { .append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime) .append("ms - "); } else { - timingsReport.append("Local EL fetch wasn't required - "); + timingsReport.append("Local EL blobs fetch wasn't required - "); } if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 87000cc5da0..a7efb8567b7 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -71,6 +71,11 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis implements BlockBlobSidecarsTrackersPool { private static final Logger LOG = LogManager.getLogger(); + enum TrackerObject { + BLOCK, + BLOB_SIDECAR; + } + static final String COUNTER_BLOCK_TYPE = "block"; static final String COUNTER_SIDECAR_TYPE = "blob_sidecar"; @@ -89,6 +94,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars"; static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers"; + // Block fetching delay timings static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500); static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500); static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000); @@ -218,12 +224,25 @@ public synchronized void onNewBlobSidecar( final SlotAndBlockRoot slotAndBlockRoot = blobSidecar.getSlotAndBlockRoot(); - final BlockBlobSidecarsTracker blobSidecarsTracker = - getOrCreateBlobSidecarsTracker( - slotAndBlockRoot, - newTracker -> onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)), - existingTracker -> {}); + getOrCreateBlobSidecarsTracker( + slotAndBlockRoot, + newTracker -> { + addBlobSidecarToTracker(newTracker, slotAndBlockRoot, blobSidecar, remoteOrigin); + onFirstSeen(slotAndBlockRoot, TrackerObject.BLOB_SIDECAR, Optional.of(remoteOrigin)); + }, + existingTracker -> + addBlobSidecarToTracker(existingTracker, slotAndBlockRoot, blobSidecar, remoteOrigin)); + + if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { + sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL); + } + } + private void addBlobSidecarToTracker( + final BlockBlobSidecarsTracker blobSidecarsTracker, + final SlotAndBlockRoot slotAndBlockRoot, + final BlobSidecar blobSidecar, + final RemoteOrigin remoteOrigin) { if (blobSidecarsTracker.add(blobSidecar)) { sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL); countBlobSidecar(remoteOrigin); @@ -234,10 +253,6 @@ public synchronized void onNewBlobSidecar( } else { countDuplicateBlobSidecar(remoteOrigin); } - - if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { - sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL); - } } private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) { @@ -478,7 +493,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( newTracker -> { newTracker.setBlock(block); countBlock(remoteOrigin); - onFirstSeen(slotAndBlockRoot, remoteOrigin); + onFirstSeen(slotAndBlockRoot, TrackerObject.BLOCK, remoteOrigin); }, existingTracker -> { if (!existingTracker.setBlock(block)) { @@ -494,7 +509,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( // complete the blob sidecars via local EL and RPC (since the block is required to // be known) Let's try now if (!existingTracker.isComplete()) { - fetchMissingContent(slotAndBlockRoot).finish(this::logMissingContentFetchFailure); + fetchMissingContent(slotAndBlockRoot); } } }); @@ -566,37 +581,28 @@ private void makeRoomForNewTracker() { } } + @SuppressWarnings("FutureReturnValueIgnored") private void onFirstSeen( - final SlotAndBlockRoot slotAndBlockRoot, final Optional remoteOrigin) { + final SlotAndBlockRoot slotAndBlockRoot, + final TrackerObject trackerObject, + final Optional remoteOrigin) { final boolean isLocalBlockProduction = remoteOrigin.map(ro -> ro.equals(LOCAL_PROPOSAL)).orElse(false); if (isLocalBlockProduction) { return; } - - final Duration fetchDelay = calculateFetchDelay(slotAndBlockRoot); - - asyncRunner - .runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), fetchDelay) - .finish(this::logMissingContentFetchFailure); - } - - private SafeFuture fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { - return fetchMissingBlobsFromLocalEL(slotAndBlockRoot) - .handleException(this::logLocalElBlobsLookupFailure) - .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)); - } - - private void logLocalElBlobsLookupFailure(final Throwable error) { - LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); - } - - private void logMissingContentFetchFailure(final Throwable error) { - LOG.error("An error occurred while attempting to fetch missing content.", error); + switch (trackerObject) { + case BLOB_SIDECAR -> { + final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); + asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); + } + // no delay for attempting to fetch blobs for when the block is first seen + case BLOCK -> fetchMissingContent(slotAndBlockRoot); + } } @VisibleForTesting - Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { + Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { final UInt64 slot = slotAndBlockRoot.getSlot(); if (slot.isLessThan(getCurrentSlot())) { @@ -626,6 +632,16 @@ Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } + private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException( + error -> LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error))) + .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)) + .finish( + error -> + LOG.error("An error occurred while attempting to fetch missing content.", error)); + } + private synchronized SafeFuture fetchMissingBlobsFromLocalEL( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = @@ -711,7 +727,7 @@ private synchronized void fetchMissingContentFromRemotePeers( } if (blockBlobSidecarsTracker.getBlock().isEmpty()) { - + // fetch missing block blockBlobSidecarsTracker.setRpcBlockFetchTriggered(); poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); @@ -721,6 +737,7 @@ private synchronized void fetchMissingContentFromRemotePeers( return; } + // fetch missing blob sidecars blockBlobSidecarsTracker.setRpcBlobsFetchTriggered(); blockBlobSidecarsTracker @@ -735,24 +752,21 @@ private synchronized void fetchMissingContentFromRemotePeers( private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { - if (!blockBlobSidecarsTracker.isRpcBlockFetchTriggered() - && !blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { - return; - } - - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { + if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered() + && blockBlobSidecarsTracker.getBlock().isEmpty()) { requiredBlockRootDroppedSubscribers.deliver( RequiredBlockRootDroppedSubscriber::onRequiredBlockRootDropped, blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); - return; } - blockBlobSidecarsTracker - .getMissingBlobSidecarsForBlock() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); + if (blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { + blockBlobSidecarsTracker + .getMissingBlobSidecarsForBlock() + .forEach( + blobIdentifier -> + requiredBlobSidecarDroppedSubscribers.deliver( + RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, + blobIdentifier)); + } } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 40884b71d4d..23db2505e8f 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -1092,14 +1092,15 @@ void shouldRespectTargetWhenBlockIsEarly() { // blocks arrives at slot start timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() { + void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1111,14 +1112,15 @@ void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() { // blocks arrives 200ms before attestation due timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { + void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1129,14 +1131,15 @@ void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { // blocks arrives 1s after attestation due timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectAttestationDueLimit() { + void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1156,7 +1159,8 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() { timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can only wait 200ms less than target assertThat(fetchDelay) @@ -1165,11 +1169,12 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() { } @Test - void calculateFetchDelay_shouldReturnZeroIfSlotIsOld() { + void calculateBlockFetchDelay_shouldReturnZeroIfSlotIsOld() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); assertThat(fetchDelay).isEqualTo(Duration.ZERO); } From c0e47d794b0bf03ea14a4fffb1ceaaf544ce1c9c Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 17 Dec 2024 11:47:07 +0000 Subject: [PATCH 04/14] fix spotless/assemble --- .../util/BlockBlobSidecarsTrackersPoolImpl.java | 4 ++-- .../util/BlockBlobSidecarsTrackersPoolImplTest.java | 5 ----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index a7efb8567b7..e1b4aebcf2a 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -73,7 +73,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis enum TrackerObject { BLOCK, - BLOB_SIDECAR; + BLOB_SIDECAR } static final String COUNTER_BLOCK_TYPE = "block"; @@ -596,7 +596,7 @@ private void onFirstSeen( final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); } - // no delay for attempting to fetch blobs for when the block is first seen + // no delay for attempting to fetch blobs for when the block is first seen case BLOCK -> fetchMissingContent(slotAndBlockRoot); } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 23db2505e8f..f6def954e12 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -883,11 +883,6 @@ void shouldNotDropUnusedBlobSidecarsIfFetchingHasNotOccurred() { .index(UInt64.valueOf(2)) .build(); - final Set blobsNotUserInBlock = - Set.of( - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)), - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3))); - mockedTrackersFactory = Optional.of( (slotAndRoot) -> { From 8aaa658086a5c7f1fec17af752611aba77b334b8 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 17 Dec 2024 14:56:10 +0000 Subject: [PATCH 05/14] Fix build --- .../blobs/BlockBlobSidecarsTracker.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 16 +-- .../blobs/BlockBlobSidecarsTrackerTest.java | 55 +++-------- ...BlockBlobSidecarsTrackersPoolImplTest.java | 98 +++---------------- 4 files changed, 33 insertions(+), 138 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index 314be521041..b16088bf650 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -110,7 +110,7 @@ public Optional getBlobSidecar(final UInt64 index) { return Optional.ofNullable(blobSidecars.get(index)); } - public Stream getMissingBlobSidecarsForBlock() { + public Stream getMissingBlobSidecars() { final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method"); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index e1b4aebcf2a..b3be558d5f7 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -347,7 +347,7 @@ public synchronized void onCompletedBlockAndBlobSidecars( LOG.error( "Tracker for block {} is supposed to be completed but it is not. Missing blob sidecars: {}", block.toLogString(), - blobSidecarsTracker.getMissingBlobSidecarsForBlock().count()); + blobSidecarsTracker.getMissingBlobSidecars().count()); } if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { @@ -435,7 +435,7 @@ public synchronized Set getAllRequiredBlobSidecars() { if (tracker.getBlock().isEmpty()) { return Stream.empty(); } - return tracker.getMissingBlobSidecarsForBlock(); + return tracker.getMissingBlobSidecars(); }) .collect(Collectors.toSet()); } @@ -597,7 +597,7 @@ private void onFirstSeen( asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); } // no delay for attempting to fetch blobs for when the block is first seen - case BLOCK -> fetchMissingContent(slotAndBlockRoot); + case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); } } @@ -632,6 +632,7 @@ Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } + /** Fetch missing block (when block is unknown) or fetch missing blob sidecars via EL and RPC */ private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { fetchMissingBlobsFromLocalEL(slotAndBlockRoot) .handleException( @@ -654,7 +655,7 @@ private synchronized SafeFuture fetchMissingBlobsFromLocalEL( } final List missingBlobsIdentifiers = - blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock().toList(); + blockBlobSidecarsTracker.getMissingBlobSidecars().toList(); final SpecVersion specVersion = spec.atSlot(slotAndBlockRoot.getSlot()); final MiscHelpersDeneb miscHelpersDeneb = @@ -727,7 +728,7 @@ private synchronized void fetchMissingContentFromRemotePeers( } if (blockBlobSidecarsTracker.getBlock().isEmpty()) { - // fetch missing block + blockBlobSidecarsTracker.setRpcBlockFetchTriggered(); poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); @@ -737,11 +738,10 @@ private synchronized void fetchMissingContentFromRemotePeers( return; } - // fetch missing blob sidecars blockBlobSidecarsTracker.setRpcBlobsFetchTriggered(); blockBlobSidecarsTracker - .getMissingBlobSidecarsForBlock() + .getMissingBlobSidecars() .forEach( blobIdentifier -> { poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); @@ -761,7 +761,7 @@ private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecars if (blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { blockBlobSidecarsTracker - .getMissingBlobSidecarsForBlock() + .getMissingBlobSidecars() .forEach( blobIdentifier -> requiredBlobSidecarDroppedSubscribers.deliver( diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java index 4e239efc967..757c19ad0e1 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; @@ -69,7 +68,6 @@ void isNotCompletedJustAfterCreation() { SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()) .isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlock()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); assertThat(blockBlobSidecarsTracker.getSlotAndBlockRoot()).isEqualTo(slotAndBlockRoot); @@ -86,7 +84,7 @@ void setBlock_shouldAcceptCorrectBlock() { SafeFutureAssert.assertThatSafeFuture(blockBlobSidecarsTracker.getCompletionFuture()) .isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) .containsExactlyInAnyOrderElementsOf(blobIdentifiersForBlock); assertThat(blockBlobSidecarsTracker.getBlock()).isEqualTo(Optional.of(block)); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); @@ -125,7 +123,7 @@ void setBlock_immediatelyCompletesWithBlockWithoutBlobs() { SafeFutureAssert.assertThatSafeFuture(completionFuture).isCompleted(); assertThat(blockBlobSidecarsTracker.isComplete()).isTrue(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()).isEmpty(); + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()).isEmpty(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()).isEmpty(); } @@ -169,22 +167,17 @@ void getCompletionFuture_returnsIndependentFutures() { void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = new BlockBlobSidecarsTracker(slotAndBlockRoot); - final BlobSidecar toAdd = blobSidecarsForBlock.get(0); + final BlobSidecar toAdd = blobSidecarsForBlock.getFirst(); final Map added = new HashMap<>(); final SafeFuture completionFuture = blockBlobSidecarsTracker.getCompletionFuture(); added.put(toAdd.getIndex(), toAdd); blockBlobSidecarsTracker.add(toAdd); - // we don't know the block, missing blobs are max blobs minus the blob we already have - final Set potentialMissingBlobs = - UInt64.range(UInt64.valueOf(1), maxBlobsPerBlock.plus(1)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(potentialMissingBlobs); + assertThatThrownBy(blockBlobSidecarsTracker::getMissingBlobSidecars) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Block must me known to call this method"); assertThat(blockBlobSidecarsTracker.getBlobSidecars()) .containsExactlyInAnyOrderEntriesOf(added); @@ -194,7 +187,7 @@ void add_shouldWorkTillCompletionWhenAddingBlobsBeforeBlockIsSet() { // now we know the block and we know about missing blobs final List stillMissing = blobIdentifiersForBlock.subList(1, blobIdentifiersForBlock.size()); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) .containsExactlyInAnyOrderElementsOf(stillMissing); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); @@ -238,7 +231,7 @@ void add_shouldWorkWhenBlockIsSetFirst() { added.put(toAdd.getIndex(), toAdd); blockBlobSidecarsTracker.add(toAdd); - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) + assertThat(blockBlobSidecarsTracker.getMissingBlobSidecars()) .containsExactlyInAnyOrderElementsOf(stillMissing); SafeFutureAssert.assertThatSafeFuture(completionFuture).isNotCompleted(); assertThat(blockBlobSidecarsTracker.getBlobSidecars()) @@ -264,40 +257,16 @@ void add_shouldAcceptAcceptSameBlobSidecarTwice() { } @Test - void getMissingBlobSidecars_ForBlock_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown() { + void getMissingBlobSidecars_shouldThrowWhenBlockIsUnknown() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = new BlockBlobSidecarsTracker(slotAndBlockRoot); final BlobSidecar toAdd = blobSidecarsForBlock.get(2); blockBlobSidecarsTracker.add(toAdd); - final List knownMissing = - blobIdentifiersForBlock.stream() - .filter(blobIdentifier -> !blobIdentifier.getIndex().equals(UInt64.valueOf(2))) - .collect(Collectors.toList()); - - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(knownMissing); - } - - @Test - void getMissingBlobSidecars_ForBlock_shouldRespectMaxBlobsPerBlock() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot); - final BlobSidecar toAdd = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(100)) - .build(); - - blockBlobSidecarsTracker.add(toAdd); - - final List knownMissing = - blobIdentifiersForBlock.subList(0, maxBlobsPerBlock.intValue()); - - assertThat(blockBlobSidecarsTracker.getMissingBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(knownMissing); + assertThatThrownBy(blockBlobSidecarsTracker::getMissingBlobSidecars) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Block must me known to call this method"); } @Test diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index f6def954e12..93c7494e151 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -123,6 +124,8 @@ public void setup() { blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped( requiredBlobSidecarDroppedEvents::add); blockBlobSidecarsTrackersPool.subscribeNewBlobSidecar(newBlobSidecarEvents::add); + when(executionLayer.engineGetBlobs(any(), eq(currentSlot))) + .thenReturn(SafeFuture.completedFuture(List.of())); when(blobSidecarPublisher.apply(any())).thenReturn(SafeFuture.COMPLETE); setSlot(currentSlot); } @@ -670,7 +673,7 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { Optional.of( (slotAndRoot) -> { when(tracker.add(any())).thenReturn(true); - when(tracker.getMissingBlobSidecarsForBlock()) + when(tracker.getMissingBlobSidecars()) .thenAnswer(__ -> missingBlobIdentifiers.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; @@ -734,8 +737,7 @@ void shouldFetchMissingBlobSidecarsViaRPCAfterLocalEL() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecarsForBlock()) - .thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -771,8 +773,7 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecarsForBlock()) - .thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -796,7 +797,7 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { } @Test - void shouldFetchMissingBlockAndBlobSidecars() { + void shouldFetchMissingBlock() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); final BlobSidecar blobSidecar = dataStructureUtil @@ -805,14 +806,8 @@ void shouldFetchMissingBlockAndBlobSidecars() { .index(UInt64.valueOf(2)) .build(); - final Set missingBlobs = - Set.of( - new BlobIdentifier(block.getRoot(), UInt64.ONE), - new BlobIdentifier(block.getRoot(), UInt64.ZERO)); - final BlockBlobSidecarsTracker mockedTracker = mock(BlockBlobSidecarsTracker.class); when(mockedTracker.getBlock()).thenReturn(Optional.empty()); - when(mockedTracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs.stream()); when(mockedTracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); mockedTrackersFactory = Optional.of((__) -> mockedTracker); @@ -827,78 +822,10 @@ void shouldFetchMissingBlockAndBlobSidecars() { verify(mockedTracker, never()).setLocalElBlobsFetchTriggered(); assertThat(requiredBlockRootEvents).containsExactly(block.getRoot()); - assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs); assertStats("block", "rpc_fetch", 1); - assertStats("blob_sidecar", "rpc_fetch", missingBlobs.size()); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); - } - - @Test - void shouldDropBlobSidecarsThatHasBeenFetchedButNotPresentInBlock() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - - final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot()); - final BlobSidecar blobSidecar = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(2)) - .build(); - - final Set blobsNotPresentInBlock = - Set.of( - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)), - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3))); - - mockedTrackersFactory = - Optional.of( - (slotAndRoot) -> { - BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getBlock()).thenReturn(Optional.empty()); - when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); - when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); - return tracker; - }); - - blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - - blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - - assertThat(requiredBlobSidecarDroppedEvents).containsExactlyElementsOf(blobsNotPresentInBlock); - } - - @Test - void shouldNotDropUnusedBlobSidecarsIfFetchingHasNotOccurred() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - - final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot()); - final BlobSidecar blobSidecar = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(2)) - .build(); - - mockedTrackersFactory = - Optional.of( - (slotAndRoot) -> { - BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getBlock()).thenReturn(Optional.empty()); - when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); - when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcBlockFetchTriggered()).thenReturn(false); - return tracker; - }); - - blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - - blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); } @Test @@ -938,10 +865,10 @@ void shouldDropPossiblyFetchedBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block)); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); - when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); + when(tracker.isRpcBlobsFetchTriggered()).thenReturn(true); return tracker; }); @@ -980,8 +907,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { mockedTrackersFactory = Optional.of( (slotAndRoot) -> { - when(tracker.getMissingBlobSidecarsForBlock()) - .thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.setBlock(any())).thenReturn(true); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); @@ -1187,7 +1113,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs1.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs1.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block1)); return tracker; }); @@ -1205,7 +1131,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecarsForBlock()).thenReturn(missingBlobs2.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs2.stream()); when(tracker.getBlock()).thenReturn(Optional.of(block2)); return tracker; }); From 0f68f7e9c568853814924841f334d1f89c0b4c33 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 17 Dec 2024 15:09:52 +0000 Subject: [PATCH 06/14] changes --- CHANGELOG.md | 1 + .../util/BlockBlobSidecarsTrackersPoolImpl.java | 8 +++----- .../util/BlockBlobSidecarsTrackersPoolImplTest.java | 7 +++++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d7628c3eec..a6cc88b16f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,5 +10,6 @@ ### Additions and Improvements - Optimized blobs validation pipeline +- Remove delay when fetching blobs from the local EL on block arrival ### Bug Fixes \ No newline at end of file diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index b3be558d5f7..298d69f8f04 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -504,13 +504,11 @@ private BlockBlobSidecarsTracker internalOnNewBlock( countBlock(remoteOrigin); - if (existingTracker.isRpcBlockFetchTriggered()) { + if (existingTracker.isRpcBlockFetchTriggered() && !existingTracker.isComplete()) { // if we attempted to fetch this block via RPC, we missed the opportunity to // complete the blob sidecars via local EL and RPC (since the block is required to // be known) Let's try now - if (!existingTracker.isComplete()) { - fetchMissingContent(slotAndBlockRoot); - } + asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); } }); @@ -596,7 +594,7 @@ private void onFirstSeen( final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); } - // no delay for attempting to fetch blobs for when the block is first seen + // no delay for attempting to fetch blobs for when the block is first seen case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 93c7494e151..82952104728 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -887,7 +887,7 @@ void shouldDropPossiblyFetchedBlobSidecars() { } @Test - void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { + void shouldTryToFetchBlobSidecarsWhenBlockArrivesAfterRPCFetch() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); final Set missingBlobs = @@ -912,7 +912,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { when(tracker.setBlock(any())).thenReturn(true); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); when(tracker.isRpcBlockFetchTriggered()).thenReturn(true); - when(tracker.isLocalElBlobsFetchTriggered()).thenReturn(false); + when(tracker.isLocalElBlobsFetchTriggered()).thenReturn(true); return tracker; }); @@ -931,6 +931,9 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); + assertThat(asyncRunner.hasDelayedActions()).isTrue(); + asyncRunner.executeQueuedActions(); + verify(tracker).setLocalElBlobsFetchTriggered(); verify(executionLayer).engineGetBlobs(any(), any()); } From 31a145ce8a23f93df8d2004791e629ae409c0146 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 17 Dec 2024 15:15:32 +0000 Subject: [PATCH 07/14] fix build --- .../util/BlockBlobSidecarsTrackersPoolImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 298d69f8f04..36c1abbf5ff 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -483,6 +483,7 @@ public synchronized int getTotalBlobSidecarsTrackers() { return blockBlobSidecarsTrackers.size(); } + @SuppressWarnings("FutureReturnValueIgnored") private BlockBlobSidecarsTracker internalOnNewBlock( final SignedBeaconBlock block, final Optional remoteOrigin) { final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); @@ -594,7 +595,7 @@ private void onFirstSeen( final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); } - // no delay for attempting to fetch blobs for when the block is first seen + // no delay for attempting to fetch blobs for when the block is first seen case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); } } From 0a2440cd947b691ea9a09f30e81c8ed8d5362365 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 18 Dec 2024 16:22:28 +0000 Subject: [PATCH 08/14] delay for RPC --- .../blobs/BlockBlobSidecarsTracker.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 50 ++++++++++--------- ...BlockBlobSidecarsTrackersPoolImplTest.java | 21 ++++---- 3 files changed, 39 insertions(+), 34 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index b16088bf650..ebb01c940fd 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -321,7 +321,7 @@ private void printDebugTimings(final Map debugTimings) { timingsReport .append("RPC block fetch delay ") .append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime) - .append("ms"); + .append("ms - "); } else { timingsReport.append("RPC block fetch wasn't required"); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 36c1abbf5ff..1e6048eb610 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -94,7 +94,7 @@ enum TrackerObject { static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars"; static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers"; - // Block fetching delay timings + // RPC fetching delay timings static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500); static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500); static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000); @@ -509,7 +509,12 @@ private BlockBlobSidecarsTracker internalOnNewBlock( // if we attempted to fetch this block via RPC, we missed the opportunity to // complete the blob sidecars via local EL and RPC (since the block is required to // be known) Let's try now - asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); + asyncRunner.runAsync( + () -> + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot)) + .finish(this::logBlockOrBlobsRPCFailure)); } }); @@ -571,7 +576,7 @@ private BlockBlobSidecarsTracker getOrCreateBlobSidecarsTracker( } private void makeRoomForNewTracker() { - while (blockBlobSidecarsTrackers.size() > (maxTrackers - 1)) { + while (blockBlobSidecarsTrackers.size() > maxTrackers - 1) { final SlotAndBlockRoot toRemove = orderedBlobSidecarsTrackers.pollFirst(); if (toRemove == null) { break; @@ -590,18 +595,20 @@ private void onFirstSeen( if (isLocalBlockProduction) { return; } - switch (trackerObject) { - case BLOB_SIDECAR -> { - final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); - asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); - } - // no delay for attempting to fetch blobs for when the block is first seen - case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); + if (trackerObject == TrackerObject.BLOCK) { + // run local EL blobs retrieval with no delay + asyncRunner + .runAsync(() -> fetchMissingBlobsFromLocalEL(slotAndBlockRoot)) + .finish(this::logLocalElBlobsLookupFailure); } + final Duration rpcFetchDelay = calculateRpcFetchDelay(slotAndBlockRoot); + asyncRunner + .runAfterDelay(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot), rpcFetchDelay) + .finish(this::logBlockOrBlobsRPCFailure); } @VisibleForTesting - Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { + Duration calculateRpcFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { final UInt64 slot = slotAndBlockRoot.getSlot(); if (slot.isLessThan(getCurrentSlot())) { @@ -631,17 +638,6 @@ Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } - /** Fetch missing block (when block is unknown) or fetch missing blob sidecars via EL and RPC */ - private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { - fetchMissingBlobsFromLocalEL(slotAndBlockRoot) - .handleException( - error -> LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error))) - .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)) - .finish( - error -> - LOG.error("An error occurred while attempting to fetch missing content.", error)); - } - private synchronized SafeFuture fetchMissingBlobsFromLocalEL( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = @@ -717,7 +713,11 @@ private synchronized SafeFuture fetchMissingBlobsFromLocalEL( }); } - private synchronized void fetchMissingContentFromRemotePeers( + private void logLocalElBlobsLookupFailure(final Throwable error) { + LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); + } + + private synchronized void fetchMissingBlockOrBlobsFromRPC( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot()); @@ -749,6 +749,10 @@ private synchronized void fetchMissingContentFromRemotePeers( }); } + private void logBlockOrBlobsRPCFailure(final Throwable error) { + LOG.error("An error occurred while attempting to fetch block or blobs via RPC.", error); + } + private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered() diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 82952104728..65b0c20c6d6 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -674,7 +674,8 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { (slotAndRoot) -> { when(tracker.add(any())).thenReturn(true); when(tracker.getMissingBlobSidecars()) - .thenAnswer(__ -> missingBlobIdentifiers.stream()); + .thenAnswer(__ -> missingBlobIdentifiers.stream()) + .thenAnswer(__ -> Stream.empty()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -1017,14 +1018,14 @@ void shouldRespectTargetWhenBlockIsEarly() { timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() { + void calculateBlockFetchDelay_shouldRespectMinimumWhenRpcIsLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1037,14 +1038,14 @@ void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() { timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue())); } @Test - void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { + void calculateBlockFetchDelay_shouldRespectTargetWhenRpcIsVeryLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1056,14 +1057,14 @@ void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { + void calculateRpcFetchDelay_shouldRespectAttestationDueLimit() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1084,7 +1085,7 @@ void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can only wait 200ms less than target assertThat(fetchDelay) @@ -1093,12 +1094,12 @@ void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { } @Test - void calculateBlockFetchDelay_shouldReturnZeroIfSlotIsOld() { + void calculateRpcFetchDelay_shouldReturnZeroIfSlotIsOld() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); assertThat(fetchDelay).isEqualTo(Duration.ZERO); } From 25b00d8cda5097535d6610379898da5e4646cd6a Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 18 Dec 2024 16:41:18 +0000 Subject: [PATCH 09/14] fix --- .../teku/statetransition/blobs/BlockBlobSidecarsTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index ebb01c940fd..a49b91fa1da 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -323,7 +323,7 @@ private void printDebugTimings(final Map debugTimings) { .append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime) .append("ms - "); } else { - timingsReport.append("RPC block fetch wasn't required"); + timingsReport.append("RPC block fetch wasn't required - "); } if (debugTimings.containsKey(RPC_BLOBS_FETCH_TIMING_IDX)) { From 9d1d5d9b6ecc1a740d8a122d5b0430dad001e18f Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 19 Dec 2024 09:40:26 +0000 Subject: [PATCH 10/14] improvement --- .../util/BlockBlobSidecarsTrackersPoolImpl.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 1e6048eb610..b9ac1b32ca4 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -505,15 +505,20 @@ private BlockBlobSidecarsTracker internalOnNewBlock( countBlock(remoteOrigin); - if (existingTracker.isRpcBlockFetchTriggered() && !existingTracker.isComplete()) { - // if we attempted to fetch this block via RPC, we missed the opportunity to - // complete the blob sidecars via local EL and RPC (since the block is required to - // be known) Let's try now + if (!existingTracker.isComplete()) { + // we missed the opportunity to complete the blob sidecars via local EL and RPC + // (since the block is required to be known) Let's try now asyncRunner.runAsync( () -> fetchMissingBlobsFromLocalEL(slotAndBlockRoot) .handleException(this::logLocalElBlobsLookupFailure) - .thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot)) + .thenRun( + () -> { + // respect the RPC fetch delay + if (existingTracker.isRpcBlockFetchTriggered()) { + fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot); + } + }) .finish(this::logBlockOrBlobsRPCFailure)); } }); From 05487574f3ea461b9f063715fb4af67a4a974fae Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 19 Dec 2024 16:35:30 +0000 Subject: [PATCH 11/14] improvement --- .../BlockBlobSidecarsTrackersPoolImpl.java | 36 ++++++++----------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index b9ac1b32ca4..4c266caa2c0 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -71,11 +71,6 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis implements BlockBlobSidecarsTrackersPool { private static final Logger LOG = LogManager.getLogger(); - enum TrackerObject { - BLOCK, - BLOB_SIDECAR - } - static final String COUNTER_BLOCK_TYPE = "block"; static final String COUNTER_SIDECAR_TYPE = "blob_sidecar"; @@ -228,7 +223,7 @@ public synchronized void onNewBlobSidecar( slotAndBlockRoot, newTracker -> { addBlobSidecarToTracker(newTracker, slotAndBlockRoot, blobSidecar, remoteOrigin); - onFirstSeen(slotAndBlockRoot, TrackerObject.BLOB_SIDECAR, Optional.of(remoteOrigin)); + onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)); }, existingTracker -> addBlobSidecarToTracker(existingTracker, slotAndBlockRoot, blobSidecar, remoteOrigin)); @@ -494,7 +489,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( newTracker -> { newTracker.setBlock(block); countBlock(remoteOrigin); - onFirstSeen(slotAndBlockRoot, TrackerObject.BLOCK, remoteOrigin); + onFirstSeen(slotAndBlockRoot, remoteOrigin); }, existingTracker -> { if (!existingTracker.setBlock(block)) { @@ -514,7 +509,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( .handleException(this::logLocalElBlobsLookupFailure) .thenRun( () -> { - // respect the RPC fetch delay + // only run if RPC block fetch has happened if (existingTracker.isRpcBlockFetchTriggered()) { fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot); } @@ -592,24 +587,23 @@ private void makeRoomForNewTracker() { @SuppressWarnings("FutureReturnValueIgnored") private void onFirstSeen( - final SlotAndBlockRoot slotAndBlockRoot, - final TrackerObject trackerObject, - final Optional remoteOrigin) { + final SlotAndBlockRoot slotAndBlockRoot, final Optional remoteOrigin) { final boolean isLocalBlockProduction = remoteOrigin.map(ro -> ro.equals(LOCAL_PROPOSAL)).orElse(false); if (isLocalBlockProduction) { return; } - if (trackerObject == TrackerObject.BLOCK) { - // run local EL blobs retrieval with no delay - asyncRunner - .runAsync(() -> fetchMissingBlobsFromLocalEL(slotAndBlockRoot)) - .finish(this::logLocalElBlobsLookupFailure); - } - final Duration rpcFetchDelay = calculateRpcFetchDelay(slotAndBlockRoot); - asyncRunner - .runAfterDelay(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot), rpcFetchDelay) - .finish(this::logBlockOrBlobsRPCFailure); + // delay RPC fetching + final SafeFuture rpcFetchDelay = + asyncRunner.getDelayedFuture(calculateRpcFetchDelay(slotAndBlockRoot)); + + asyncRunner.runAsync( + () -> + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenCompose(__ -> rpcFetchDelay) + .thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot)) + .handleException(this::logBlockOrBlobsRPCFailure)); } @VisibleForTesting From 59674c68b800f4556b648fadd9c2f8d5e85b45ab Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 19 Dec 2024 16:41:29 +0000 Subject: [PATCH 12/14] comment improvement --- .../util/BlockBlobSidecarsTrackersPoolImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 4c266caa2c0..deb0fd32e4d 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -509,7 +509,8 @@ private BlockBlobSidecarsTracker internalOnNewBlock( .handleException(this::logLocalElBlobsLookupFailure) .thenRun( () -> { - // only run if RPC block fetch has happened + // only run if RPC block fetch has happened ( no blobs RPC fetch + // has occurred) if (existingTracker.isRpcBlockFetchTriggered()) { fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot); } From 5d276c4b685bf3b1cef914f49ab11fad6eb077dd Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Thu, 19 Dec 2024 16:43:28 +0000 Subject: [PATCH 13/14] consistency --- .../statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index deb0fd32e4d..e6b73a893c2 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -604,7 +604,7 @@ private void onFirstSeen( .handleException(this::logLocalElBlobsLookupFailure) .thenCompose(__ -> rpcFetchDelay) .thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot)) - .handleException(this::logBlockOrBlobsRPCFailure)); + .finish(this::logBlockOrBlobsRPCFailure)); } @VisibleForTesting From 0c806a9dd4ed62230116b569fb6642d827b2c8f5 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Fri, 20 Dec 2024 08:55:43 +0000 Subject: [PATCH 14/14] small log --- .../statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index e6b73a893c2..f68ca13ac73 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -600,6 +600,7 @@ private void onFirstSeen( asyncRunner.runAsync( () -> + // fetch blobs from EL with no delay fetchMissingBlobsFromLocalEL(slotAndBlockRoot) .handleException(this::logLocalElBlobsLookupFailure) .thenCompose(__ -> rpcFetchDelay)