From 482f9e38755ddf7d52f62c05c2b0a2234ec860b2 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Sat, 21 Dec 2024 09:52:21 +0000 Subject: [PATCH] Batch call for retrieving blobs from RPC --- .../teku/beacon/sync/NoopSyncService.java | 6 +- .../sync/fetch/DefaultFetchTaskFactory.java | 9 +- .../sync/fetch/FetchBlobSidecarTask.java | 69 ----- .../sync/fetch/FetchBlobSidecarsTask.java | 99 +++++++ .../beacon/sync/fetch/FetchTaskFactory.java | 10 +- .../sync/gossip/AbstractFetchService.java | 2 - .../blobs/RecentBlobSidecarsFetchService.java | 84 +++--- .../blobs/RecentBlobSidecarsFetcher.java | 28 +- .../blocks/RecentBlocksFetchService.java | 9 +- .../fetch/DefaultFetchTaskFactoryTest.java | 8 +- .../sync/fetch/FetchBlobSidecarTaskTest.java | 230 --------------- .../sync/fetch/FetchBlobSidecarsTaskTest.java | 265 ++++++++++++++++++ .../RecentBlobSidecarsFetchServiceTest.java | 159 +++++++---- .../forkchoice/StubBlobSidecarManager.java | 6 - .../blobs/BlobSidecarManager.java | 10 - .../blobs/BlobSidecarManagerImpl.java | 11 - .../blobs/BlockBlobSidecarsTracker.java | 22 +- .../blobs/BlockBlobSidecarsTrackersPool.java | 48 ++-- .../BlockBlobSidecarsTrackersPoolImpl.java | 91 +++--- .../blobs/BlobSidecarManagerTest.java | 12 - .../blobs/BlockBlobSidecarsTrackerTest.java | 57 +--- ...BlockBlobSidecarsTrackersPoolImplTest.java | 211 +++++--------- .../beaconchain/BeaconChainController.java | 9 +- 23 files changed, 695 insertions(+), 760 deletions(-) delete mode 100644 beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTask.java create mode 100644 beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java delete mode 100644 beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTaskTest.java create mode 100644 beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java index 4e342290802..184dae2d09c 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java @@ -15,6 +15,7 @@ import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; +import java.util.List; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.beacon.sync.events.SyncState; import tech.pegasys.teku.beacon.sync.events.SyncingStatus; @@ -106,12 +107,13 @@ public void subscribeBlobSidecarFetched(final BlobSidecarSubscriber subscriber) } @Override - public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) { + public void requestRecentBlobSidecars( + final Bytes32 blockRoot, final List blobIdentifiers) { // No-op } @Override - public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier) { + public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) { // No-op } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java index 4f35274c530..5cf5921b257 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.beacon.sync.fetch; +import java.util.List; import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; @@ -34,8 +35,10 @@ public FetchBlockTask createFetchBlockTask( } @Override - public FetchBlobSidecarTask createFetchBlobSidecarTask( - final BlobIdentifier blobIdentifier, final Optional preferredPeer) { - return new FetchBlobSidecarTask(eth2Network, preferredPeer, blobIdentifier); + public FetchBlobSidecarsTask createFetchBlobSidecarsTask( + final Bytes32 blockRoot, + final List blobIdentifiers, + final Optional preferredPeer) { + return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRoot, blobIdentifiers); } } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTask.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTask.java deleted file mode 100644 index 40d34ae544c..00000000000 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTask.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2023 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.beacon.sync.fetch; - -import java.util.Optional; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; -import tech.pegasys.teku.networking.p2p.network.P2PNetwork; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; - -public class FetchBlobSidecarTask extends AbstractFetchTask { - - private static final Logger LOG = LogManager.getLogger(); - - private final BlobIdentifier blobIdentifier; - - FetchBlobSidecarTask( - final P2PNetwork eth2Network, final BlobIdentifier blobIdentifier) { - super(eth2Network, Optional.empty()); - this.blobIdentifier = blobIdentifier; - } - - public FetchBlobSidecarTask( - final P2PNetwork eth2Network, - final Optional preferredPeer, - final BlobIdentifier blobIdentifier) { - super(eth2Network, preferredPeer); - this.blobIdentifier = blobIdentifier; - } - - @Override - public BlobIdentifier getKey() { - return blobIdentifier; - } - - @Override - SafeFuture> fetch(final Eth2Peer peer) { - return peer.requestBlobSidecarByRoot(blobIdentifier) - .thenApply( - maybeBlobSidecar -> - maybeBlobSidecar - .map(blobSidecar -> FetchResult.createSuccessful(peer, blobSidecar)) - .orElseGet(() -> FetchResult.createFailed(peer, Status.FETCH_FAILED))) - .exceptionally( - err -> { - LOG.debug( - String.format( - "Failed to fetch blob sidecar by identifier %s from peer %s", - blobIdentifier, peer.getId()), - err); - return FetchResult.createFailed(peer, Status.FETCH_FAILED); - }); - } -} diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java new file mode 100644 index 00000000000..477a109a784 --- /dev/null +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java @@ -0,0 +1,99 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.fetch; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; +import tech.pegasys.teku.networking.p2p.network.P2PNetwork; +import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; + +public class FetchBlobSidecarsTask extends AbstractFetchTask> { + + private static final Logger LOG = LogManager.getLogger(); + + private final Bytes32 blockRoot; + private final List blobIdentifiers; + + FetchBlobSidecarsTask( + final P2PNetwork eth2Network, + final Bytes32 blockRoot, + final List blobIdentifiers) { + super(eth2Network, Optional.empty()); + this.blockRoot = blockRoot; + this.blobIdentifiers = blobIdentifiers; + } + + public FetchBlobSidecarsTask( + final P2PNetwork eth2Network, + final Optional preferredPeer, + final Bytes32 blockRoot, + final List blobIdentifiers) { + super(eth2Network, preferredPeer); + this.blockRoot = blockRoot; + this.blobIdentifiers = blobIdentifiers; + } + + @Override + public Bytes32 getKey() { + return blockRoot; + } + + @Override + SafeFuture>> fetch(final Eth2Peer peer) { + final SafeFuture>> fetchResult = new SafeFuture<>(); + final List blobSidecars = new ArrayList<>(); + peer.requestBlobSidecarsByRoot( + blobIdentifiers, + new RpcResponseHandler<>() { + @Override + public void onCompleted(final Optional error) { + error.ifPresentOrElse( + err -> { + logFetchError(peer, err); + fetchResult.complete(FetchResult.createFailed(peer, Status.FETCH_FAILED)); + }, + () -> fetchResult.complete(FetchResult.createSuccessful(peer, blobSidecars))); + } + + @Override + public SafeFuture onResponse(final BlobSidecar response) { + blobSidecars.add(response); + return SafeFuture.COMPLETE; + } + }) + .finish( + err -> { + logFetchError(peer, err); + fetchResult.complete(FetchResult.createFailed(peer, Status.FETCH_FAILED)); + }); + return fetchResult; + } + + private void logFetchError(final Eth2Peer peer, final Throwable err) { + LOG.error( + String.format( + "Failed to fetch %d blob sidecars for block root %s from peer %s", + blobIdentifiers.size(), blockRoot, peer.getId()), + err); + } +} diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java index f9fd8832b6d..1d5fef932ee 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.beacon.sync.fetch; +import java.util.List; import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; @@ -26,10 +27,11 @@ default FetchBlockTask createFetchBlockTask(final Bytes32 blockRoot) { FetchBlockTask createFetchBlockTask(Bytes32 blockRoot, Optional preferredPeer); - default FetchBlobSidecarTask createFetchBlobSidecarTask(final BlobIdentifier blobIdentifier) { - return createFetchBlobSidecarTask(blobIdentifier, Optional.empty()); + default FetchBlobSidecarsTask createFetchBlobSidecarsTask( + Bytes32 blockRoot, List blobIdentifiers) { + return createFetchBlobSidecarsTask(blockRoot, blobIdentifiers, Optional.empty()); } - FetchBlobSidecarTask createFetchBlobSidecarTask( - BlobIdentifier blobIdentifier, Optional preferredPeer); + FetchBlobSidecarsTask createFetchBlobSidecarsTask( + Bytes32 blockRoot, List blobIdentifiers, Optional preferredPeer); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java index fa407ea6e34..3272337ca39 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java @@ -161,7 +161,5 @@ private String getTaskName(final T task) { return task.getClass().getSimpleName(); } - public abstract T createTask(K key); - public abstract void processFetchedResult(T task, R result); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java index f523bcf94f1..5ff3d7f870b 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java @@ -13,27 +13,30 @@ package tech.pegasys.teku.beacon.sync.gossip.blobs; +import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarTask; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask; import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory; import tech.pegasys.teku.beacon.sync.forward.ForwardSync; import tech.pegasys.teku.beacon.sync.gossip.AbstractFetchService; -import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService; import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; -import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; public class RecentBlobSidecarsFetchService - extends AbstractFetchService + extends AbstractFetchService> implements RecentBlobSidecarsFetcher { private static final Logger LOG = LogManager.getLogger(); + private static final int MAX_CONCURRENT_REQUESTS = 3; + private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; private final ForwardSync forwardSync; private final FetchTaskFactory fetchTaskFactory; @@ -57,17 +60,13 @@ public static RecentBlobSidecarsFetchService create( final AsyncRunner asyncRunner, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final ForwardSync forwardSync, - final FetchTaskFactory fetchTaskFactory, - final Spec spec) { - final int maxConcurrentRequests = - RecentBlocksFetchService.MAX_CONCURRENT_REQUESTS - * spec.getMaxBlobsPerBlockForHighestMilestone().orElse(1); + final FetchTaskFactory fetchTaskFactory) { return new RecentBlobSidecarsFetchService( asyncRunner, blockBlobSidecarsTrackersPool, forwardSync, fetchTaskFactory, - maxConcurrentRequests); + MAX_CONCURRENT_REQUESTS); } @Override @@ -87,34 +86,62 @@ public void subscribeBlobSidecarFetched(final BlobSidecarSubscriber subscriber) } @Override - public void requestRecentBlobSidecar(final BlobIdentifier blobIdentifier) { + public void requestRecentBlobSidecars( + final Bytes32 blockRoot, final List blobIdentifiers) { if (forwardSync.isSyncActive()) { // Forward sync already in progress, assume it will fetch any missing blob sidecars return; } - if (blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier)) { - // We've already got this blob sidecar + final List requiredBlobIdentifiers = + blobIdentifiers.stream() + .filter( + blobIdentifier -> + !blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier)) + .toList(); + if (requiredBlobIdentifiers.isEmpty()) { + // We already have all required blob sidecars return; } - final FetchBlobSidecarTask task = createTask(blobIdentifier); - if (allTasks.putIfAbsent(blobIdentifier, task) != null) { + final FetchBlobSidecarsTask task = + fetchTaskFactory.createFetchBlobSidecarsTask(blockRoot, requiredBlobIdentifiers); + if (allTasks.putIfAbsent(blockRoot, task) != null) { // We're already tracking this task task.cancel(); return; } - LOG.trace("Queue blob sidecar to be fetched: {}", blobIdentifier); + LOG.trace("Queue blob sidecars to be fetched: {}", requiredBlobIdentifiers); queueTask(task); } @Override - public void cancelRecentBlobSidecarRequest(final BlobIdentifier blobIdentifier) { - cancelRequest(blobIdentifier); + public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) { + cancelRequest(blockRoot); + } + + @Override + public void processFetchedResult( + final FetchBlobSidecarsTask task, final List result) { + result.forEach( + blobSidecar -> { + LOG.trace("Successfully fetched blob sidecar: {}", result); + blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(blobSidecar)); + }); + // After retrieved blob sidecars have been processed, stop tracking it + removeTask(task); + } + + @Override + public void onBlockValidated(final SignedBeaconBlock block) {} + + @Override + public void onBlockImported(final SignedBeaconBlock block, final boolean executionOptimistic) { + cancelRecentBlobSidecarsRequest(block.getRoot()); } private void setupSubscribers() { - blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecar(this::requestRecentBlobSidecar); - blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped( - this::cancelRecentBlobSidecarRequest); + blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecars(this::requestRecentBlobSidecars); + blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarsDropped( + this::cancelRecentBlobSidecarsRequest); forwardSync.subscribeToSyncChanges(this::onSyncStatusChanged); } @@ -126,19 +153,6 @@ private void onSyncStatusChanged(final boolean syncActive) { // We may have ignored these requested blob sidecars while the sync was in progress blockBlobSidecarsTrackersPool .getAllRequiredBlobSidecars() - .forEach(this::requestRecentBlobSidecar); - } - - @Override - public FetchBlobSidecarTask createTask(final BlobIdentifier key) { - return fetchTaskFactory.createFetchBlobSidecarTask(key); - } - - @Override - public void processFetchedResult(final FetchBlobSidecarTask task, final BlobSidecar result) { - LOG.trace("Successfully fetched blob sidecar: {}", result); - blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(result)); - // After retrieved blob sidecar has been processed, stop tracking it - removeTask(task); + .forEach(this::requestRecentBlobSidecars); } } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java index be1742cc1f3..549c8e30ed8 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java @@ -13,6 +13,8 @@ package tech.pegasys.teku.beacon.sync.gossip.blobs; +import java.util.List; +import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory; import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService; import tech.pegasys.teku.infrastructure.async.AsyncRunner; @@ -20,10 +22,12 @@ import tech.pegasys.teku.service.serviceutils.ServiceFacade; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; +import tech.pegasys.teku.statetransition.block.ReceivedBlockEventsChannel; -public interface RecentBlobSidecarsFetcher extends ServiceFacade { +public interface RecentBlobSidecarsFetcher extends ServiceFacade, ReceivedBlockEventsChannel { RecentBlobSidecarsFetcher NOOP = new RecentBlobSidecarsFetcher() { @@ -31,10 +35,11 @@ public interface RecentBlobSidecarsFetcher extends ServiceFacade { public void subscribeBlobSidecarFetched(BlobSidecarSubscriber subscriber) {} @Override - public void requestRecentBlobSidecar(BlobIdentifier blobIdentifier) {} + public void requestRecentBlobSidecars( + final Bytes32 blockRoot, final List blobIdentifiers) {} @Override - public void cancelRecentBlobSidecarRequest(BlobIdentifier blobIdentifier) {} + public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {} @Override public SafeFuture start() { @@ -50,6 +55,13 @@ public SafeFuture stop() { public boolean isRunning() { return false; } + + @Override + public void onBlockValidated(final SignedBeaconBlock block) {} + + @Override + public void onBlockImported( + final SignedBeaconBlock block, final boolean executionOptimistic) {} }; static RecentBlobSidecarsFetcher create( @@ -62,11 +74,7 @@ static RecentBlobSidecarsFetcher create( if (spec.isMilestoneSupported(SpecMilestone.DENEB)) { recentBlobSidecarsFetcher = RecentBlobSidecarsFetchService.create( - asyncRunner, - blockBlobSidecarsTrackersPool, - forwardSyncService, - fetchTaskFactory, - spec); + asyncRunner, blockBlobSidecarsTrackersPool, forwardSyncService, fetchTaskFactory); } else { recentBlobSidecarsFetcher = RecentBlobSidecarsFetcher.NOOP; } @@ -76,7 +84,7 @@ static RecentBlobSidecarsFetcher create( void subscribeBlobSidecarFetched(BlobSidecarSubscriber subscriber); - void requestRecentBlobSidecar(BlobIdentifier blobIdentifier); + void requestRecentBlobSidecars(Bytes32 blockRoot, List blobIdentifiers); - void cancelRecentBlobSidecarRequest(BlobIdentifier blobIdentifier); + void cancelRecentBlobSidecarsRequest(Bytes32 blockRoot); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java index bc3359dfa86..42920f23053 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java @@ -34,7 +34,7 @@ public class RecentBlocksFetchService private static final Logger LOG = LogManager.getLogger(); - public static final int MAX_CONCURRENT_REQUESTS = 3; + private static final int MAX_CONCURRENT_REQUESTS = 3; private final ForwardSync forwardSync; private final PendingPool pendingBlockPool; @@ -106,7 +106,7 @@ public void requestRecentBlock(final Bytes32 blockRoot) { // We already have this block, waiting for blobs return; } - final FetchBlockTask task = createTask(blockRoot); + final FetchBlockTask task = fetchTaskFactory.createFetchBlockTask(blockRoot); if (allTasks.putIfAbsent(blockRoot, task) != null) { // We're already tracking this task task.cancel(); @@ -121,11 +121,6 @@ public void cancelRecentBlockRequest(final Bytes32 blockRoot) { cancelRequest(blockRoot); } - @Override - public FetchBlockTask createTask(final Bytes32 key) { - return fetchTaskFactory.createFetchBlockTask(key); - } - @Override public void processFetchedResult(final FetchBlockTask task, final SignedBeaconBlock block) { LOG.trace("Successfully fetched block: {}", block); diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java index dbbbe09e26b..3299db4e0a6 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import java.util.List; import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -37,9 +38,10 @@ public void createsFetchBlockTask() { } @Test - public void createsFetchBlobSidecarTask() { - final FetchBlobSidecarTask task = - fetchTaskFactory.createFetchBlobSidecarTask(new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO)); + public void createsFetchBlobSidecarsTask() { + final FetchBlobSidecarsTask task = + fetchTaskFactory.createFetchBlobSidecarsTask( + Bytes32.ZERO, List.of(new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO))); assertThat(task).isNotNull(); } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTaskTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTaskTest.java deleted file mode 100644 index 57d07ee2dc7..00000000000 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarTaskTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2023 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.beacon.sync.fetch; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -import java.util.Optional; -import org.junit.jupiter.api.Test; -import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; -import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; - -public class FetchBlobSidecarTaskTest extends AbstractFetchTaskTest { - - @Test - public void run_successful() { - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - final FetchBlobSidecarTask task = new FetchBlobSidecarTask(eth2P2PNetwork, blobIdentifier); - assertThat(task.getKey()).isEqualTo(blobIdentifier); - - final Eth2Peer peer = registerNewPeer(1); - when(peer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar))); - - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).hasValue(peer); - assertThat(fetchResult.isSuccessful()).isTrue(); - assertThat(fetchResult.getResult()).hasValue(blobSidecar); - } - - @Test - public void run_noPeers() { - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - final FetchBlobSidecarTask task = new FetchBlobSidecarTask(eth2P2PNetwork, blobIdentifier); - - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).isEmpty(); - assertThat(fetchResult.isSuccessful()).isFalse(); - assertThat(fetchResult.getStatus()).isEqualTo(Status.NO_AVAILABLE_PEERS); - } - - @Test - public void run_failAndRetryWithNoNewPeers() { - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - final FetchBlobSidecarTask task = new FetchBlobSidecarTask(eth2P2PNetwork, blobIdentifier); - - final Eth2Peer peer = registerNewPeer(1); - when(peer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); - - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).hasValue(peer); - assertThat(fetchResult.isSuccessful()).isFalse(); - assertThat(fetchResult.getStatus()).isEqualTo(Status.FETCH_FAILED); - assertThat(task.getNumberOfRetries()).isEqualTo(0); - - // Retry - final SafeFuture> result2 = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult2 = result2.getNow(null); - assertThat(fetchResult2.getPeer()).isEmpty(); - assertThat(fetchResult2.isSuccessful()).isFalse(); - assertThat(fetchResult2.getStatus()).isEqualTo(Status.NO_AVAILABLE_PEERS); - assertThat(task.getNumberOfRetries()).isEqualTo(0); - } - - @Test - public void run_failAndRetryWithNewPeer() { - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - final FetchBlobSidecarTask task = new FetchBlobSidecarTask(eth2P2PNetwork, blobIdentifier); - - final Eth2Peer peer = registerNewPeer(1); - when(peer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); - - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).hasValue(peer); - assertThat(fetchResult.isSuccessful()).isFalse(); - assertThat(fetchResult.getStatus()).isEqualTo(Status.FETCH_FAILED); - assertThat(task.getNumberOfRetries()).isEqualTo(0); - - // Add another peer - final Eth2Peer peer2 = registerNewPeer(2); - when(peer2.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar))); - - // Retry - final SafeFuture> result2 = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult2 = result2.getNow(null); - assertThat(fetchResult2.getPeer()).hasValue(peer2); - assertThat(fetchResult2.isSuccessful()).isTrue(); - assertThat(fetchResult2.getResult()).hasValue(blobSidecar); - assertThat(task.getNumberOfRetries()).isEqualTo(1); - } - - @Test - public void run_withMultiplesPeersAvailable() { - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - final FetchBlobSidecarTask task = new FetchBlobSidecarTask(eth2P2PNetwork, blobIdentifier); - - final Eth2Peer peer = registerNewPeer(1); - when(peer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); - when(peer.getOutstandingRequests()).thenReturn(1); - // Add another peer - final Eth2Peer peer2 = registerNewPeer(2); - when(peer2.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar))); - when(peer2.getOutstandingRequests()).thenReturn(0); - - // We should choose the peer that is less busy, which successfully returns the blob sidecar - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).hasValue(peer2); - assertThat(fetchResult.isSuccessful()).isTrue(); - assertThat(fetchResult.getResult()).hasValue(blobSidecar); - } - - @Test - public void run_withPreferredPeer() { - final Eth2Peer preferredPeer = createNewPeer(1); - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - when(preferredPeer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar))); - - final FetchBlobSidecarTask task = - new FetchBlobSidecarTask(eth2P2PNetwork, Optional.of(preferredPeer), blobIdentifier); - - // Add a peer - registerNewPeer(2); - - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).hasValue(preferredPeer); - assertThat(fetchResult.isSuccessful()).isTrue(); - assertThat(fetchResult.getResult()).hasValue(blobSidecar); - } - - @Test - public void run_withRandomPeerWhenFetchingWithPreferredPeerFails() { - final Eth2Peer preferredPeer = createNewPeer(1); - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - when(preferredPeer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); - - final FetchBlobSidecarTask task = - new FetchBlobSidecarTask(eth2P2PNetwork, Optional.of(preferredPeer), blobIdentifier); - - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).hasValue(preferredPeer); - assertThat(fetchResult.isSuccessful()).isFalse(); - assertThat(fetchResult.getStatus()).isEqualTo(Status.FETCH_FAILED); - assertThat(task.getNumberOfRetries()).isEqualTo(0); - - // Add a peer - final Eth2Peer peer = registerNewPeer(2); - when(peer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar))); - - // Retry - final SafeFuture> result2 = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult2 = result2.getNow(null); - assertThat(fetchResult2.getPeer()).hasValue(peer); - assertThat(fetchResult2.isSuccessful()).isTrue(); - assertThat(fetchResult2.getResult()).hasValue(blobSidecar); - assertThat(task.getNumberOfRetries()).isEqualTo(1); - } - - @Test - public void cancel() { - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); - final FetchBlobSidecarTask task = new FetchBlobSidecarTask(eth2P2PNetwork, blobIdentifier); - - final Eth2Peer peer = registerNewPeer(1); - when(peer.requestBlobSidecarByRoot(blobIdentifier)) - .thenReturn(SafeFuture.completedFuture(Optional.of(blobSidecar))); - - task.cancel(); - final SafeFuture> result = task.run(); - assertThat(result).isDone(); - final FetchResult fetchResult = result.getNow(null); - assertThat(fetchResult.getPeer()).isEmpty(); - assertThat(fetchResult.isSuccessful()).isFalse(); - assertThat(fetchResult.getStatus()).isEqualTo(Status.CANCELLED); - } -} diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java new file mode 100644 index 00000000000..b705e980da7 --- /dev/null +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java @@ -0,0 +1,265 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.fetch; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; +import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler; +import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; + +public class FetchBlobSidecarsTaskTest extends AbstractFetchTaskTest { + + @Test + public void run_successful() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + assertThat(task.getKey()).isEqualTo(block.getRoot()); + + final Eth2Peer peer = registerNewPeer(1); + mockRpcResponse(peer, blobIdentifiers, blobSidecars); + + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).hasValue(peer); + assertThat(fetchResult.isSuccessful()).isTrue(); + assertThat(fetchResult.getResult()).hasValue(blobSidecars); + } + + @Test + public void run_noPeers() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).isEmpty(); + assertThat(fetchResult.isSuccessful()).isFalse(); + assertThat(fetchResult.getStatus()).isEqualTo(Status.NO_AVAILABLE_PEERS); + } + + @Test + public void run_failAndRetryWithNoNewPeers() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + + final Eth2Peer peer = registerNewPeer(1); + when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) + .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); + + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).hasValue(peer); + assertThat(fetchResult.isSuccessful()).isFalse(); + assertThat(fetchResult.getStatus()).isEqualTo(Status.FETCH_FAILED); + assertThat(task.getNumberOfRetries()).isEqualTo(0); + + // Retry + final SafeFuture>> result2 = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult2 = result2.getNow(null); + assertThat(fetchResult2.getPeer()).isEmpty(); + assertThat(fetchResult2.isSuccessful()).isFalse(); + assertThat(fetchResult2.getStatus()).isEqualTo(Status.NO_AVAILABLE_PEERS); + assertThat(task.getNumberOfRetries()).isEqualTo(0); + } + + @Test + public void run_failAndRetryWithNewPeer() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + + final Eth2Peer peer = registerNewPeer(1); + when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) + .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); + + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).hasValue(peer); + assertThat(fetchResult.isSuccessful()).isFalse(); + assertThat(fetchResult.getStatus()).isEqualTo(Status.FETCH_FAILED); + assertThat(task.getNumberOfRetries()).isEqualTo(0); + + // Add another peer + final Eth2Peer peer2 = registerNewPeer(2); + mockRpcResponse(peer2, blobIdentifiers, blobSidecars); + + // Retry + final SafeFuture>> result2 = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult2 = result2.getNow(null); + assertThat(fetchResult2.getPeer()).hasValue(peer2); + assertThat(fetchResult2.isSuccessful()).isTrue(); + assertThat(fetchResult2.getResult()).hasValue(blobSidecars); + assertThat(task.getNumberOfRetries()).isEqualTo(1); + } + + @Test + public void run_withMultiplesPeersAvailable() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + + final Eth2Peer peer = registerNewPeer(1); + when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) + .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); + when(peer.getOutstandingRequests()).thenReturn(1); + // Add another peer + final Eth2Peer peer2 = registerNewPeer(2); + mockRpcResponse(peer2, blobIdentifiers, blobSidecars); + when(peer2.getOutstandingRequests()).thenReturn(0); + + // We should choose the peer that is less busy, which successfully returns the blob sidecar + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).hasValue(peer2); + assertThat(fetchResult.isSuccessful()).isTrue(); + assertThat(fetchResult.getResult()).hasValue(blobSidecars); + } + + @Test + public void run_withPreferredPeer() { + final Eth2Peer preferredPeer = createNewPeer(1); + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + mockRpcResponse(preferredPeer, blobIdentifiers, blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask( + eth2P2PNetwork, Optional.of(preferredPeer), block.getRoot(), blobIdentifiers); + + // Add a peer + registerNewPeer(2); + + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).hasValue(preferredPeer); + assertThat(fetchResult.isSuccessful()).isTrue(); + assertThat(fetchResult.getResult()).hasValue(blobSidecars); + } + + @Test + public void run_withRandomPeerWhenFetchingWithPreferredPeerFails() { + final Eth2Peer preferredPeer = createNewPeer(1); + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + when(preferredPeer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) + .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask( + eth2P2PNetwork, Optional.of(preferredPeer), block.getRoot(), blobIdentifiers); + + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).hasValue(preferredPeer); + assertThat(fetchResult.isSuccessful()).isFalse(); + assertThat(fetchResult.getStatus()).isEqualTo(Status.FETCH_FAILED); + assertThat(task.getNumberOfRetries()).isEqualTo(0); + + // Add a peer + final Eth2Peer peer = registerNewPeer(2); + mockRpcResponse(peer, blobIdentifiers, blobSidecars); + + // Retry + final SafeFuture>> result2 = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult2 = result2.getNow(null); + assertThat(fetchResult2.getPeer()).hasValue(peer); + assertThat(fetchResult2.isSuccessful()).isTrue(); + assertThat(fetchResult2.getResult()).hasValue(blobSidecars); + assertThat(task.getNumberOfRetries()).isEqualTo(1); + } + + @Test + public void cancel() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + + final FetchBlobSidecarsTask task = + new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + assertThat(task.getKey()).isEqualTo(block.getRoot()); + + final Eth2Peer peer = registerNewPeer(1); + mockRpcResponse(peer, blobIdentifiers, blobSidecars); + + task.cancel(); + final SafeFuture>> result = task.run(); + assertThat(result).isDone(); + final FetchResult> fetchResult = result.getNow(null); + assertThat(fetchResult.getPeer()).isEmpty(); + assertThat(fetchResult.isSuccessful()).isFalse(); + assertThat(fetchResult.getStatus()).isEqualTo(Status.CANCELLED); + } + + private List getBlobIdentifiers(final List blobSidecars) { + return blobSidecars.stream() + .map(sidecar -> new BlobIdentifier(sidecar.getBlockRoot(), sidecar.getIndex())) + .toList(); + } + + private void mockRpcResponse( + final Eth2Peer peer, + final List blobIdentifiers, + List blobSidecars) { + when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) + .thenAnswer( + invocationOnMock -> { + final RpcResponseHandler handler = invocationOnMock.getArgument(1); + blobSidecars.forEach(handler::onResponse); + handler.onCompleted(); + return SafeFuture.COMPLETE; + }); + } +} diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java index 3d6d51b9547..32bc02e5ecb 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java @@ -22,15 +22,14 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +import java.util.Map; +import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; -import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarTask; +import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask; import tech.pegasys.teku.beacon.sync.fetch.FetchResult; import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory; @@ -41,6 +40,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; @@ -62,8 +62,8 @@ class RecentBlobSidecarsFetchServiceTest { private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); - private final List tasks = new ArrayList<>(); - private final List>> taskFutures = new ArrayList<>(); + private final List tasks = new ArrayList<>(); + private final List>>> taskFutures = new ArrayList<>(); private final List importedBlobSidecars = new ArrayList<>(); private RecentBlobSidecarsFetchService recentBlobSidecarsFetcher; @@ -79,7 +79,7 @@ public void setup() { maxConcurrentRequests); lenient() - .when(fetchTaskFactory.createFetchBlobSidecarTask(any())) + .when(fetchTaskFactory.createFetchBlobSidecarsTask(any(), any())) .thenAnswer(this::createMockTask); recentBlobSidecarsFetcher.subscribeBlobSidecarFetched(importedBlobSidecars::add); } @@ -88,57 +88,85 @@ public void setup() { public void fetchSingleBlobSidecarSuccessfully() { final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - recentBlobSidecarsFetcher.requestRecentBlobSidecar( - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex())); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + blobSidecar.getBlockRoot(), + List.of(new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()))); assertTaskCounts(1, 1, 0); assertThat(importedBlobSidecars).isEmpty(); - final SafeFuture> future = taskFutures.get(0); - future.complete(FetchResult.createSuccessful(blobSidecar)); + final SafeFuture>> future = taskFutures.getFirst(); + future.complete(FetchResult.createSuccessful(List.of(blobSidecar))); assertThat(importedBlobSidecars).containsExactly(blobSidecar); assertTaskCounts(0, 0, 0); } @Test - public void handleDuplicateRequiredBlobSidecars() { + public void fetchMultipleBlobSidecarsSuccessfully() { + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithCommitments(3); + final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); + + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + block.getRoot(), + blobSidecars.stream() + .map(sidecar -> new BlobIdentifier(sidecar.getBlockRoot(), sidecar.getIndex())) + .toList()); + + assertTaskCounts(1, 1, 0); + assertThat(importedBlobSidecars).isEmpty(); + + final SafeFuture>> future = taskFutures.getFirst(); + future.complete(FetchResult.createSuccessful(blobSidecars)); + + assertThat(importedBlobSidecars).containsExactlyElementsOf(blobSidecars); + assertTaskCounts(0, 0, 0); + } + + @Test + public void handleRequiredBlobSidecarsWithSameBlockRoot() { final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - final BlobIdentifier blobIdentifier = - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()); + final Bytes32 blockRoot = blobSidecar.getBlockRoot(); + final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); + final BlobIdentifier anotherBlobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); + recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(blobIdentifier)); + recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(anotherBlobIdentifier)); + // only one task allowed per block root assertTaskCounts(1, 1, 0); assertThat(importedBlobSidecars).isEmpty(); - final SafeFuture> future = taskFutures.get(0); - future.complete(FetchResult.createSuccessful(blobSidecar)); + final SafeFuture>> future = taskFutures.getFirst(); + future.complete(FetchResult.createSuccessful(List.of(blobSidecar))); assertThat(importedBlobSidecars).containsExactly(blobSidecar); assertTaskCounts(0, 0, 0); } @Test - public void ignoreKnownBlobSidecar() { - final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(); - when(blockBlobSidecarsTrackersPool.containsBlobSidecar(blobIdentifier)).thenReturn(true); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); + public void ignoreIfNoBlobSidecarsAreRequired() { + final Bytes32 blockRoot = dataStructureUtil.randomBytes32(); + final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); + final BlobIdentifier anotherBlobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); + when(blockBlobSidecarsTrackersPool.containsBlobSidecar(any())).thenReturn(true); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + blockRoot, List.of(blobIdentifier, anotherBlobIdentifier)); assertTaskCounts(0, 0, 0); assertThat(importedBlobSidecars).isEmpty(); } @Test - public void cancelBlobSidecarRequest() { - final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); - recentBlobSidecarsFetcher.cancelRecentBlobSidecarRequest(blobIdentifier); + public void cancelBlobSidecarsRequest() { + final Bytes32 blockRoot = dataStructureUtil.randomBytes32(); + final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); + recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(blobIdentifier)); + recentBlobSidecarsFetcher.cancelRecentBlobSidecarsRequest(blockRoot); - verify(tasks.get(0)).cancel(); + verify(tasks.getFirst()).cancel(); // Manually cancel future - taskFutures.get(0).complete(FetchResult.createFailed(Status.CANCELLED)); + taskFutures.getFirst().complete(FetchResult.createFailed(Status.CANCELLED)); // Task should be removed assertTaskCounts(0, 0, 0); @@ -147,46 +175,49 @@ public void cancelBlobSidecarRequest() { @Test public void fetchSingleBlobSidecarWithRetry() { - final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + dataStructureUtil.randomBytes32(), dataStructureUtil.randomBlobIdentifiers(2)); assertTaskCounts(1, 1, 0); assertThat(importedBlobSidecars).isEmpty(); - final SafeFuture> future = taskFutures.get(0); + final SafeFuture>> future = taskFutures.getFirst(); future.complete(FetchResult.createFailed(Status.FETCH_FAILED)); // Task should be queued for a retry via the scheduled executor - verify(tasks.get(0)).getNumberOfRetries(); + verify(tasks.getFirst()).getNumberOfRetries(); assertThat(asyncRunner.countDelayedActions()).isEqualTo(1); assertTaskCounts(1, 0, 0); // Executor should requeue task - when(tasks.get(0).run()).thenReturn(new SafeFuture<>()); + when(tasks.getFirst().run()).thenReturn(new SafeFuture<>()); asyncRunner.executeQueuedActions(); assertTaskCounts(1, 1, 0); } @Test public void cancelTaskWhileWaitingToRetry() { - final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); + final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); + final List blobIdentifiers = + List.of(new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex())); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + blobSidecar.getBlockRoot(), blobIdentifiers); assertTaskCounts(1, 1, 0); assertThat(importedBlobSidecars).isEmpty(); - final SafeFuture> future = taskFutures.get(0); + final SafeFuture>> future = taskFutures.getFirst(); future.complete(FetchResult.createFailed(Status.FETCH_FAILED)); // Task should be queued for a retry via the scheduled executor - verify(tasks.get(0)).getNumberOfRetries(); + verify(tasks.getFirst()).getNumberOfRetries(); assertThat(asyncRunner.countDelayedActions()).isEqualTo(1); assertTaskCounts(1, 0, 0); // Cancel task - recentBlobSidecarsFetcher.cancelRecentBlobSidecarRequest(blobIdentifier); - verify(tasks.get(0)).cancel(); - when(tasks.get(0).run()) + recentBlobSidecarsFetcher.cancelRecentBlobSidecarsRequest(blobSidecar.getBlockRoot()); + verify(tasks.getFirst()).cancel(); + when(tasks.getFirst().run()) .thenReturn(SafeFuture.completedFuture(FetchResult.createFailed(Status.CANCELLED))); // Executor should requeue task, it should complete immediately and be removed @@ -196,22 +227,22 @@ public void cancelTaskWhileWaitingToRetry() { @Test public void handlesPeersUnavailable() { - final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + dataStructureUtil.randomBytes32(), dataStructureUtil.randomBlobIdentifiers(2)); assertTaskCounts(1, 1, 0); assertThat(importedBlobSidecars).isEmpty(); - final SafeFuture> future = taskFutures.get(0); + final SafeFuture>> future = taskFutures.getFirst(); future.complete(FetchResult.createFailed(Status.NO_AVAILABLE_PEERS)); // Task should be queued for a retry via the scheduled executor - verify(tasks.get(0), never()).getNumberOfRetries(); + verify(tasks.getFirst(), never()).getNumberOfRetries(); assertThat(asyncRunner.countDelayedActions()).isEqualTo(1); assertTaskCounts(1, 0, 0); // Executor should requeue task - when(tasks.get(0).run()).thenReturn(new SafeFuture<>()); + when(tasks.getFirst().run()).thenReturn(new SafeFuture<>()); asyncRunner.executeQueuedActions(); assertTaskCounts(1, 1, 0); } @@ -220,16 +251,16 @@ public void handlesPeersUnavailable() { public void queueFetchTaskWhenConcurrencyLimitReached() { final int taskCount = maxConcurrentRequests + 1; for (int i = 0; i < taskCount; i++) { - final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(blobIdentifier); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + dataStructureUtil.randomBytes32(), dataStructureUtil.randomBlobIdentifiers(2)); } assertTaskCounts(taskCount, taskCount - 1, 1); // Complete first task - final SafeFuture> future = taskFutures.get(0); + final SafeFuture>> future = taskFutures.getFirst(); final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); - future.complete(FetchResult.createSuccessful(blobSidecar)); + future.complete(FetchResult.createSuccessful(List.of(blobSidecar))); // After first task completes, remaining pending count should become active assertTaskCounts(taskCount - 1, taskCount - 1, 0); @@ -239,14 +270,21 @@ public void queueFetchTaskWhenConcurrencyLimitReached() { void shouldNotFetchBlobSidecarsWhileForwardSyncIsInProgress() { when(forwardSync.isSyncActive()).thenReturn(true); - recentBlobSidecarsFetcher.requestRecentBlobSidecar(dataStructureUtil.randomBlobIdentifier()); + recentBlobSidecarsFetcher.requestRecentBlobSidecars( + dataStructureUtil.randomBytes32(), dataStructureUtil.randomBlobIdentifiers(2)); assertTaskCounts(0, 0, 0); } @Test void shouldRequestRemainingRequiredBlobSidecarsWhenForwardSyncCompletes() { - final Set requiredBlobIdentifiers = - new HashSet<>(dataStructureUtil.randomBlobIdentifiers(2)); + final Bytes32 blockRoot = dataStructureUtil.randomBytes32(); + final Bytes32 blockRoot1 = dataStructureUtil.randomBytes32(); + final Map> requiredBlobIdentifiers = + Map.of( + blockRoot, + List.of(dataStructureUtil.randomBlobIdentifier(blockRoot)), + blockRoot1, + List.of(dataStructureUtil.randomBlobIdentifier(blockRoot1))); when(blockBlobSidecarsTrackersPool.getAllRequiredBlobSidecars()) .thenReturn(requiredBlobIdentifiers); @@ -258,19 +296,18 @@ void shouldRequestRemainingRequiredBlobSidecarsWhenForwardSyncCompletes() { syncSubscriber.onSyncingChange(false); assertTaskCounts(2, 2, 0); - final Set requestingBlobIdentifiers = - tasks.stream().map(FetchBlobSidecarTask::getKey).collect(Collectors.toSet()); - assertThat(requestingBlobIdentifiers) - .containsExactlyInAnyOrderElementsOf(requiredBlobIdentifiers); + final List requestingBlockRoots = + tasks.stream().map(FetchBlobSidecarsTask::getKey).toList(); + assertThat(requestingBlockRoots).containsExactly(blockRoot, blockRoot1); } - private FetchBlobSidecarTask createMockTask(final InvocationOnMock invocationOnMock) { - final BlobIdentifier blobIdentifier = invocationOnMock.getArgument(0); - final FetchBlobSidecarTask task = mock(FetchBlobSidecarTask.class); + private FetchBlobSidecarsTask createMockTask(final InvocationOnMock invocationOnMock) { + final Bytes32 blockRoot = invocationOnMock.getArgument(0); + final FetchBlobSidecarsTask task = mock(FetchBlobSidecarsTask.class); - lenient().when(task.getKey()).thenReturn(blobIdentifier); + lenient().when(task.getKey()).thenReturn(blockRoot); lenient().when(task.getNumberOfRetries()).thenReturn(0); - final SafeFuture> future = new SafeFuture<>(); + final SafeFuture>> future = new SafeFuture<>(); lenient().when(task.run()).thenReturn(future); taskFutures.add(future); tasks.add(task); diff --git a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java index 5ffe6cdec0b..62387271513 100644 --- a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java +++ b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/StubBlobSidecarManager.java @@ -64,12 +64,6 @@ public void prepareForBlockImport(final BlobSidecar blobSidecar, final RemoteOri // NOOP } - @Override - public void subscribeToReceivedBlobSidecar( - final ReceivedBlobSidecarListener receivedBlobSidecarListener) { - // NOOP - } - @Override public boolean isAvailabilityRequiredAtSlot(final UInt64 slot) { // NOOP diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java index 7bfea3926be..4ca2c1ca5c9 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManager.java @@ -37,10 +37,6 @@ public SafeFuture validateAndPrepareForBlockImport( public void prepareForBlockImport( final BlobSidecar blobSidecar, final RemoteOrigin remoteOrigin) {} - @Override - public void subscribeToReceivedBlobSidecar( - final ReceivedBlobSidecarListener receivedBlobSidecarListener) {} - @Override public boolean isAvailabilityRequiredAtSlot(final UInt64 slot) { return false; @@ -64,8 +60,6 @@ SafeFuture validateAndPrepareForBlockImport( void prepareForBlockImport(BlobSidecar blobSidecar, RemoteOrigin origin); - void subscribeToReceivedBlobSidecar(ReceivedBlobSidecarListener receivedBlobSidecarListener); - boolean isAvailabilityRequiredAtSlot(UInt64 slot); BlobSidecarsAvailabilityChecker createAvailabilityChecker(SignedBeaconBlock block); @@ -73,10 +67,6 @@ SafeFuture validateAndPrepareForBlockImport( BlobSidecarsAndValidationResult createAvailabilityCheckerAndValidateImmediately( SignedBeaconBlock block, List blobSidecars); - interface ReceivedBlobSidecarListener { - void onBlobSidecarReceived(BlobSidecar blobSidecar); - } - enum RemoteOrigin { RPC, GOSSIP, diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java index fc20d0c60e8..c1b5d97163d 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerImpl.java @@ -20,7 +20,6 @@ import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.kzg.KZG; import tech.pegasys.teku.spec.Spec; @@ -46,9 +45,6 @@ public class BlobSidecarManagerImpl implements BlobSidecarManager, SlotEventsCha forkChoiceBlobSidecarsAvailabilityCheckerProvider; private final UnpooledBlockBlobSidecarsTrackerProvider unpooledBlockBlobSidecarsTrackerProvider; - private final Subscribers receivedBlobSidecarSubscribers = - Subscribers.create(true); - public BlobSidecarManagerImpl( final Spec spec, final RecentChainData recentChainData, @@ -130,13 +126,6 @@ public SafeFuture validateAndPrepareForBlockImport( @Override public void prepareForBlockImport(final BlobSidecar blobSidecar, final RemoteOrigin origin) { blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, origin); - receivedBlobSidecarSubscribers.forEach(s -> s.onBlobSidecarReceived(blobSidecar)); - } - - @Override - public void subscribeToReceivedBlobSidecar( - final ReceivedBlobSidecarListener receivedBlobSidecarListener) { - receivedBlobSidecarSubscribers.subscribe(receivedBlobSidecarListener); } @Override diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java index 64c89d91ba1..ce4f7fd5bd8 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTracker.java @@ -14,11 +14,11 @@ package tech.pegasys.teku.statetransition.blobs; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import com.google.common.base.MoreObjects; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -110,31 +109,24 @@ public Optional getBlobSidecar(final UInt64 index) { return Optional.ofNullable(blobSidecars.get(index)); } - public Stream getMissingBlobSidecars() { + public List getMissingBlobSidecars() { final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); if (blockCommitmentsCount.isPresent()) { return UInt64.range(UInt64.ZERO, UInt64.valueOf(blockCommitmentsCount.get())) .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); + .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)) + .toList(); } if (blobSidecars.isEmpty()) { - return Stream.of(); + return List.of(); } // We may return maxBlobsPerBlock because we don't know the block return UInt64.range(UInt64.ZERO, maxBlobsPerBlock) .filter(blobIndex -> !blobSidecars.containsKey(blobIndex)) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); - } - - public Stream getUnusedBlobSidecarsForBlock() { - final Optional blockCommitmentsCount = getBlockKzgCommitmentsCount(); - checkState(blockCommitmentsCount.isPresent(), "Block must me known to call this method"); - - final UInt64 firstUnusedIndex = UInt64.valueOf(blockCommitmentsCount.get()); - return UInt64.range(firstUnusedIndex, maxBlobsPerBlock) - .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)); + .map(blobIndex -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), blobIndex)) + .toList(); } public boolean add(final BlobSidecar blobSidecar) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackersPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackersPool.java index fd96d946575..0d6c2c644ff 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackersPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackersPool.java @@ -15,8 +15,8 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Set; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -80,21 +80,13 @@ public Optional getBlockBlobSidecarsTracker( } @Override - public Set getAllRequiredBlobSidecars() { - return Collections.emptySet(); + public Map> getAllRequiredBlobSidecars() { + return Collections.emptyMap(); } @Override public void enableBlockImportOnCompletion(final SignedBeaconBlock block) {} - @Override - public void subscribeRequiredBlobSidecar( - final RequiredBlobSidecarSubscriber requiredBlobSidecarSubscriber) {} - - @Override - public void subscribeRequiredBlobSidecarDropped( - final RequiredBlobSidecarDroppedSubscriber requiredBlobSidecarDroppedSubscriber) {} - @Override public void subscribeRequiredBlockRoot( final RequiredBlockRootSubscriber requiredBlockRootSubscriber) {} @@ -103,6 +95,14 @@ public void subscribeRequiredBlockRoot( public void subscribeRequiredBlockRootDropped( final RequiredBlockRootDroppedSubscriber requiredBlockRootDroppedSubscriber) {} + @Override + public void subscribeRequiredBlobSidecars( + final RequiredBlobSidecarsSubscriber requiredBlobSidecarsSubscriber) {} + + @Override + public void subscribeRequiredBlobSidecarsDropped( + final RequiredBlobSidecarsDroppedSubscriber requiredBlobSidecarsDroppedSubscriber) {} + @Override public void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubscriber) {} }; @@ -123,7 +123,7 @@ public void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubsc Optional getBlock(Bytes32 blockRoot); - Set getAllRequiredBlobSidecars(); + Map> getAllRequiredBlobSidecars(); BlockBlobSidecarsTracker getOrCreateBlockBlobSidecarsTracker(SignedBeaconBlock block); @@ -131,25 +131,17 @@ public void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubsc void enableBlockImportOnCompletion(SignedBeaconBlock block); - void subscribeRequiredBlobSidecar(RequiredBlobSidecarSubscriber requiredBlobSidecarSubscriber); - - void subscribeRequiredBlobSidecarDropped( - RequiredBlobSidecarDroppedSubscriber requiredBlobSidecarDroppedSubscriber); - void subscribeRequiredBlockRoot(RequiredBlockRootSubscriber requiredBlockRootSubscriber); void subscribeRequiredBlockRootDropped( RequiredBlockRootDroppedSubscriber requiredBlockRootDroppedSubscriber); - void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubscriber); + void subscribeRequiredBlobSidecars(RequiredBlobSidecarsSubscriber requiredBlobSidecarsSubscriber); - interface RequiredBlobSidecarSubscriber { - void onRequiredBlobSidecar(BlobIdentifier blobIdentifier); - } + void subscribeRequiredBlobSidecarsDropped( + RequiredBlobSidecarsDroppedSubscriber requiredBlobSidecarsDroppedSubscriber); - interface RequiredBlobSidecarDroppedSubscriber { - void onRequiredBlobSidecarDropped(BlobIdentifier blobIdentifier); - } + void subscribeNewBlobSidecar(NewBlobSidecarSubscriber newBlobSidecarSubscriber); interface RequiredBlockRootSubscriber { void onRequiredBlockRoot(Bytes32 blockRoot); @@ -159,6 +151,14 @@ interface RequiredBlockRootDroppedSubscriber { void onRequiredBlockRootDropped(Bytes32 blockRoot); } + interface RequiredBlobSidecarsSubscriber { + void onRequiredBlobSidecars(Bytes32 blockRoot, List blobIdentifiers); + } + + interface RequiredBlobSidecarsDroppedSubscriber { + void onRequiredBlobSidecarsDropped(Bytes32 blockRoot); + } + interface NewBlobSidecarSubscriber { void onNewBlobSidecar(BlobSidecar blobSidecar); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 0bfb291d0e2..10106122acf 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.statetransition.util; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.stream.Collectors.toMap; import static tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil.getRootCauseMessage; import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis; import static tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin.LOCAL_EL; @@ -25,14 +26,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableSet; import java.util.Optional; -import java.util.Set; import java.util.TreeSet; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -116,10 +116,10 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis private final Subscribers requiredBlockRootDroppedSubscribers = Subscribers.create(true); - private final Subscribers requiredBlobSidecarSubscribers = + private final Subscribers requiredBlobSidecarsSubscribers = Subscribers.create(true); - private final Subscribers - requiredBlobSidecarDroppedSubscribers = Subscribers.create(true); + private final Subscribers + requiredBlobSidecarsDroppedSubscribers = Subscribers.create(true); private final Subscribers newBlobSidecarSubscribers = Subscribers.create(true); @@ -344,7 +344,7 @@ public synchronized void onCompletedBlockAndBlobSidecars( LOG.error( "Tracker for block {} is supposed to be completed but it is not. Missing blob sidecars: {}", block.toLogString(), - blobSidecarsTracker.getMissingBlobSidecars().count()); + blobSidecarsTracker.getMissingBlobSidecars().size()); } if (orderedBlobSidecarsTrackers.add(slotAndBlockRoot)) { @@ -425,10 +425,14 @@ public synchronized Optional getBlock(final Bytes32 blockRoot } @Override - public synchronized Set getAllRequiredBlobSidecars() { - return blockBlobSidecarsTrackers.values().stream() - .flatMap(BlockBlobSidecarsTracker::getMissingBlobSidecars) - .collect(Collectors.toSet()); + public synchronized Map> getAllRequiredBlobSidecars() { + return blockBlobSidecarsTrackers.entrySet().stream() + .filter( + entry -> { + final BlockBlobSidecarsTracker tracker = entry.getValue(); + return tracker.getBlock().isPresent() && !tracker.isComplete(); + }) + .collect(toMap(Entry::getKey, entry -> entry.getValue().getMissingBlobSidecars())); } @Override @@ -437,18 +441,6 @@ public synchronized void enableBlockImportOnCompletion(final SignedBeaconBlock b .ifPresent(tracker -> tracker.enableBlockImportOnCompletion(blockImportChannel)); } - @Override - public void subscribeRequiredBlobSidecar( - final RequiredBlobSidecarSubscriber requiredBlobSidecarSubscriber) { - requiredBlobSidecarSubscribers.subscribe(requiredBlobSidecarSubscriber); - } - - @Override - public void subscribeRequiredBlobSidecarDropped( - final RequiredBlobSidecarDroppedSubscriber requiredBlobSidecarDroppedSubscriber) { - requiredBlobSidecarDroppedSubscribers.subscribe(requiredBlobSidecarDroppedSubscriber); - } - @Override public void subscribeRequiredBlockRoot( final RequiredBlockRootSubscriber requiredBlockRootSubscriber) { @@ -461,6 +453,18 @@ public void subscribeRequiredBlockRootDropped( requiredBlockRootDroppedSubscribers.subscribe(requiredBlockRootDroppedSubscriber); } + @Override + public void subscribeRequiredBlobSidecars( + final RequiredBlobSidecarsSubscriber requiredBlobSidecarsSubscriber) { + requiredBlobSidecarsSubscribers.subscribe(requiredBlobSidecarsSubscriber); + } + + @Override + public void subscribeRequiredBlobSidecarsDropped( + final RequiredBlobSidecarsDroppedSubscriber requiredBlobSidecarsDroppedSubscriber) { + requiredBlobSidecarsDroppedSubscribers.subscribe(requiredBlobSidecarsDroppedSubscriber); + } + @Override public void subscribeNewBlobSidecar(final NewBlobSidecarSubscriber newBlobSidecarSubscriber) { newBlobSidecarSubscribers.subscribe(newBlobSidecarSubscriber); @@ -496,17 +500,6 @@ private BlockBlobSidecarsTracker internalOnNewBlock( countBlock(remoteOrigin); if (existingTracker.isRpcFetchTriggered()) { - // block has been set for the first time and we previously triggered fetching of - // missing blobSidecars. So we may have requested to fetch more sidecars - // than the block actually requires. Let's drop them. - existingTracker - .getUnusedBlobSidecarsForBlock() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); - // if we attempted to fetch via RPC, we missed the opportunity to complete the // tracker via local EL (local El fetch requires the block to be known) // Let's try now @@ -680,7 +673,7 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( beaconBlockBodyDeneb.getBlobKzgCommitments(); final List missingBlobsIdentifiers = - blockBlobSidecarsTracker.getMissingBlobSidecars().toList(); + blockBlobSidecarsTracker.getMissingBlobSidecars(); final List versionedHashes = missingBlobsIdentifiers.stream() @@ -702,7 +695,7 @@ private synchronized SafeFuture fetchMissingContentFromLocalEL( blobAndProofs -> { checkArgument( blobAndProofs.size() == versionedHashes.size(), - "Queried %s versionedHashed but got %s blobAndProofs", + "Queried %d versionedHashed but got %s blobAndProofs", versionedHashes.size(), blobAndProofs.size()); @@ -777,14 +770,16 @@ private synchronized void fetchMissingContentFromRemotePeers( blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); } - blockBlobSidecarsTracker - .getMissingBlobSidecars() - .forEach( - blobIdentifier -> { - poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_FETCH_SUBTYPE).inc(); - requiredBlobSidecarSubscribers.deliver( - RequiredBlobSidecarSubscriber::onRequiredBlobSidecar, blobIdentifier); - }); + final List missingBlobIdentifiers = + blockBlobSidecarsTracker.getMissingBlobSidecars(); + + poolStatsCounters + .labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_FETCH_SUBTYPE) + .inc(missingBlobIdentifiers.size()); + requiredBlobSidecarsSubscribers.deliver( + requiredBlobSidecarsSubscriber -> + requiredBlobSidecarsSubscriber.onRequiredBlobSidecars( + slotAndBlockRoot.getBlockRoot(), missingBlobIdentifiers)); } private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecarsTracker) { @@ -799,12 +794,8 @@ private void dropMissingContent(final BlockBlobSidecarsTracker blockBlobSidecars blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); } - blockBlobSidecarsTracker - .getMissingBlobSidecars() - .forEach( - blobIdentifier -> - requiredBlobSidecarDroppedSubscribers.deliver( - RequiredBlobSidecarDroppedSubscriber::onRequiredBlobSidecarDropped, - blobIdentifier)); + requiredBlobSidecarsDroppedSubscribers.deliver( + RequiredBlobSidecarsDroppedSubscriber::onRequiredBlobSidecarsDropped, + blockBlobSidecarsTracker.getSlotAndBlockRoot().getBlockRoot()); } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java index cac650a982e..349bb0cd517 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlobSidecarManagerTest.java @@ -40,7 +40,6 @@ import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAndValidationResult; import tech.pegasys.teku.spec.logic.versions.deneb.blobs.BlobSidecarsAvailabilityChecker; import tech.pegasys.teku.spec.util.DataStructureUtil; -import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.ReceivedBlobSidecarListener; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManagerImpl.ForkChoiceBlobSidecarsAvailabilityCheckerProvider; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManagerImpl.UnpooledBlockBlobSidecarsTrackerProvider; @@ -81,9 +80,6 @@ public class BlobSidecarManagerTest { forkChoiceBlobSidecarsAvailabilityCheckerProvider, unpooledBlockBlobSidecarsTrackerProvider); - private final ReceivedBlobSidecarListener receivedBlobSidecarListener = - mock(ReceivedBlobSidecarListener.class); - private final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); private final List blobSidecars = List.of(blobSidecar); private final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(UInt64.ONE); @@ -92,7 +88,6 @@ public class BlobSidecarManagerTest { void setUp() { when(blobSidecarValidator.validate(any())) .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); - blobSidecarManager.subscribeToReceivedBlobSidecar(receivedBlobSidecarListener); } @Test @@ -102,7 +97,6 @@ void validateAndPrepareForBlockImport_shouldPrepareBlobSidecar() { .isCompletedWithValue(InternalValidationResult.ACCEPT); verify(blockBlobSidecarsTrackersPool).onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - verify(receivedBlobSidecarListener).onBlobSidecarReceived(blobSidecar); verify(futureBlobSidecars, never()).add(blobSidecar); assertThat(invalidBlobSidecarRoots.size()).isEqualTo(0); @@ -118,7 +112,6 @@ void validateAndPrepareForBlockImport_shouldSaveForTheFuture() { .isCompletedWithValue(InternalValidationResult.SAVE_FOR_FUTURE); verify(blockBlobSidecarsTrackersPool, never()).onNewBlobSidecar(eq(blobSidecar), any()); - verify(receivedBlobSidecarListener, never()).onBlobSidecarReceived(blobSidecar); verify(futureBlobSidecars).add(blobSidecar); assertThat(invalidBlobSidecarRoots.size()).isEqualTo(0); @@ -134,7 +127,6 @@ void validateAndPrepareForBlockImport_shouldReject() { .isCompletedWithValueMatching(InternalValidationResult::isReject); verify(blockBlobSidecarsTrackersPool, never()).onNewBlobSidecar(eq(blobSidecar), any()); - verify(receivedBlobSidecarListener, never()).onBlobSidecarReceived(blobSidecar); verify(futureBlobSidecars, never()).add(blobSidecar); assertThat(invalidBlobSidecarRoots) @@ -152,7 +144,6 @@ void validateAndPrepareForBlockImport_shouldIgnore() { .isCompletedWithValue(InternalValidationResult.IGNORE); verify(blockBlobSidecarsTrackersPool, never()).onNewBlobSidecar(eq(blobSidecar), any()); - verify(receivedBlobSidecarListener, never()).onBlobSidecarReceived(blobSidecar); verify(futureBlobSidecars, never()).add(blobSidecar); assertThat(invalidBlobSidecarRoots.size()).isEqualTo(0); @@ -169,7 +160,6 @@ void validateAndPrepareForBlockImport_shouldRejectKnownInvalidBlobs() { verify(blobSidecarValidator, never()).validate(any()); verify(blockBlobSidecarsTrackersPool, never()).onNewBlobSidecar(eq(blobSidecar), any()); - verify(receivedBlobSidecarListener, never()).onBlobSidecarReceived(blobSidecar); verify(futureBlobSidecars, never()).add(blobSidecar); assertThat(invalidBlobSidecarRoots.size()).isEqualTo(1); @@ -179,7 +169,6 @@ void validateAndPrepareForBlockImport_shouldRejectKnownInvalidBlobs() { void prepareForBlockImport_shouldAddToPoolAndNotify() { blobSidecarManager.prepareForBlockImport(blobSidecar, RemoteOrigin.GOSSIP); - verify(receivedBlobSidecarListener).onBlobSidecarReceived(blobSidecar); verify(blockBlobSidecarsTrackersPool).onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); } @@ -196,7 +185,6 @@ void onSlot_shouldInteractWithPoolAndFutureBlobs() { verify(blockBlobSidecarsTrackersPool) .onNewBlobSidecar(futureBlobSidecarsList.getFirst(), RemoteOrigin.GOSSIP); - verify(receivedBlobSidecarListener).onBlobSidecarReceived(futureBlobSidecarsList.getFirst()); } @Test diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java index 275fb2fcada..73f311d653d 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/BlockBlobSidecarsTrackerTest.java @@ -229,7 +229,7 @@ void add_shouldWorkWhenBlockIsSetFirst() { blockBlobSidecarsTracker.setBlock(block); - final BlobSidecar toAdd = blobSidecarsForBlock.get(0); + final BlobSidecar toAdd = blobSidecarsForBlock.getFirst(); final Map added = new HashMap<>(); final List stillMissing = @@ -280,61 +280,6 @@ void getMissingBlobSidecars_shouldReturnPartialBlobsIdentifierWhenBlockIsUnknown .containsExactlyInAnyOrderElementsOf(knownMissing); } - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnShouldFailIfBlockIsUnknown() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - - assertThatThrownBy(blockBlobSidecarsTracker::getUnusedBlobSidecarsForBlock) - .isInstanceOf(IllegalStateException.class); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnEmptySetIfBlockIsFull() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock); - - blockBlobSidecarsTracker.setBlock(block); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()).isEmpty(); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnUnusedBlobSpace() { - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(2)); - - blockBlobSidecarsTracker.setBlock(block); - - final Set expectedUnusedBlobs = - UInt64.range(maxBlobsPerBlock, maxBlobsPerBlock.plus(2)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(expectedUnusedBlobs); - } - - @Test - void getUnusedBlobSidecarsForBlock_shouldReturnAllMaxBlobsPerBlockIfBlockIsEmpty() { - - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlockWithEmptyCommitments(); - final SlotAndBlockRoot slotAndBlockRoot = block.getSlotAndBlockRoot(); - - final BlockBlobSidecarsTracker blockBlobSidecarsTracker = - new BlockBlobSidecarsTracker(slotAndBlockRoot, maxBlobsPerBlock.plus(2)); - - blockBlobSidecarsTracker.setBlock(block); - - final Set expectedUnusedBlobs = - UInt64.range(UInt64.ZERO, maxBlobsPerBlock.plus(2)) - .map(index -> new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), index)) - .collect(Collectors.toSet()); - - assertThat(blockBlobSidecarsTracker.getUnusedBlobSidecarsForBlock()) - .containsExactlyInAnyOrderElementsOf(expectedUnusedBlobs); - } - @Test void getMissingBlobSidecars_shouldRespectMaxBlobsPerBlock() { final BlockBlobSidecarsTracker blockBlobSidecarsTracker = diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index 2dad121a9eb..0196353575e 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -36,7 +36,6 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes32; import org.hyperledger.besu.metrics.ObservableMetricsSystem; import org.hyperledger.besu.metrics.Observation; @@ -106,8 +105,8 @@ public class BlockBlobSidecarsTrackersPoolImplTest { private UInt64 currentSlot = historicalTolerance.times(2); private final List requiredBlockRootEvents = new ArrayList<>(); private final List requiredBlockRootDroppedEvents = new ArrayList<>(); - private final List requiredBlobSidecarEvents = new ArrayList<>(); - private final List requiredBlobSidecarDroppedEvents = new ArrayList<>(); + private final List requiredBlobSidecarsEvents = new ArrayList<>(); + private final List requiredBlobSidecarsDroppedEvents = new ArrayList<>(); private final List newBlobSidecarEvents = new ArrayList<>(); private Optional> mockedTrackersFactory = @@ -119,9 +118,10 @@ public void setup() { blockBlobSidecarsTrackersPool.subscribeRequiredBlockRoot(requiredBlockRootEvents::add); blockBlobSidecarsTrackersPool.subscribeRequiredBlockRootDropped( requiredBlockRootDroppedEvents::add); - blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecar(requiredBlobSidecarEvents::add); - blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarDropped( - requiredBlobSidecarDroppedEvents::add); + blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecars( + (blockRoot, blobIdentifiers) -> requiredBlobSidecarsEvents.addAll(blobIdentifiers)); + blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarsDropped( + requiredBlobSidecarsDroppedEvents::add); blockBlobSidecarsTrackersPool.subscribeNewBlobSidecar(newBlobSidecarEvents::add); when(blobSidecarPublisher.apply(any())).thenReturn(SafeFuture.COMPLETE); setSlot(currentSlot); @@ -147,8 +147,8 @@ public void onNewBlock_addTrackerWithBlock() { assertThat(blockBlobSidecarsTrackersPool.getBlock(block.getRoot())).contains(block); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertBlobSidecarsCount(0); assertBlobSidecarsTrackersCount(1); @@ -174,8 +174,8 @@ public void onNewBlobSidecar_addTrackerWithBlobSidecarIgnoringDuplicates() { .contains(blobSidecar); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).containsExactly(blobSidecar); assertBlobSidecarsCount(1); @@ -199,8 +199,8 @@ public void onNewBlobSidecar_shouldIgnoreDuplicates() { .isTrue(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).containsExactly(blobSidecar); assertBlobSidecarsCount(1); @@ -288,8 +288,8 @@ public void onNewBlock_shouldIgnorePreDenebBlocks() { assertThat(blockBlobSidecarsTrackersPool.containsBlock(block.getRoot())).isFalse(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertBlobSidecarsCount(0); assertBlobSidecarsTrackersCount(0); @@ -314,8 +314,8 @@ public void onNewBlock_shouldIgnorePreDenebBlocks() { assertThat(blockBlobSidecarsTrackersPool.containsBlock(block.getRoot())).isFalse(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).isEmpty(); assertBlobSidecarsCount(0); @@ -350,7 +350,7 @@ public void onCompletedBlockAndBlobSidecars_shouldLogWarningWhenNotCompleted() { @Test public void onNewBlobSidecarOnNewBlock_addTrackerWithBothBlockAndBlobSidecar() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecarsForBlock(block).get(0); + final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecarsForBlock(block).getFirst(); blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); @@ -362,7 +362,7 @@ public void onNewBlobSidecarOnNewBlock_addTrackerWithBothBlockAndBlobSidecar() { assertThat(blockBlobSidecarsTrackersPool.containsBlock(block.getRoot())).isTrue(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); assertThat(newBlobSidecarEvents).containsExactly(blobSidecar); assertBlobSidecarsCount(1); @@ -408,8 +408,8 @@ public void twoOnNewBlobSidecar_addTrackerWithBothBlobSidecars() { .isTrue(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).containsExactly(blobSidecar0, blobSidecar1); assertBlobSidecarsCount(2); @@ -431,8 +431,8 @@ public void twoOnNewBlock_addTrackerWithBothBlobSidecars() { assertThat(blockBlobSidecarsTrackersPool.containsBlock(block.getRoot())).isTrue(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).isEmpty(); assertBlobSidecarsCount(0); @@ -453,8 +453,8 @@ public void onCompletedBlockAndBlobSidecars_shouldCreateTrackerIgnoringHistorica assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).containsExactlyElementsOf(blobSidecars); final BlockBlobSidecarsTracker blockBlobSidecarsTracker = @@ -483,8 +483,8 @@ public void onCompletedBlockAndBlobSidecars_shouldNotTriggerFetch() { assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertThat(newBlobSidecarEvents).containsExactlyElementsOf(blobSidecars); final BlockBlobSidecarsTracker blockBlobSidecarsTracker = @@ -528,8 +528,8 @@ public void shouldApplyIgnoreForBlock() { assertThat(blockBlobSidecarsTrackersPool.containsBlock(block.getRoot())).isFalse(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertBlobSidecarsCount(0); assertBlobSidecarsTrackersCount(0); @@ -552,8 +552,8 @@ public void shouldApplyIgnoreForBlobSidecar() { .isFalse(); assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); assertBlobSidecarsCount(0); assertBlobSidecarsTrackersCount(0); @@ -651,10 +651,10 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { .build()) .toList(); - final Set missingBlobIdentifiers = + final List missingBlobIdentifiers = UInt64.range(UInt64.ONE, UInt64.valueOf(4)) .map(index -> new BlobIdentifier(block.getRoot(), index)) - .collect(Collectors.toSet()); + .toList(); final List versionedHashes = IntStream.range(1, 4) @@ -670,8 +670,7 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { Optional.of( (slotAndRoot) -> { when(tracker.add(any())).thenReturn(true); - when(tracker.getMissingBlobSidecars()) - .thenAnswer(__ -> missingBlobIdentifiers.stream()); + when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobIdentifiers); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -690,8 +689,8 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { // no RPC requests, local el query is in flight assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); // local el fetch triggered verify(tracker).setLocalElFetchTriggered(); @@ -725,8 +724,8 @@ void shouldFetchMissingBlobSidecarsFromLocalELFirst() { void shouldFetchMissingBlobSidecarsViaRPCAfterLocalEL() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final Set missingBlobs = - Set.of( + final List missingBlobs = + List.of( new BlobIdentifier(block.getRoot(), UInt64.ONE), new BlobIdentifier(block.getRoot(), UInt64.ZERO)); @@ -734,7 +733,7 @@ void shouldFetchMissingBlobSidecarsViaRPCAfterLocalEL() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -753,16 +752,16 @@ void shouldFetchMissingBlobSidecarsViaRPCAfterLocalEL() { assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).containsExactlyElementsOf(missingBlobs); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); } @Test void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final Set missingBlobs = - Set.of( + final List missingBlobs = + List.of( new BlobIdentifier(block.getRoot(), UInt64.ONE), new BlobIdentifier(block.getRoot(), UInt64.ZERO)); @@ -770,7 +769,7 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs); when(tracker.getBlock()).thenReturn(Optional.of(block)); return tracker; }); @@ -789,8 +788,8 @@ void shouldFetchMissingBlobSidecarsViaRPCWhenELLookupFails() { assertThat(requiredBlockRootEvents).isEmpty(); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).containsExactlyElementsOf(missingBlobs); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); } @Test @@ -803,14 +802,14 @@ void shouldFetchMissingBlockAndBlobSidecars() { .index(UInt64.valueOf(2)) .build(); - final Set missingBlobs = - Set.of( + final List missingBlobs = + List.of( new BlobIdentifier(block.getRoot(), UInt64.ONE), new BlobIdentifier(block.getRoot(), UInt64.ZERO)); final BlockBlobSidecarsTracker mockedTracker = mock(BlockBlobSidecarsTracker.class); when(mockedTracker.getBlock()).thenReturn(Optional.empty()); - when(mockedTracker.getMissingBlobSidecars()).thenReturn(missingBlobs.stream()); + when(mockedTracker.getMissingBlobSidecars()).thenReturn(missingBlobs); when(mockedTracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); mockedTrackersFactory = Optional.of((__) -> mockedTracker); @@ -825,87 +824,13 @@ void shouldFetchMissingBlockAndBlobSidecars() { verify(mockedTracker, never()).setLocalElFetchTriggered(); assertThat(requiredBlockRootEvents).containsExactly(block.getRoot()); - assertThat(requiredBlobSidecarEvents).containsExactlyElementsOf(missingBlobs); + assertThat(requiredBlobSidecarsEvents).containsExactlyElementsOf(missingBlobs); assertStats("block", "rpc_fetch", 1); assertStats("blob_sidecar", "rpc_fetch", missingBlobs.size()); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); - } - - @Test - void shouldDropBlobSidecarsThatHasBeenFetchedButNotPresentInBlock() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - - final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot()); - final BlobSidecar blobSidecar = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(2)) - .build(); - - final Set blobsNotPresentInBlock = - Set.of( - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)), - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3))); - - mockedTrackersFactory = - Optional.of( - (slotAndRoot) -> { - BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getBlock()).thenReturn(Optional.empty()); - when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); - when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcFetchTriggered()).thenReturn(true); - when(tracker.getUnusedBlobSidecarsForBlock()) - .thenReturn(blobsNotPresentInBlock.stream()); - return tracker; - }); - - blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - - blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - - assertThat(requiredBlobSidecarDroppedEvents).containsExactlyElementsOf(blobsNotPresentInBlock); - } - - @Test - void shouldNotDropUnusedBlobSidecarsIfFetchingHasNotOccurred() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - - final SlotAndBlockRoot slotAndBlockRoot = new SlotAndBlockRoot(currentSlot, block.getRoot()); - final BlobSidecar blobSidecar = - dataStructureUtil - .createRandomBlobSidecarBuilder() - .signedBeaconBlockHeader(block.asHeader()) - .index(UInt64.valueOf(2)) - .build(); - - final Set blobsNotUserInBlock = - Set.of( - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(2)), - new BlobIdentifier(slotAndBlockRoot.getBlockRoot(), UInt64.valueOf(3))); - - mockedTrackersFactory = - Optional.of( - (slotAndRoot) -> { - BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getBlock()).thenReturn(Optional.empty()); - when(tracker.getSlotAndBlockRoot()).thenReturn(slotAndBlockRoot); - when(tracker.setBlock(block)).thenReturn(true); - when(tracker.isRpcFetchTriggered()).thenReturn(false); - when(tracker.getUnusedBlobSidecarsForBlock()) - .thenReturn(blobsNotUserInBlock.stream()); - return tracker; - }); - - blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar, RemoteOrigin.GOSSIP); - - blockBlobSidecarsTrackersPool.onNewBlock(block, Optional.empty()); - - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); } @Test @@ -936,8 +861,8 @@ public void shouldFetchContentWhenBlobSidecarIsNotForCurrentSlot() { void shouldDropPossiblyFetchedBlobSidecars() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final Set missingBlobs = - Set.of( + final List missingBlobs = + List.of( new BlobIdentifier(block.getRoot(), UInt64.ONE), new BlobIdentifier(block.getRoot(), UInt64.ZERO)); @@ -945,7 +870,7 @@ void shouldDropPossiblyFetchedBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs); when(tracker.getBlock()).thenReturn(Optional.of(block)); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); when(tracker.isRpcFetchTriggered()).thenReturn(true); @@ -958,20 +883,20 @@ void shouldDropPossiblyFetchedBlobSidecars() { blockBlobSidecarsTrackersPool.removeAllForBlock(block.getRoot()); - assertThat(requiredBlobSidecarDroppedEvents).containsExactlyElementsOf(missingBlobs); + assertThat(requiredBlobSidecarsDroppedEvents).containsExactly(block.getRoot()); // subsequent fetch will not try to fetch anything asyncRunner.executeQueuedActions(); - assertThat(requiredBlobSidecarEvents).isEmpty(); + assertThat(requiredBlobSidecarsEvents).isEmpty(); } @Test void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final Set missingBlobs = - Set.of( + final List missingBlobs = + List.of( new BlobIdentifier(block.getRoot(), UInt64.ONE), new BlobIdentifier(block.getRoot(), UInt64.ZERO)); @@ -987,7 +912,7 @@ void shouldTryToFetchFromLocalELWhenBlockArrivesAfterRPCFetch() { mockedTrackersFactory = Optional.of( (slotAndRoot) -> { - when(tracker.getMissingBlobSidecars()).thenAnswer(__ -> missingBlobs.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs); when(tracker.getBlock()).thenReturn(Optional.empty()); when(tracker.setBlock(any())).thenReturn(true); when(tracker.getSlotAndBlockRoot()).thenReturn(block.getSlotAndBlockRoot()); @@ -1078,7 +1003,7 @@ void shouldNotDropPossiblyFetchedBlockIfFetchHasNotOccurred() { blockBlobSidecarsTrackersPool.removeAllForBlock(signedBeaconBlock.getRoot()); assertThat(requiredBlockRootDroppedEvents).isEmpty(); - assertThat(requiredBlobSidecarDroppedEvents).isEmpty(); + assertThat(requiredBlobSidecarsDroppedEvents).isEmpty(); } @Test @@ -1179,8 +1104,8 @@ void calculateFetchDelay_shouldReturnZeroIfSlotIsOld() { void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final Set missingBlobs1 = - Set.of( + final List missingBlobs1 = + List.of( new BlobIdentifier(block1.getRoot(), UInt64.ONE), new BlobIdentifier(block1.getRoot(), UInt64.ZERO)); @@ -1188,7 +1113,7 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs1.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs1); when(tracker.getBlock()).thenReturn(Optional.of(block1)); return tracker; }); @@ -1197,8 +1122,8 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(currentSlot); - final Set missingBlobs2 = - Set.of( + final List missingBlobs2 = + List.of( new BlobIdentifier(block2.getRoot(), UInt64.ONE), new BlobIdentifier(block2.getRoot(), UInt64.valueOf(2))); @@ -1206,18 +1131,18 @@ void getAllRequiredBlobSidecars_shouldReturnAllRequiredBlobSidecars() { Optional.of( (slotAndRoot) -> { BlockBlobSidecarsTracker tracker = mock(BlockBlobSidecarsTracker.class); - when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs2.stream()); + when(tracker.getMissingBlobSidecars()).thenReturn(missingBlobs2); when(tracker.getBlock()).thenReturn(Optional.of(block2)); return tracker; }); blockBlobSidecarsTrackersPool.onNewBlock(block2, Optional.empty()); - final Set allMissing = - Stream.concat(missingBlobs1.stream(), missingBlobs2.stream()).collect(Collectors.toSet()); + final Map> allMissing = + Map.of(block1.getRoot(), missingBlobs1, block2.getRoot(), missingBlobs2); assertThat(blockBlobSidecarsTrackersPool.getAllRequiredBlobSidecars()) - .containsExactlyElementsOf(allMissing); + .containsExactlyInAnyOrderEntriesOf(allMissing); } @Test diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 90508b0303b..e12bad7582c 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -102,7 +102,6 @@ import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.capella.BeaconBlockBodySchemaCapella; import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadHeader; import tech.pegasys.teku.spec.datastructures.interop.GenesisStateBuilder; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; import tech.pegasys.teku.spec.datastructures.operations.SignedBlsToExecutionChange; @@ -355,7 +354,7 @@ protected SafeFuture doStart() { protected void startServices() { final RecentBlocksFetcher recentBlocksFetcher = syncService.getRecentBlocksFetcher(); recentBlocksFetcher.subscribeBlockFetched( - (block) -> + block -> blockManager .importBlock(block, RemoteOrigin.RPC) .thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult) @@ -364,11 +363,7 @@ protected void startServices() { final RecentBlobSidecarsFetcher recentBlobSidecarsFetcher = syncService.getRecentBlobSidecarsFetcher(); recentBlobSidecarsFetcher.subscribeBlobSidecarFetched( - (blobSidecar) -> blobSidecarManager.prepareForBlockImport(blobSidecar, RemoteOrigin.RPC)); - blobSidecarManager.subscribeToReceivedBlobSidecar( - blobSidecar -> - recentBlobSidecarsFetcher.cancelRecentBlobSidecarRequest( - new BlobIdentifier(blobSidecar.getBlockRoot(), blobSidecar.getIndex()))); + blobSidecar -> blobSidecarManager.prepareForBlockImport(blobSidecar, RemoteOrigin.RPC)); final Optional network = beaconConfig.eth2NetworkConfig().getEth2Network(); if (network.isPresent() && network.get() == Eth2Network.EPHEMERY) {