Skip to content

Commit

Permalink
delay for RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Dec 19, 2024
1 parent 31a145c commit 0a2440c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private void printDebugTimings(final Map<UInt64, Long> debugTimings) {
timingsReport
.append("RPC block fetch delay ")
.append(debugTimings.get(RPC_BLOCK_FETCH_TIMING_IDX) - creationTime)
.append("ms");
.append("ms - ");
} else {
timingsReport.append("RPC block fetch wasn't required");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ enum TrackerObject {
static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars";
static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers";

// Block fetching delay timings
// RPC fetching delay timings
static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500);
static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500);
static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000);
Expand Down Expand Up @@ -509,7 +509,12 @@ private BlockBlobSidecarsTracker internalOnNewBlock(
// if we attempted to fetch this block via RPC, we missed the opportunity to
// complete the blob sidecars via local EL and RPC (since the block is required to
// be known) Let's try now
asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot));
asyncRunner.runAsync(
() ->
fetchMissingBlobsFromLocalEL(slotAndBlockRoot)
.handleException(this::logLocalElBlobsLookupFailure)
.thenRun(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot))
.finish(this::logBlockOrBlobsRPCFailure));
}
});

Expand Down Expand Up @@ -571,7 +576,7 @@ private BlockBlobSidecarsTracker getOrCreateBlobSidecarsTracker(
}

private void makeRoomForNewTracker() {
while (blockBlobSidecarsTrackers.size() > (maxTrackers - 1)) {
while (blockBlobSidecarsTrackers.size() > maxTrackers - 1) {
final SlotAndBlockRoot toRemove = orderedBlobSidecarsTrackers.pollFirst();
if (toRemove == null) {
break;
Expand All @@ -590,18 +595,20 @@ private void onFirstSeen(
if (isLocalBlockProduction) {
return;
}
switch (trackerObject) {
case BLOB_SIDECAR -> {
final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot);
asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay);
}
// no delay for attempting to fetch blobs for when the block is first seen
case BLOCK -> asyncRunner.runAsync(() -> fetchMissingContent(slotAndBlockRoot));
if (trackerObject == TrackerObject.BLOCK) {
// run local EL blobs retrieval with no delay
asyncRunner
.runAsync(() -> fetchMissingBlobsFromLocalEL(slotAndBlockRoot))
.finish(this::logLocalElBlobsLookupFailure);
}
final Duration rpcFetchDelay = calculateRpcFetchDelay(slotAndBlockRoot);
asyncRunner
.runAfterDelay(() -> fetchMissingBlockOrBlobsFromRPC(slotAndBlockRoot), rpcFetchDelay)
.finish(this::logBlockOrBlobsRPCFailure);
}

@VisibleForTesting
Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
Duration calculateRpcFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
final UInt64 slot = slotAndBlockRoot.getSlot();

if (slot.isLessThan(getCurrentSlot())) {
Expand Down Expand Up @@ -631,17 +638,6 @@ Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
return Duration.ofMillis(finalTime.minus(nowMillis).intValue());
}

/** Fetch missing block (when block is unknown) or fetch missing blob sidecars via EL and RPC */
private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) {
fetchMissingBlobsFromLocalEL(slotAndBlockRoot)
.handleException(
error -> LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)))
.thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot))
.finish(
error ->
LOG.error("An error occurred while attempting to fetch missing content.", error));
}

private synchronized SafeFuture<Void> fetchMissingBlobsFromLocalEL(
final SlotAndBlockRoot slotAndBlockRoot) {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
Expand Down Expand Up @@ -717,7 +713,11 @@ private synchronized SafeFuture<Void> fetchMissingBlobsFromLocalEL(
});
}

private synchronized void fetchMissingContentFromRemotePeers(
private void logLocalElBlobsLookupFailure(final Throwable error) {
LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error));
}

private synchronized void fetchMissingBlockOrBlobsFromRPC(
final SlotAndBlockRoot slotAndBlockRoot) {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
blockBlobSidecarsTrackers.get(slotAndBlockRoot.getBlockRoot());
Expand Down Expand Up @@ -749,6 +749,10 @@ private synchronized void fetchMissingContentFromRemotePeers(
});
}

private void logBlockOrBlobsRPCFailure(final Throwable error) {
LOG.error("An error occurred while attempting to fetch block or blobs via RPC.", error);
}

private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) {

if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() {
(slotAndRoot) -> {
when(tracker.add(any())).thenReturn(true);
when(tracker.getMissingBlobSidecars())
.thenAnswer(__ -> missingBlobIdentifiers.stream());
.thenAnswer(__ -> missingBlobIdentifiers.stream())
.thenAnswer(__ -> Stream.empty());
when(tracker.getBlock()).thenReturn(Optional.of(block));
return tracker;
});
Expand Down Expand Up @@ -1017,14 +1018,14 @@ void shouldRespectTargetWhenBlockIsEarly() {
timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue());

final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);
blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot);

// we can wait the full target
assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue()));
}

@Test
void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() {
void calculateBlockFetchDelay_shouldRespectMinimumWhenRpcIsLate() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32());

Expand All @@ -1037,14 +1038,14 @@ void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() {
timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue());

final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);
blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot);

// we can wait the full target
assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue()));
}

@Test
void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() {
void calculateBlockFetchDelay_shouldRespectTargetWhenRpcIsVeryLate() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32());

Expand All @@ -1056,14 +1057,14 @@ void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() {
timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue());

final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);
blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot);

// we can wait the full target
assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue()));
}

@Test
void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() {
void calculateRpcFetchDelay_shouldRespectAttestationDueLimit() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32());

Expand All @@ -1084,7 +1085,7 @@ void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() {
timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue());

final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);
blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot);

// we can only wait 200ms less than target
assertThat(fetchDelay)
Expand All @@ -1093,12 +1094,12 @@ void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() {
}

@Test
void calculateBlockFetchDelay_shouldReturnZeroIfSlotIsOld() {
void calculateRpcFetchDelay_shouldReturnZeroIfSlotIsOld() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32());

final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);
blockBlobSidecarsTrackersPool.calculateRpcFetchDelay(slotAndBlockRoot);

assertThat(fetchDelay).isEqualTo(Duration.ZERO);
}
Expand Down

0 comments on commit 0a2440c

Please sign in to comment.