From 0a2440cd947b691ea9a09f30e81c8ed8d5362365 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 18 Dec 2024 16:22:28 +0000 Subject: [PATCH] delay for RPC --- .../blobs/BlockBlobSidecarsTracker.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 50 ++++++++++--------- ...BlockBlobSidecarsTrackersPoolImplTest.java | 21 ++++---- 3 files changed, 39 insertions(+), 34 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index b16088bf650..ebb01c940fd 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -321,7 +321,7 @@ private void printDebugTimings(final Map debugTimings) { timingsReport .append("RPC block fetch delay ") .append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime) - .append("ms"); + .append("ms - "); } else { timingsReport.append("RPC block fetch wasn't required"); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 36c1abbf5ff..1e6048eb610 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -94,7 +94,7 @@ enum TrackerObject { static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars"; static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers"; - // Block fetching delay timings + // RPC fetching delay timings static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500); static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500); static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000); @@ -509,7 +509,12 @@ private BlockBlobSidecarsTracker internalOnNewBlock( // if we attempted to fetch this block via RPC, we missed the opportunity to // complete the blob sidecars via local EL and RPC (since the block is required to // be known) Let's try now - asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); + asyncRunner.runAsync( + () -> + fetchMissingBlobsFromLocalEL(slotAndBlockRoot) + .handleException(this::logLocalElBlobsLookupFailure) + .thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot)) + .finish(this::logBlockOrBlobsRPCFailure)); } }); @@ -571,7 +576,7 @@ private BlockBlobSidecarsTracker getOrCreateBlobSidecarsTracker( } private void makeRoomForNewTracker() { - while (blockBlobSidecarsTrackers.size() > (maxTrackers - 1)) { + while (blockBlobSidecarsTrackers.size() > maxTrackers - 1) { final SlotAndBlockRoot toRemove = orderedBlobSidecarsTrackers.pollFirst(); if (toRemove == null) { break; @@ -590,18 +595,20 @@ private void onFirstSeen( if (isLocalBlockProduction) { return; } - switch (trackerObject) { - case BLOB_SIDECAR -> { - final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot); - asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay); - } - // no delay for attempting to fetch blobs for when the block is first seen - case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot)); + if (trackerObject == TrackerObject.BLOCK) { + // run local EL blobs retrieval with no delay + asyncRunner + .runAsync(() -> fetchMissingBlobsFromLocalEL(slotAndBlockRoot)) + .finish(this::logLocalElBlobsLookupFailure); } + final Duration rpcFetchDelay = calculateRpcFetchDelay(slotAndBlockRoot); + asyncRunner + .runAfterDelay(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot), rpcFetchDelay) + .finish(this::logBlockOrBlobsRPCFailure); } @VisibleForTesting - Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { + Duration calculateRpcFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { final UInt64 slot = slotAndBlockRoot.getSlot(); if (slot.isLessThan(getCurrentSlot())) { @@ -631,17 +638,6 @@ Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) { return Duration.ofMillis(finalTime.minus(nowMillis).intValue()); } - /** Fetch missing block (when block is unknown) or fetch missing blob sidecars via EL and RPC */ - private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) { - fetchMissingBlobsFromLocalEL(slotAndBlockRoot) - .handleException( - error -> LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error))) - .thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot)) - .finish( - error -> - LOG.error("An error occurred while attempting to fetch missing content.", error)); - } - private synchronized SafeFuture fetchMissingBlobsFromLocalEL( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = @@ -717,7 +713,11 @@ private synchronized SafeFuture fetchMissingBlobsFromLocalEL( }); } - private synchronized void fetchMissingContentFromRemotePeers( + private void logLocalElBlobsLookupFailure(final Throwable error) { + LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)); + } + + private synchronized void fetchMissingBlockOrBlobsFromRPC( final SlotAndBlockRoot slotAndBlockRoot) { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot()); @@ -749,6 +749,10 @@ private synchronized void fetchMissingContentFromRemotePeers( }); } + private void logBlockOrBlobsRPCFailure(final Throwable error) { + LOG.error("An error occurred while attempting to fetch block or blobs via RPC.", error); + } + private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered() diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 82952104728..65b0c20c6d6 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -674,7 +674,8 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { (slotAndRoot) -> { when(tracker.add(any())).thenReturn(true); when(tracker.getMissingBlobSidecars()) - .thenAnswer(__ -> missingBlobIdentifiers.stream()); + .thenAnswer(__ -> missingBlobIdentifiers.stream()) + .thenAnswer(__ -> Stream.empty()); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -1017,14 +1018,14 @@ void shouldRespectTargetWhenBlockIsEarly() { timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() { + void calculateBlockFetchDelay_shouldRespectMinimumWhenRpcIsLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1037,14 +1038,14 @@ void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() { timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue())); } @Test - void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { + void calculateBlockFetchDelay_shouldRespectTargetWhenRpcIsVeryLate() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1056,14 +1057,14 @@ void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() { timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can wait the full target assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue())); } @Test - void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { + void calculateRpcFetchDelay_shouldRespectAttestationDueLimit() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32()); @@ -1084,7 +1085,7 @@ void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); // we can only wait 200ms less than target assertThat(fetchDelay) @@ -1093,12 +1094,12 @@ void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() { } @Test - void calculateBlockFetchDelay_shouldReturnZeroIfSlotIsOld() { + void calculateRpcFetchDelay_shouldReturnZeroIfSlotIsOld() { final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32()); final Duration fetchDelay = - blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot); + blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot); assertThat(fetchDelay).isEqualTo(Duration.ZERO); }