Skip to content

Commit

Permalink
Batch call for retrieving blobs from RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Dec 21, 2024
1 parent 7551460 commit 482f9e3
Show file tree
Hide file tree
Showing 23 changed files with 695 additions and 760 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO;

import java.util.List;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.beacon.sync.events.SyncingStatus;
Expand Down Expand Up @@ -106,12 +107,13 @@ public void subscribeBlobSidecarFetched(final BlobSidecarSubscriber subscriber)
}

@Override
public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) {
public void requestRecentBlobSidecars(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {
// No-op
}

@Override
public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier) {
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {
// No-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
Expand All @@ -34,8 +35,10 @@ public FetchBlockTask createFetchBlockTask(
}

@Override
public FetchBlobSidecarTask createFetchBlobSidecarTask(
final BlobIdentifier blobIdentifier, final Optional<Eth2Peer> preferredPeer) {
return new FetchBlobSidecarTask(eth2Network, preferredPeer, blobIdentifier);
public FetchBlobSidecarsTask createFetchBlobSidecarsTask(
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers,
final Optional<Eth2Peer> preferredPeer) {
return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRoot, blobIdentifiers);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public class FetchBlobSidecarsTask extends AbstractFetchTask<Bytes32, List<BlobSidecar>> {

private static final Logger LOG = LogManager.getLogger();

private final Bytes32 blockRoot;
private final List<BlobIdentifier> blobIdentifiers;

FetchBlobSidecarsTask(
final P2PNetwork<Eth2Peer> eth2Network,
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers) {
super(eth2Network, Optional.empty());
this.blockRoot = blockRoot;
this.blobIdentifiers = blobIdentifiers;
}

public FetchBlobSidecarsTask(
final P2PNetwork<Eth2Peer> eth2Network,
final Optional<Eth2Peer> preferredPeer,
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers) {
super(eth2Network, preferredPeer);
this.blockRoot = blockRoot;
this.blobIdentifiers = blobIdentifiers;
}

@Override
public Bytes32 getKey() {
return blockRoot;
}

@Override
SafeFuture<FetchResult<List<BlobSidecar>>> fetch(final Eth2Peer peer) {
final SafeFuture<FetchResult<List<BlobSidecar>>> fetchResult = new SafeFuture<>();
final List<BlobSidecar> blobSidecars = new ArrayList<>();
peer.requestBlobSidecarsByRoot(
blobIdentifiers,
new RpcResponseHandler<>() {
@Override
public void onCompleted(final Optional<? extends Throwable> error) {
error.ifPresentOrElse(
err -> {
logFetchError(peer, err);
fetchResult.complete(FetchResult.createFailed(peer, Status.FETCH_FAILED));
},
() -> fetchResult.complete(FetchResult.createSuccessful(peer, blobSidecars)));
}

@Override
public SafeFuture<?> onResponse(final BlobSidecar response) {
blobSidecars.add(response);
return SafeFuture.COMPLETE;
}
})
.finish(
err -> {
logFetchError(peer, err);
fetchResult.complete(FetchResult.createFailed(peer, Status.FETCH_FAILED));
});
return fetchResult;
}

private void logFetchError(final Eth2Peer peer, final Throwable err) {
LOG.error(
String.format(
"Failed to fetch %d blob sidecars for block root %s from peer %s",
blobIdentifiers.size(), blockRoot, peer.getId()),
err);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
Expand All @@ -26,10 +27,11 @@ default FetchBlockTask createFetchBlockTask(final Bytes32 blockRoot) {

FetchBlockTask createFetchBlockTask(Bytes32 blockRoot, Optional<Eth2Peer> preferredPeer);

default FetchBlobSidecarTask createFetchBlobSidecarTask(final BlobIdentifier blobIdentifier) {
return createFetchBlobSidecarTask(blobIdentifier, Optional.empty());
default FetchBlobSidecarsTask createFetchBlobSidecarsTask(
Bytes32 blockRoot, List<BlobIdentifier> blobIdentifiers) {
return createFetchBlobSidecarsTask(blockRoot, blobIdentifiers, Optional.empty());
}

FetchBlobSidecarTask createFetchBlobSidecarTask(
BlobIdentifier blobIdentifier, Optional<Eth2Peer> preferredPeer);
FetchBlobSidecarsTask createFetchBlobSidecarsTask(
Bytes32 blockRoot, List<BlobIdentifier> blobIdentifiers, Optional<Eth2Peer> preferredPeer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,5 @@ private String getTaskName(final T task) {
return task.getClass().getSimpleName();
}

public abstract T createTask(K key);

public abstract void processFetchedResult(T task, R result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,30 @@

package tech.pegasys.teku.beacon.sync.gossip.blobs;

import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarTask;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask;
import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
import tech.pegasys.teku.beacon.sync.gossip.AbstractFetchService;
import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;

public class RecentBlobSidecarsFetchService
extends AbstractFetchService<BlobIdentifier, FetchBlobSidecarTask, BlobSidecar>
extends AbstractFetchService<Bytes32, FetchBlobSidecarsTask, List<BlobSidecar>>
implements RecentBlobSidecarsFetcher {

private static final Logger LOG = LogManager.getLogger();

private static final int MAX_CONCURRENT_REQUESTS = 3;

private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final ForwardSync forwardSync;
private final FetchTaskFactory fetchTaskFactory;
Expand All @@ -57,17 +60,13 @@ public static RecentBlobSidecarsFetchService create(
final AsyncRunner asyncRunner,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final Spec spec) {
final int maxConcurrentRequests =
RecentBlocksFetchService.MAX_CONCURRENT_REQUESTS
* spec.getMaxBlobsPerBlockForHighestMilestone().orElse(1);
final FetchTaskFactory fetchTaskFactory) {
return new RecentBlobSidecarsFetchService(
asyncRunner,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
maxConcurrentRequests);
MAX_CONCURRENT_REQUESTS);
}

@Override
Expand All @@ -87,34 +86,62 @@ public void subscribeBlobSidecarFetched(final BlobSidecarSubscriber subscriber)
}

@Override
public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) {
public void requestRecentBlobSidecars(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {
if (forwardSync.isSyncActive()) {
// Forward sync already in progress, assume it will fetch any missing blob sidecars
return;
}
if (blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier)) {
// We've already got this blob sidecar
final List<BlobIdentifier> requiredBlobIdentifiers =
blobIdentifiers.stream()
.filter(
blobIdentifier ->
!blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier))
.toList();
if (requiredBlobIdentifiers.isEmpty()) {
// We already have all required blob sidecars
return;
}
final FetchBlobSidecarTask task = createTask(blobIdentifier);
if (allTasks.putIfAbsent(blobIdentifier, task) != null) {
final FetchBlobSidecarsTask task =
fetchTaskFactory.createFetchBlobSidecarsTask(blockRoot, requiredBlobIdentifiers);
if (allTasks.putIfAbsent(blockRoot, task) != null) {
// We're already tracking this task
task.cancel();
return;
}
LOG.trace("Queue blob sidecar to be fetched: {}", blobIdentifier);
LOG.trace("Queue blob sidecars to be fetched: {}", requiredBlobIdentifiers);
queueTask(task);
}

@Override
public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier) {
cancelRequest(blobIdentifier);
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {
cancelRequest(blockRoot);
}

@Override
public void processFetchedResult(
final FetchBlobSidecarsTask task, final List<BlobSidecar> result) {
result.forEach(
blobSidecar -> {
LOG.trace("Successfully fetched blob sidecar: {}", result);
blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(blobSidecar));
});
// After retrieved blob sidecars have been processed, stop tracking it
removeTask(task);
}

@Override
public void onBlockValidated(final SignedBeaconBlock block) {}

@Override
public void onBlockImported(final SignedBeaconBlock block, final boolean executionOptimistic) {
cancelRecentBlobSidecarsRequest(block.getRoot());
}

private void setupSubscribers() {
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecar(this::requestRecentBlobSidecar);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped(
this::cancelRecentBlobSidecarRequest);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecars(this::requestRecentBlobSidecars);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarsDropped(
this::cancelRecentBlobSidecarsRequest);
forwardSync.subscribeToSyncChanges(this::onSyncStatusChanged);
}

Expand All @@ -126,19 +153,6 @@ private void onSyncStatusChanged(final boolean syncActive) {
// We may have ignored these requested blob sidecars while the sync was in progress
blockBlobSidecarsTrackersPool
.getAllRequiredBlobSidecars()
.forEach(this::requestRecentBlobSidecar);
}

@Override
public FetchBlobSidecarTask createTask(final BlobIdentifier key) {
return fetchTaskFactory.createFetchBlobSidecarTask(key);
}

@Override
public void processFetchedResult(final FetchBlobSidecarTask task, final BlobSidecar result) {
LOG.trace("Successfully fetched blob sidecar: {}", result);
blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(result));
// After retrieved blob sidecar has been processed, stop tracking it
removeTask(task);
.forEach(this::requestRecentBlobSidecars);
}
}
Loading

0 comments on commit 482f9e3

Please sign in to comment.