Skip to content

Commit

Permalink
remove delay when fetching blobs and block is known
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Dec 17, 2024
1 parent 9126d62 commit e579473
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ private void printDebugTimings(final Map<UInt64, Long> debugTimings) {
.append(debugTimings.get(LOCAL_EL_BLOBS_FETCH_TIMING_IDX) - creationTime)
.append("ms - ");
} else {
timingsReport.append("Local EL fetch wasn't required - ");
timingsReport.append("Local EL blobs fetch wasn't required - ");
}

if (debugTimings.containsKey(RPC_BLOCK_FETCH_TIMING_IDX)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
implements BlockBlobSidecarsTrackersPool {
private static final Logger LOG = LogManager.getLogger();

enum TrackerObject {
BLOCK,
BLOB_SIDECAR;
}

static final String COUNTER_BLOCK_TYPE = "block";
static final String COUNTER_SIDECAR_TYPE = "blob_sidecar";

Expand All @@ -89,6 +94,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
static final String GAUGE_BLOB_SIDECARS_LABEL = "blob_sidecars";
static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers";

// Block fetching delay timings
static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500);
static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500);
static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000);
Expand Down Expand Up @@ -218,12 +224,25 @@ public synchronized void onNewBlobSidecar(

final SlotAndBlockRoot slotAndBlockRoot = blobSidecar.getSlotAndBlockRoot();

final BlockBlobSidecarsTracker blobSidecarsTracker =
getOrCreateBlobSidecarsTracker(
slotAndBlockRoot,
newTracker -> onFirstSeen(slotAndBlockRoot, Optional.of(remoteOrigin)),
existingTracker -> {});
getOrCreateBlobSidecarsTracker(
slotAndBlockRoot,
newTracker -> {
addBlobSidecarToTracker(newTracker, slotAndBlockRoot, blobSidecar, remoteOrigin);
onFirstSeen(slotAndBlockRoot, TrackerObject.BLOB_SIDECAR, Optional.of(remoteOrigin));
},
existingTracker ->
addBlobSidecarToTracker(existingTracker, slotAndBlockRoot, blobSidecar, remoteOrigin));

if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) {
sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL);
}
}

private void addBlobSidecarToTracker(
final BlockBlobSidecarsTracker blobSidecarsTracker,
final SlotAndBlockRoot slotAndBlockRoot,
final BlobSidecar blobSidecar,
final RemoteOrigin remoteOrigin) {
if (blobSidecarsTracker.add(blobSidecar)) {
sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL);
countBlobSidecar(remoteOrigin);
Expand All @@ -234,10 +253,6 @@ public synchronized void onNewBlobSidecar(
} else {
countDuplicateBlobSidecar(remoteOrigin);
}

if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) {
sizeGauge.set(orderedBlobSidecarsTrackers.size(), GAUGE_BLOB_SIDECARS_TRACKERS_LABEL);
}
}

private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) {
Expand Down Expand Up @@ -478,7 +493,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock(
newTracker -> {
newTracker.setBlock(block);
countBlock(remoteOrigin);
onFirstSeen(slotAndBlockRoot, remoteOrigin);
onFirstSeen(slotAndBlockRoot, TrackerObject.BLOCK, remoteOrigin);
},
existingTracker -> {
if (!existingTracker.setBlock(block)) {
Expand All @@ -494,7 +509,7 @@ private BlockBlobSidecarsTracker internalOnNewBlock(
// complete the blob sidecars via local EL and RPC (since the block is required to
// be known) Let's try now
if (!existingTracker.isComplete()) {
fetchMissingContent(slotAndBlockRoot).finish(this::logMissingContentFetchFailure);
fetchMissingContent(slotAndBlockRoot);
}
}
});
Expand Down Expand Up @@ -566,37 +581,28 @@ private void makeRoomForNewTracker() {
}
}

@SuppressWarnings("FutureReturnValueIgnored")
private void onFirstSeen(
final SlotAndBlockRoot slotAndBlockRoot, final Optional<RemoteOrigin> remoteOrigin) {
final SlotAndBlockRoot slotAndBlockRoot,
final TrackerObject trackerObject,
final Optional<RemoteOrigin> remoteOrigin) {
final boolean isLocalBlockProduction =
remoteOrigin.map(ro -> ro.equals(LOCAL_PROPOSAL)).orElse(false);
if (isLocalBlockProduction) {
return;
}

final Duration fetchDelay = calculateFetchDelay(slotAndBlockRoot);

asyncRunner
.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), fetchDelay)
.finish(this::logMissingContentFetchFailure);
}

private SafeFuture<Void> fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) {
return fetchMissingBlobsFromLocalEL(slotAndBlockRoot)
.handleException(this::logLocalElBlobsLookupFailure)
.thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot));
}

private void logLocalElBlobsLookupFailure(final Throwable error) {
LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error));
}

private void logMissingContentFetchFailure(final Throwable error) {
LOG.error("An error occurred while attempting to fetch missing content.", error);
switch (trackerObject) {
case BLOB_SIDECAR -> {
final Duration blockFetchDelay = calculateBlockFetchDelay(slotAndBlockRoot);
asyncRunner.runAfterDelay(() -> fetchMissingContent(slotAndBlockRoot), blockFetchDelay);
}
// no delay for attempting to fetch blobs for when the block is first seen
case BLOCK -> fetchMissingContent(slotAndBlockRoot);
}
}

@VisibleForTesting
Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
Duration calculateBlockFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
final UInt64 slot = slotAndBlockRoot.getSlot();

