From e9979e1c66c72f9fe809a8813e13f872b09548df Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 16 Oct 2024 14:49:10 +0200 Subject: [PATCH] allow block and blobs publishing to give feedback --- .../teku/beacon/sync/SyncingNodeManager.java | 7 ++-- .../publisher/BlockPublisherDeneb.java | 4 +- .../publisher/BlockPublisherPhase0.java | 2 +- .../BlockBlobSidecarsTrackersPoolImpl.java | 9 ++-- .../statetransition/util/PoolFactory.java | 9 ++-- ...BlockBlobSidecarsTrackersPoolImplTest.java | 9 ++-- ...AttesterSlashingGossipIntegrationTest.java | 6 ++- .../eth2/gossip/AbstractGossipManager.java | 7 +++- .../eth2/gossip/BlobSidecarGossipChannel.java | 13 +++--- .../eth2/gossip/BlobSidecarGossipManager.java | 7 ++-- .../eth2/gossip/BlockGossipChannel.java | 1 - .../eth2/gossip/BlockGossipManager.java | 5 ++- .../eth2/gossip/forks/GossipForkManager.java | 28 +++++++------ .../gossip/forks/GossipForkSubscriptions.java | 7 ++-- .../GossipForkSubscriptionsDeneb.java | 5 ++- .../GossipForkSubscriptionsPhase0.java | 5 ++- .../gossip/AggregateGossipManagerTest.java | 3 ++ .../gossip/BlobSidecarGossipManagerTest.java | 12 +++--- .../eth2/gossip/BlockGossipManagerTest.java | 3 +- .../gossip/forks/GossipForkManagerTest.java | 7 ++-- .../teku/networking/eth2/NodeManager.java | 15 ++++--- .../networking/p2p/gossip/TopicChannel.java | 3 +- .../p2p/libp2p/gossip/GossipHandler.java | 21 ++++++---- .../p2p/libp2p/gossip/LibP2PTopicChannel.java | 7 ++-- .../networking/p2p/mock/MockTopicChannel.java | 5 ++- .../p2p/libp2p/gossip/GossipHandlerTest.java | 42 ++++++++++++++++--- .../beaconchain/BeaconChainController.java | 7 ++-- 27 files changed, 157 insertions(+), 92 deletions(-) diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index 35f0f6114f6..b66f60cd0ff 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -84,6 +84,7 @@ public class SyncingNodeManager { private final BlockGossipChannel blockGossipChannel; private SyncingNodeManager( + final AsyncRunner asyncRunner, final EventChannels eventChannels, final RecentChainData recentChainData, final BeaconChainUtil chainUtil, @@ -94,7 +95,7 @@ private SyncingNodeManager( this.chainUtil = chainUtil; this.eth2P2PNetwork = eth2P2PNetwork; this.syncService = syncService; - this.blockGossipChannel = eventChannels.getPublisher(BlockGossipChannel.class); + this.blockGossipChannel = eventChannels.getPublisher(BlockGossipChannel.class, asyncRunner); } @SuppressWarnings("FutureReturnValueIgnored") @@ -215,7 +216,7 @@ public static SyncingNodeManager create( syncService.start().join(); return new SyncingNodeManager( - eventChannels, recentChainData, chainUtil, eth2P2PNetwork, syncService); + asyncRunner, eventChannels, recentChainData, chainUtil, eth2P2PNetwork, syncService); } public SafeFuture connect(final SyncingNodeManager peer) { @@ -250,6 +251,6 @@ public void setSlot(final UInt64 slot) { } public void gossipBlock(final SignedBeaconBlock block) { - blockGossipChannel.publishBlock(block); + blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug(); } } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java index d27ea756e37..bfe8329ed6e 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherDeneb.java @@ -64,8 +64,8 @@ void publishBlockAndBlobSidecars( final SignedBeaconBlock block, final List blobSidecars, final BlockPublishingPerformance blockPublishingPerformance) { - blockGossipChannel.publishBlock(block); - blobSidecarGossipChannel.publishBlobSidecars(blobSidecars); + blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug(); + blobSidecarGossipChannel.publishBlobSidecars(blobSidecars).ifExceptionGetsHereRaiseABug(); blockPublishingPerformance.blockAndBlobSidecarsPublishingInitiated(); } } diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java index e2b1ed61954..82da3f302e2 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/BlockPublisherPhase0.java @@ -53,7 +53,7 @@ void publishBlockAndBlobSidecars( final SignedBeaconBlock block, final List blobSidecars, final BlockPublishingPerformance blockPublishingPerformance) { - blockGossipChannel.publishBlock(block); + blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug(); blockPublishingPerformance.blockPublishingInitiated(); } } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java index 8b90393da57..95582777962 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImpl.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -101,7 +102,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis private final RecentChainData recentChainData; private final ExecutionLayerChannel executionLayer; private final Supplier gossipValidatorSupplier; - private final Consumer blobSidecarGossipPublisher; + private final Function> blobSidecarGossipPublisher; private final int maxTrackers; private final BlockBlobSidecarsTrackerFactory trackerFactory; @@ -133,7 +134,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, final Supplier gossipValidatorSupplier, - final Consumer blobSidecarGossipPublisher, + final Function> blobSidecarGossipPublisher, final UInt64 historicalSlotTolerance, final UInt64 futureSlotTolerance, final int maxTrackers) { @@ -165,7 +166,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, final Supplier gossipValidatorSupplier, - final Consumer blobSidecarGossipPublisher, + final Function> blobSidecarGossipPublisher, final UInt64 historicalSlotTolerance, final UInt64 futureSlotTolerance, final int maxTrackers, @@ -246,7 +247,7 @@ public synchronized void onNewBlobSidecar( private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) { LOG.debug("Publishing recovered blob sidecar {}", blobSidecar::toLogString); gossipValidatorSupplier.get().markForEquivocation(blobSidecar); - blobSidecarGossipPublisher.accept(blobSidecar); + blobSidecarGossipPublisher.apply(blobSidecar).ifExceptionGetsHereRaiseABug(); } private void countBlobSidecar(final RemoteOrigin origin) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java index b97236c4c39..bd047c0c84f 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/util/PoolFactory.java @@ -15,12 +15,13 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Collections; -import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.time.TimeProvider; @@ -120,7 +121,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, final Supplier gossipValidatorSupplier, - final Consumer blobSidecarGossipPublisher) { + final Function> blobSidecarGossipPublisher) { return createPoolForBlockBlobSidecarsTrackers( blockImportChannel, spec, @@ -143,7 +144,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, final Supplier gossipValidatorSupplier, - final Consumer blobSidecarGossipPublisher, + final Function> blobSidecarGossipPublisher, final UInt64 historicalBlockTolerance, final UInt64 futureBlockTolerance, final int maxTrackers) { @@ -172,7 +173,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers( final RecentChainData recentChainData, final ExecutionLayerChannel executionLayer, final Supplier gossipValidatorSupplier, - final Consumer blobSidecarGossipPublisher, + final Function> blobSidecarGossipPublisher, final UInt64 historicalBlockTolerance, final UInt64 futureBlockTolerance, final int maxItems, diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java index c5dadf551c2..5a5e47af29d 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/util/BlockBlobSidecarsTrackersPoolImplTest.java @@ -33,7 +33,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -81,7 +80,7 @@ public class BlockBlobSidecarsTrackersPoolImplTest { private final ExecutionLayerChannel executionLayer = mock(ExecutionLayerChannel.class); @SuppressWarnings("unchecked") - private final Consumer blobSidecarPublisher = mock(Consumer.class); + private final Function> blobSidecarPublisher = mock(Function.class); private final BlobSidecarGossipValidator blobSidecarGossipValidator = mock(BlobSidecarGossipValidator.class); @@ -235,7 +234,7 @@ public void onNewBlobSidecar_shouldMarkForEquivocationAndPublishWhenOriginIsLoca assertBlobSidecarsTrackersCount(3); verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1); - verify(blobSidecarPublisher, times(1)).accept(blobSidecar1); + verify(blobSidecarPublisher, times(1)).apply(blobSidecar1); } @Test @@ -254,7 +253,7 @@ public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalELAndEquivocating() { assertBlobSidecarsTrackersCount(1); verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1); - verify(blobSidecarPublisher, times(1)).accept(blobSidecar1); + verify(blobSidecarPublisher, times(1)).apply(blobSidecar1); } @Test @@ -274,7 +273,7 @@ public void onNewBlobSidecar_shouldNotPublishWhenOriginIsLocalELIsNotCurrentSlot assertBlobSidecarsTrackersCount(1); verify(blobSidecarGossipValidator, never()).markForEquivocation(blobSidecar1); - verify(blobSidecarPublisher, never()).accept(blobSidecar1); + verify(blobSidecarPublisher, never()).apply(blobSidecar1); } @Test diff --git a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java index b4c1d5bf835..5e5ac9e59a7 100644 --- a/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java +++ b/networking/eth2/src/integration-test/java/tech/pegasys/teku/networking/eth2/AttesterSlashingGossipIntegrationTest.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Test; import tech.pegasys.teku.bls.BLSKeyGenerator; import tech.pegasys.teku.bls.BLSKeyPair; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.Waiter; import tech.pegasys.teku.networking.eth2.Eth2P2PNetworkFactory.Eth2P2PNetworkBuilder; @@ -35,7 +37,7 @@ import tech.pegasys.teku.statetransition.validation.InternalValidationResult; public class AttesterSlashingGossipIntegrationTest { - + private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create(); private final List validatorKeys = BLSKeyGenerator.generateKeyPairs(3); private final Eth2P2PNetworkFactory networkFactory = new Eth2P2PNetworkFactory(); private final DataStructureUtil dataStructureUtil = @@ -91,6 +93,6 @@ public void shouldGossipToPeers() throws Exception { private NodeManager createNodeManager(final Consumer networkBuilder) throws Exception { - return NodeManager.create(networkFactory, validatorKeys, networkBuilder); + return NodeManager.create(asyncRunner, networkFactory, validatorKeys, networkBuilder); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AbstractGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AbstractGossipManager.java index 555eddc03c4..281d95ebf4d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AbstractGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/AbstractGossipManager.java @@ -17,6 +17,7 @@ import java.util.Optional; import java.util.function.Function; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.ssz.SszData; import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -76,7 +77,11 @@ public Eth2TopicHandler getTopicHandler() { } protected void publishMessage(final T message) { - channel.ifPresent(c -> c.gossip(gossipEncoding.encode(message))); + channel.ifPresent(c -> c.gossip(gossipEncoding.encode(message)).finish(__ -> {})); + } + + protected SafeFuture publishMessageWithFeedback(final T message) { + return channel.map(c -> c.gossip(gossipEncoding.encode(message))).orElse(SafeFuture.COMPLETE); } @Override diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipChannel.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipChannel.java index 40b3b63181f..596b63829bd 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipChannel.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipChannel.java @@ -14,16 +14,17 @@ package tech.pegasys.teku.networking.eth2.gossip; import java.util.List; -import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.events.ChannelInterface; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; -public interface BlobSidecarGossipChannel extends VoidReturningChannelInterface { +public interface BlobSidecarGossipChannel extends ChannelInterface { - BlobSidecarGossipChannel NOOP = blobSidecar -> {}; + BlobSidecarGossipChannel NOOP = blobSidecar -> SafeFuture.COMPLETE; - default void publishBlobSidecars(final List blobSidecars) { - blobSidecars.forEach(this::publishBlobSidecar); + default SafeFuture publishBlobSidecars(final List blobSidecars) { + return SafeFuture.allOf(blobSidecars.stream().map(this::publishBlobSidecar)); } - void publishBlobSidecar(BlobSidecar blobSidecar); + SafeFuture publishBlobSidecar(BlobSidecar blobSidecar); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java index 85fb9631084..7c0c0f41a99 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManager.java @@ -95,10 +95,11 @@ private BlobSidecarGossipManager( this.subnetIdToTopicHandler = subnetIdToTopicHandler; } - public void publishBlobSidecar(final BlobSidecar message) { + public SafeFuture publishBlobSidecar(final BlobSidecar message) { final int subnetId = spec.computeSubnetForBlobSidecar(message).intValue(); - Optional.ofNullable(subnetIdToChannel.get(subnetId)) - .ifPresent(channel -> channel.gossip(gossipEncoding.encode(message))); + return Optional.ofNullable(subnetIdToChannel.get(subnetId)) + .map(channel -> channel.gossip(gossipEncoding.encode(message))) + .orElse(SafeFuture.COMPLETE); } @VisibleForTesting diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java index 3e746aab012..7bfa362ff2b 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java @@ -15,7 +15,6 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.events.ChannelInterface; -import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; public interface BlockGossipChannel extends ChannelInterface { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java index c078e76fdd2..0974c754579 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.networking.eth2.gossip; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName; import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor; @@ -51,8 +52,8 @@ public BlockGossipManager( debugDataDumper); } - public void publishBlock(final SignedBeaconBlock message) { - publishMessage(message); + public SafeFuture publishBlock(final SignedBeaconBlock message) { + return publishMessageWithFeedback(message); } @Override diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 77a98b4a3b1..3309740fb05 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -165,12 +165,13 @@ public synchronized void publishAttestation(final ValidatableAttestation attesta GossipForkSubscriptions::publishAttestation); } - public synchronized void publishBlock(final SignedBeaconBlock block) { - publishMessage(block.getSlot(), block, "block", GossipForkSubscriptions::publishBlock); + public synchronized SafeFuture publishBlock(final SignedBeaconBlock block) { + return publishMessageWithFeedback( + block.getSlot(), block, "block", GossipForkSubscriptions::publishBlock); } - public synchronized void publishBlobSidecar(final BlobSidecar blobSidecar) { - publishMessage( + public synchronized SafeFuture publishBlobSidecar(final BlobSidecar blobSidecar) { + return publishMessageWithFeedback( blobSidecar.getSlot(), blobSidecar, "blob sidecar", @@ -249,15 +250,16 @@ private void publishMessage( } private SafeFuture publishMessageWithFeedback( - final UInt64 slot, - final T message, - final String type, - final BiFunction> publisher) { - final Optional gossipForkSubscriptions = getSubscriptionActiveAtSlot(slot) - .filter(this::isActive); - - if(gossipForkSubscriptions.isEmpty()) { - LOG.warn("Not publishing {} because no gossip subscriptions are active for slot {}", type, slot); + final UInt64 slot, + final T message, + final String type, + final BiFunction> publisher) { + final Optional gossipForkSubscriptions = + getSubscriptionActiveAtSlot(slot).filter(this::isActive); + + if (gossipForkSubscriptions.isEmpty()) { + LOG.warn( + "Not publishing {} because no gossip subscriptions are active for slot {}", type, slot); return SafeFuture.COMPLETE; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java index e8fd1110a95..f83a96cdd1d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.networking.eth2.gossip.forks; import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; @@ -37,10 +38,10 @@ public interface GossipForkSubscriptions { void publishAttestation(ValidatableAttestation attestation); - void publishBlock(SignedBeaconBlock block); + SafeFuture publishBlock(SignedBeaconBlock block); - default void publishBlobSidecar(final BlobSidecar blobSidecar) { - // since Deneb + default SafeFuture publishBlobSidecar(final BlobSidecar blobSidecar) { + return SafeFuture.COMPLETE; } void subscribeToAttestationSubnetId(int subnetId); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsDeneb.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsDeneb.java index 87d5d6e68ca..7c91003db04 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsDeneb.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsDeneb.java @@ -15,6 +15,7 @@ import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipManager; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor; @@ -104,7 +105,7 @@ void addBlobSidecarGossipManager(final ForkInfo forkInfo) { } @Override - public void publishBlobSidecar(final BlobSidecar blobSidecar) { - blobSidecarGossipManager.publishBlobSidecar(blobSidecar); + public SafeFuture publishBlobSidecar(final BlobSidecar blobSidecar) { + return blobSidecarGossipManager.publishBlobSidecar(blobSidecar); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java index 766775e413d..85748cde963 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java @@ -18,6 +18,7 @@ import org.apache.tuweni.bytes.Bytes32; import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.AggregateGossipManager; import tech.pegasys.teku.networking.eth2.gossip.AttestationGossipManager; @@ -235,8 +236,8 @@ public void publishAttestation(final ValidatableAttestation attestation) { } @Override - public void publishBlock(final SignedBeaconBlock block) { - blockGossipManager.publishBlock(block); + public SafeFuture publishBlock(final SignedBeaconBlock block) { + return blockGossipManager.publishBlock(block); } @Override diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManagerTest.java index 3c4c10a3529..81c169cf65a 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/AggregateGossipManagerTest.java @@ -18,10 +18,12 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; @@ -60,6 +62,7 @@ public class AggregateGossipManagerTest { @BeforeEach public void setup() { storageSystem.chainUpdater().initializeGenesis(); + when(topicChannel.gossip(any())).thenReturn(SafeFuture.COMPLETE); doReturn(topicChannel) .when(gossipNetwork) .subscribe(contains(GossipTopicName.BEACON_AGGREGATE_AND_PROOF.toString()), any()); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java index fe2d5391d70..3c42b72abe5 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlobSidecarGossipManagerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; import java.util.HashMap; import java.util.Map; @@ -31,7 +32,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.async.SafeFutureAssert; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; @@ -115,7 +115,7 @@ public void testGossipingBlobSidecarPublishesToCorrectSubnet() { dataStructureUtil.createRandomBlobSidecarBuilder().index(UInt64.ONE).build(); final Bytes serialized = gossipEncoding.encode(blobSidecar); - blobSidecarGossipManager.publishBlobSidecar(blobSidecar); + safeJoin(blobSidecarGossipManager.publishBlobSidecar(blobSidecar)); topicChannels.forEach( (subnetId, channel) -> { @@ -133,7 +133,7 @@ public void testGossipingBlobSidecarWithLargeIndexGossipToCorrectSubnet() { dataStructureUtil.createRandomBlobSidecarBuilder().index(UInt64.valueOf(10)).build(); final Bytes serialized = gossipEncoding.encode(blobSidecar); - blobSidecarGossipManager.publishBlobSidecar(blobSidecar); + safeJoin(blobSidecarGossipManager.publishBlobSidecar(blobSidecar)); final SpecConfig config = spec.forMilestone(SpecMilestone.DENEB).getConfig(); final SpecConfigDeneb specConfigDeneb = SpecConfigDeneb.required(config); @@ -170,8 +170,7 @@ public void testAcceptingSidecarGossipIfOnTheCorrectTopic() { System.out.println(blobSidecar); final InternalValidationResult validationResult = - SafeFutureAssert.safeJoin( - topicHandler.getProcessor().process(blobSidecar, Optional.empty())); + safeJoin(topicHandler.getProcessor().process(blobSidecar, Optional.empty())); assertThat(validationResult).isEqualTo(InternalValidationResult.ACCEPT); } @@ -185,8 +184,7 @@ public void testRejectingSidecarGossipIfNotOnTheCorrectTopic() { final BlobSidecar blobSidecar = dataStructureUtil.createRandomBlobSidecarBuilder().index(UInt64.valueOf(2)).build(); final InternalValidationResult validationResult = - SafeFutureAssert.safeJoin( - topicHandler.getProcessor().process(blobSidecar, Optional.empty())); + safeJoin(topicHandler.getProcessor().process(blobSidecar, Optional.empty())); assertThat(validationResult.isReject()).isTrue(); assertThat(validationResult.getDescription()) diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManagerTest.java index 0e9d3745400..bbcc2bebb0a 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManagerTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.BeforeEach; @@ -79,7 +80,7 @@ public void onBlockProposed() { // Should gossip new blocks received from event bus SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(1); Bytes serialized = gossipEncoding.encode(block); - blockGossipManager.publishBlock(block); + safeJoin(blockGossipManager.publishBlock(block)); verify(topicChannel).gossip(serialized); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java index 06a7cd3c39b..50193f5a400 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.safeJoin; import java.util.Optional; import java.util.function.BiConsumer; @@ -285,17 +286,17 @@ void shouldPublishBlockToForkForBlockSlot() { final SignedBeaconBlock thirdForkBlock = dataStructureUtil.randomSignedBeaconBlock(spec.computeStartSlotAtEpoch(UInt64.valueOf(2))); - manager.publishBlock(firstForkBlock); + safeJoin(manager.publishBlock(firstForkBlock)); verify(firstFork).publishBlock(firstForkBlock); verify(secondFork, never()).publishBlock(firstForkBlock); verify(thirdFork, never()).publishBlock(firstForkBlock); - manager.publishBlock(secondForkBlock); + safeJoin(manager.publishBlock(secondForkBlock)); verify(firstFork, never()).publishBlock(secondForkBlock); verify(secondFork).publishBlock(secondForkBlock); verify(thirdFork, never()).publishBlock(secondForkBlock); - manager.publishBlock(thirdForkBlock); + safeJoin(manager.publishBlock(thirdForkBlock)); verify(firstFork, never()).publishBlock(thirdForkBlock); verify(secondFork, never()).publishBlock(thirdForkBlock); verify(thirdFork).publishBlock(thirdForkBlock); diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/NodeManager.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/NodeManager.java index 44029d7e007..aa3599941de 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/NodeManager.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/NodeManager.java @@ -17,6 +17,7 @@ import java.util.function.Consumer; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import tech.pegasys.teku.bls.BLSKeyPair; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.events.ChannelExceptionHandler; import tech.pegasys.teku.infrastructure.events.EventChannels; @@ -52,23 +53,26 @@ private NodeManager( public static NodeManager create( final Spec spec, + final AsyncRunner asyncRunner, final Eth2P2PNetworkFactory networkFactory, final List validatorKeys) throws Exception { - return create(spec, networkFactory, validatorKeys, c -> {}); + return create(spec, asyncRunner, networkFactory, validatorKeys, c -> {}); } @Deprecated public static NodeManager create( + final AsyncRunner asyncRunner, final Eth2P2PNetworkFactory networkFactory, final List validatorKeys, final Consumer configureNetwork) throws Exception { - return create(DEFAULT_SPEC, networkFactory, validatorKeys, configureNetwork); + return create(DEFAULT_SPEC, asyncRunner, networkFactory, validatorKeys, configureNetwork); } public static NodeManager create( final Spec spec, + final AsyncRunner asyncRunner, final Eth2P2PNetworkFactory networkFactory, final List validatorKeys, final Consumer configureNetwork) @@ -76,11 +80,12 @@ public static NodeManager create( final RecentChainData storageClient = MemoryOnlyRecentChainData.create(spec); final BeaconChainUtil chainUtil = BeaconChainUtil.create(spec, storageClient, validatorKeys); chainUtil.initializeStorage(); - return create(spec, networkFactory, configureNetwork, storageClient, chainUtil); + return create(spec, asyncRunner, networkFactory, configureNetwork, storageClient, chainUtil); } public static NodeManager create( final Spec spec, + final AsyncRunner asyncRunner, final Eth2P2PNetworkFactory networkFactory, final Consumer configureNetwork, final RecentChainData storageClient, @@ -100,7 +105,7 @@ public static NodeManager create( configureNetwork.accept(networkBuilder); final BlockGossipChannel blockGossipChannel = - eventChannels.getPublisher(BlockGossipChannel.class); + eventChannels.getPublisher(BlockGossipChannel.class, asyncRunner); final Eth2P2PNetwork eth2P2PNetwork = networkBuilder.startNetwork(); return new NodeManager(blockGossipChannel, storageClient, chainUtil, eth2P2PNetwork); @@ -125,6 +130,6 @@ public RecentChainData storageClient() { } public void gossipBlock(final SignedBeaconBlock block) { - blockGossipChannel.publishBlock(block); + blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug(); } } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicChannel.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicChannel.java index 581cc2479e5..aff796b8f5f 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicChannel.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/gossip/TopicChannel.java @@ -14,9 +14,10 @@ package tech.pegasys.teku.networking.p2p.gossip; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.async.SafeFuture; public interface TopicChannel { - void gossip(Bytes data); + SafeFuture gossip(Bytes data); void close(); } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandler.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandler.java index 73a0749c4e0..246fa40ae8b 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandler.java @@ -73,20 +73,25 @@ public SafeFuture apply(final MessageApi message) { } LOG.trace("Received message for topic {}", topic); - PubsubMessage pubsubMessage = message.getOriginalMessage(); - if (!(pubsubMessage instanceof PreparedPubsubMessage)) { + final PubsubMessage pubsubMessage = message.getOriginalMessage(); + if (!(pubsubMessage instanceof PreparedPubsubMessage gossipPubsubMessage)) { throw new IllegalArgumentException( "Don't know this PubsubMessage implementation: " + pubsubMessage.getClass()); } - PreparedPubsubMessage gossipPubsubMessage = (PreparedPubsubMessage) pubsubMessage; return handler.handleMessage(gossipPubsubMessage.getPreparedMessage()); } - public void gossip(final Bytes bytes) { + public SafeFuture gossip(final Bytes bytes) { LOG.trace("Gossiping {}: {} bytes", topic, bytes.size()); - SafeFuture.of(publisher.publish(Unpooled.wrappedBuffer(bytes.toArrayUnsafe()), topic)) - .finish( - () -> LOG.trace("Successfully gossiped message on {}", topic), - err -> LOG.debug("Failed to gossip message on " + topic, err)); + return SafeFuture.of(publisher.publish(Unpooled.wrappedBuffer(bytes.toArrayUnsafe()), topic)) + .handle( + (result, error) -> { + if (error != null) { + LOG.debug("Failed to gossip message on {}", topic, error); + } else { + LOG.trace("Successfully gossiped message on {}", topic); + } + return null; + }); } } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PTopicChannel.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PTopicChannel.java index a2f94b7072a..225a63a6245 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PTopicChannel.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/LibP2PTopicChannel.java @@ -16,6 +16,7 @@ import io.libp2p.core.pubsub.PubsubSubscription; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.p2p.gossip.TopicChannel; public class LibP2PTopicChannel implements TopicChannel { @@ -30,11 +31,11 @@ public LibP2PTopicChannel( } @Override - public void gossip(final Bytes data) { + public SafeFuture gossip(final Bytes data) { if (closed.get()) { - return; + return SafeFuture.COMPLETE; } - topicHandler.gossip(data); + return topicHandler.gossip(data); } @Override diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/mock/MockTopicChannel.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/mock/MockTopicChannel.java index 225d3360dda..ef5ca70b146 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/mock/MockTopicChannel.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/mock/MockTopicChannel.java @@ -14,13 +14,14 @@ package tech.pegasys.teku.networking.p2p.mock; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.p2p.gossip.TopicChannel; public class MockTopicChannel implements TopicChannel { @Override - public void gossip(final Bytes data) { - // Do nothing + public SafeFuture gossip(final Bytes data) { + return SafeFuture.COMPLETE; } @Override diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandlerTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandlerTest.java index 0124a31fe27..27df09a542e 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandlerTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/gossip/GossipHandlerTest.java @@ -20,12 +20,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; import io.libp2p.core.pubsub.PubsubPublisherApi; import io.libp2p.core.pubsub.Topic; import io.libp2p.core.pubsub.ValidationResult; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import kotlin.Unit; import org.apache.tuweni.bytes.Bytes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -96,15 +98,15 @@ public void apply_bufferCapacityExceedsMaxSize() { @Test public void gossip_newMessage() { final Bytes message = Bytes.fromHexString("0x01"); - gossipHandler.gossip(message); + assertThatSafeFuture(gossipHandler.gossip(message)).isCompleted(); verify(publisher).publish(toByteBuf(message), topic); } @Test public void gossip_duplicateMessage() { // Deduplication is done a libp2p level. final Bytes message = Bytes.fromHexString("0x01"); - gossipHandler.gossip(message); - gossipHandler.gossip(message); + assertThatSafeFuture(gossipHandler.gossip(message)).isCompleted(); + assertThatSafeFuture(gossipHandler.gossip(message)).isCompleted(); verify(publisher, times(2)).publish(toByteBuf(message), topic); } @@ -112,12 +114,42 @@ public void gossip_duplicateMessage() { // Deduplication is done a libp2p level. public void gossip_distinctMessages() { final Bytes message1 = Bytes.fromHexString("0x01"); final Bytes message2 = Bytes.fromHexString("0x02"); - gossipHandler.gossip(message1); - gossipHandler.gossip(message2); + assertThatSafeFuture(gossipHandler.gossip(message1)).isCompleted(); + assertThatSafeFuture(gossipHandler.gossip(message2)).isCompleted(); verify(publisher).publish(toByteBuf(message1), topic); verify(publisher).publish(toByteBuf(message2), topic); } + @Test + public void gossip_returnCompletedFutureEvenIfFails() { + final Bytes message = Bytes.fromHexString("0x01"); + final SafeFuture result = new SafeFuture<>(); + when(publisher.publish(any(), any())).thenReturn(result); + final SafeFuture gossipResult = gossipHandler.gossip(message); + + verify(publisher).publish(toByteBuf(message), topic); + assertThat(gossipResult).isNotCompleted(); + + result.completeExceptionally(new RuntimeException("Failed to gossip")); + assertThat(gossipResult).isCompleted(); + } + + @Test + public void + gossip_returnFutureCompletingOnSuccessfulPublishing() { // Deduplication is done a libp2p + // level. + final Bytes message = Bytes.fromHexString("0x01"); + final SafeFuture result = new SafeFuture<>(); + when(publisher.publish(any(), any())).thenReturn(result); + final SafeFuture gossipResult = gossipHandler.gossip(message); + + verify(publisher).publish(toByteBuf(message), topic); + assertThat(gossipResult).isNotCompleted(); + + result.complete(Unit.INSTANCE); + assertThat(gossipResult).isCompleted(); + } + private ByteBuf toByteBuf(final Bytes bytes) { return Unpooled.wrappedBuffer(bytes.toArrayUnsafe()); } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index f26cbb1fdd1..1e3c7a8f399 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -618,7 +618,7 @@ protected void initBlockBlobSidecarsTrackersPool() { final BlockImportChannel blockImportChannel = eventChannels.getPublisher(BlockImportChannel.class, beaconAsyncRunner); final BlobSidecarGossipChannel blobSidecarGossipChannel = - eventChannels.getPublisher(BlobSidecarGossipChannel.class); + eventChannels.getPublisher(BlobSidecarGossipChannel.class, beaconAsyncRunner); final BlockBlobSidecarsTrackersPoolImpl pool = poolFactory.createPoolForBlockBlobSidecarsTrackers( blockImportChannel, @@ -939,10 +939,11 @@ public void initValidatorApiHandler() { final BlockImportChannel blockImportChannel = eventChannels.getPublisher(BlockImportChannel.class, beaconAsyncRunner); final BlockGossipChannel blockGossipChannel = - eventChannels.getPublisher(BlockGossipChannel.class); + eventChannels.getPublisher(BlockGossipChannel.class, beaconAsyncRunner); final BlobSidecarGossipChannel blobSidecarGossipChannel; if (spec.isMilestoneSupported(SpecMilestone.DENEB)) { - blobSidecarGossipChannel = eventChannels.getPublisher(BlobSidecarGossipChannel.class); + blobSidecarGossipChannel = + eventChannels.getPublisher(BlobSidecarGossipChannel.class, beaconAsyncRunner); } else { blobSidecarGossipChannel = BlobSidecarGossipChannel.NOOP; }