From e5794737e317dbc673c4e955f2f2485c819cb862 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 17 Dec 2024 11:29:14 +0000 Subject: [PATCH] remove delay when fetching blobs and block is known --- .../blobs/BlockBlobSidecarsTracker.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 110 ++++++++++-------- ...BlockBlobSidecarsTrackersPoolImplTest.java | 23 ++-- 3 files changed, 77 insertions(+), 58 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index 6d3c9f95d21..314be521041 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -314,7 +314,7 @@ private void printDebugTimings(final Map debugTimings) { .append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime) .append("ms - "); } else { - timingsReport.append("Local EL fetch wasn't required - "); + timingsReport.append("Local EL blobs fetch wasn't required - "); } if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 87000cc5da0..a7efb8567b7 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -71,6 +71,11 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis implements BlockBlobSidecarsTrackersPool { private static final Logger LOG = LogManager.getLogger(); + enum TrackerObject { + BLOCK, + BLOB_SIDECAR; + } + static final String COUNTER_BLOCK_TYPE = "block"; static final String COUNTER_SIDECAR_TYPE = "blob_sidecar"; @@ -89,6 +94,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars"; static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers"; + // Block fetching delay timings static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500); static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500); static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000); @@ -218,12 +224,25 @@ public synchronized void onNewBlobSidecar( final SlotAndBlockRoot slotAndBlockRoot = blobSidecar.getSlotAndBlockRoot(); - final BlockBlobSidecarsTracker blobSidecarsTracker = - getOrCreateBlobSidecarsTracker( - slotAndBlockRoot, - newTracker -> onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)), - existingTracker -> {}); + getOrCreateBlobSidecarsTracker( + slotAndBlockRoot, + newTracker -> { + addBlobSidecarToTracker(newTracker, slotAndBlockRoot, blobSidecar, remoteOrigin); + onFirstSeen(slotAndBlockRoot, TrackerObject.BLOB_SIDECAR, Optional.of(remoteOrigin)); + }, + existingTracker -> + addBlobSidecarToTracker(existingTracker, slotAndBlockRoot, blobSidecar, remoteOrigin)); + + if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { + sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL); + } + } + private void addBlobSidecarToTracker( + final BlockBlobSidecarsTracker blobSidecarsTracker, + final SlotAndBlockRoot slotAndBlockRoot, + final BlobSidecar blobSidecar, + final RemoteOrigin remoteOrigin) { if (blobSidecarsTracker.add(blobSidecar)) { sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL); countBlobSidecar(remoteOrigin); @@ -234,10 +253,6 @@ public synchronized void onNewBlobSidecar( } else { countDuplicateBlobSidecar(remoteOrigin); } - - if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { - sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL); - } } private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) { @@ -478,7 +493,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( newTracker -> { newTracker.setBlock(block); countBlock(remoteOrigin); - onFirstSeen(slotAndBlockRoot, remoteOrigin); + onFirstSeen(slotAndBlockRoot, TrackerObject.BLOCK, remoteOrigin); }, existingTracker -> { if (!existingTracker.setBlock(block)) { @@ -494,7 +509,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock( // complete the blob sidecars via local EL and RPC (since the block is required to // be known) Let's try now if (!existingTracker.isComplete()) { - fetchMissingContent(slotAndBlockRoot).finish(this::logMissingContentFetchFailure); + fetchMissingContent(slotAndBlockRoot); } } }); @@ -566,37 +581,28 @@ private void makeRoomForNewTracker() { } } + @SuppressWarnings("FutureReturnValueIgnored") private void onFirstSeen( - final SlotAndBlockRoot slotAndBlockRoot, final Optional remoteOrigin) { + final SlotAndBlockRoot slotAndBlockRoot, + final TrackerObject trackerObject, + final Optional remoteOrigin) { final boolean isLocalBlockProduction = remoteOrigin.map(ro -> ro.equals(LOCAL_PROPOSAL)).orElse(false); if (isLocalBlockProduction) { return; } - - final Duration fetchDelay = calculateFetchDelay(slotAndBlockRoot); - - asyncRunner - .runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), fetchDelay) - .finish(this::logMissingContentFetchFailure); - } - - private SafeFuture fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { - return fetchMissingBlobsFromLocalEL(slotAndBlockRoot) - .handleException(this::logLocalElBlobsLookupFailure) - .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)); - } - - private void logLocalElBlobsLookupFailure(final Throwable error) { - LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); - } - - private void logMissingContentFetchFailure(final Throwable error) { - LOG.error("An error occurred while attempting to fetch missing content.", error); + switch (trackerObject) { + case BLOB_SIDECAR -> { + final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); + asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); + } + // no delay for attempting to fetch blobs for when the block is first seen + case BLOCK -> fetchMissingContent(slotAndBlockRoot); + } } @VisibleForTesting - Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { + Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { final UInt64 slot = slotAndBlockRoot.getSlot(); if (slot.isLessThan(getCurrentSlot())) { @@ -626,6 +632,16 @@ Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } + private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException( + error -> LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error))) + .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)) + .finish( + error -> + LOG.error("An error occurred while attempting to fetch missing content.", error)); + } + private synchronized SafeFuture fetchMissingBlobsFromLocalEL( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = @@ -711,7 +727,7 @@ private synchronized void fetchMissingContentFromRemotePeers( } if (blockBlobSidecarsTracker.getBlock().isEmpty()) { - + // fetch missing block blockBlobSidecarsTracker.setRpcBlockFetchTriggered(); poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); @@ -721,6 +737,7 @@ private synchronized void fetchMissingContentFromRemotePeers( return; } + // fetch missing blob sidecars blockBlobSidecarsTracker.setRpcBlobsFetchTriggered(); blockBlobSidecarsTracker @@ -735,24 +752,21 @@ private synchronized void fetchMissingContentFromRemotePeers( private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { - if (!blockBlobSidecarsTracker.isRpcBlockFetchTriggered() - && !blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { - return; - } - - if (blockBlobSidecarsTracker.getBlock().isEmpty()) { + if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered() + && blockBlobSidecarsTracker.getBlock().isEmpty()) { requiredBlockRootDroppedSubscribers.deliver( RequiredBlockRootDroppedSubscriber::onRequiredBlockRootDropped, blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); - return; } - blockBlobSidecarsTracker - .getMissingBlobSidecarsForBlock() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); + if (blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) { + blockBlobSidecarsTracker + .getMissingBlobSidecarsForBlock() + .forEach( + blobIdentifier -> + requiredBlobSidecarDroppedSubscribers.deliver( + RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, + blobIdentifier)); + } } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 40884b71d4d..23db2505e8f 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -1092,14 +1092,15 @@ void shouldRespectTargetWhenBlockIsEarly() { // blocks arrives at slot start timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() { + void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1111,14 +1112,15 @@ void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() { // blocks arrives 200ms before attestation due timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { + void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1129,14 +1131,15 @@ void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { // blocks arrives 1s after attestation due timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateFetchDelay_shouldRespectAttestationDueLimit() { + void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1156,7 +1159,8 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() { timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); // we can only wait 200ms less than target assertThat(fetchDelay) @@ -1165,11 +1169,12 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() { } @Test - void calculateFetchDelay_shouldReturnZeroIfSlotIsOld() { + void calculateBlockFetchDelay_shouldReturnZeroIfSlotIsOld() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32()); - final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot); + final Duration fetchDelay = + blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); assertThat(fetchDelay).isEqualTo(Duration.ZERO); }