if (slot.isLessThan(getCurrentSlot())) {
Expand Down Expand Up @@ -626,6 +632,16 @@ Duration calculateFetchDelay(final SlotAndBlockRoot slotAndBlockRoot) {
return Duration.ofMillis(finalTime.minus(nowMillis).intValue());
}

private void fetchMissingContent(final SlotAndBlockRoot slotAndBlockRoot) {
fetchMissingBlobsFromLocalEL(slotAndBlockRoot)
.handleException(
error -> LOG.warn("Local EL blobs lookup failed: {}", getRootCauseMessage(error)))
.thenRun(() -> fetchMissingContentFromRemotePeers(slotAndBlockRoot))
.finish(
error ->
LOG.error("An error occurred while attempting to fetch missing content.", error));
}

private synchronized SafeFuture<Void> fetchMissingBlobsFromLocalEL(
final SlotAndBlockRoot slotAndBlockRoot) {
final BlockBlobSidecarsTracker blockBlobSidecarsTracker =
Expand Down Expand Up @@ -711,7 +727,7 @@ private synchronized void fetchMissingContentFromRemotePeers(
}

if (blockBlobSidecarsTracker.getBlock().isEmpty()) {

// fetch missing block
blockBlobSidecarsTracker.setRpcBlockFetchTriggered();

poolStatsCounters.labels(COUNTER_BLOCK_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc();
Expand All @@ -721,6 +737,7 @@ private synchronized void fetchMissingContentFromRemotePeers(
return;
}

// fetch missing blob sidecars
blockBlobSidecarsTracker.setRpcBlobsFetchTriggered();

blockBlobSidecarsTracker
Expand All @@ -735,24 +752,21 @@ private synchronized void fetchMissingContentFromRemotePeers(

private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) {

if (!blockBlobSidecarsTracker.isRpcBlockFetchTriggered()
&& !blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) {
return;
}

if (blockBlobSidecarsTracker.getBlock().isEmpty()) {
if (blockBlobSidecarsTracker.isRpcBlockFetchTriggered()
&& blockBlobSidecarsTracker.getBlock().isEmpty()) {
requiredBlockRootDroppedSubscribers.deliver(
RequiredBlockRootDroppedSubscriber::onRequiredBlockRootDropped,
blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot());
return;
}

blockBlobSidecarsTracker
.getMissingBlobSidecarsForBlock()
.forEach(
blobIdentifier ->
requiredBlobSidecarDroppedSubscribers.deliver(
RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped,
blobIdentifier));
if (blockBlobSidecarsTracker.isRpcBlobsFetchTriggered()) {
blockBlobSidecarsTracker
.getMissingBlobSidecarsForBlock()
.forEach(
blobIdentifier ->
requiredBlobSidecarDroppedSubscribers.deliver(
RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped,
blobIdentifier));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1092,14 +1092,15 @@ void shouldRespectTargetWhenBlockIsEarly() {
// blocks arrives at slot start
timeProvider.advanceTimeBySeconds(startSlotInSeconds.longValue());

final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot);
final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);

// we can wait the full target
assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue()));
}

@Test
void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() {
void calculateBlockFetchDelay_shouldRespectMinimumWhenBlockIsLate() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32());

Expand All @@ -1111,14 +1112,15 @@ void calculateFetchDelay_shouldRespectMinimumWhenBlockIsLate() {
// blocks arrives 200ms before attestation due
timeProvider.advanceTimeByMillis(startSlotInMillis.plus(3_800).longValue());

final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot);
final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);

// we can wait the full target
assertThat(fetchDelay).isEqualTo(Duration.ofMillis(MIN_WAIT_MILLIS.longValue()));
}

@Test
void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() {
void calculateBlockFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32());

Expand All @@ -1129,14 +1131,15 @@ void calculateFetchDelay_shouldRespectTargetWhenBlockIsVeryLate() {
// blocks arrives 1s after attestation due
timeProvider.advanceTimeBySeconds(startSlotInSeconds.plus(5).longValue());

final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot);
final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);

// we can wait the full target
assertThat(fetchDelay).isEqualTo(Duration.ofMillis(TARGET_WAIT_MILLIS.longValue()));
}

@Test
void calculateFetchDelay_shouldRespectAttestationDueLimit() {
void calculateBlockFetchDelay_shouldRespectAttestationDueLimit() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot, dataStructureUtil.randomBytes32());

Expand All @@ -1156,7 +1159,8 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() {

timeProvider.advanceTimeByMillis(blockArrivalTimeMillis.longValue());

final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot);
final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);

// we can only wait 200ms less than target
assertThat(fetchDelay)
Expand All @@ -1165,11 +1169,12 @@ void calculateFetchDelay_shouldRespectAttestationDueLimit() {
}

@Test
void calculateFetchDelay_shouldReturnZeroIfSlotIsOld() {
void calculateBlockFetchDelay_shouldReturnZeroIfSlotIsOld() {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(currentSlot.minus(1), dataStructureUtil.randomBytes32());

final Duration fetchDelay = blockBlobSidecarsTrackersPool.calculateFetchDelay(slotAndBlockRoot);
final Duration fetchDelay =
blockBlobSidecarsTrackersPool.calculateBlockFetchDelay(slotAndBlockRoot);

assertThat(fetchDelay).isEqualTo(Duration.ZERO);
}
Expand Down

0 comments on commit e579473

Please sign in to comment.