Skip to content

Commit

Permalink
allow block and blobs publishing to give feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Oct 16, 2024
1 parent 5bc80db commit e9979e1
Show file tree
Hide file tree
Showing 27 changed files with 157 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class SyncingNodeManager {
private final BlockGossipChannel blockGossipChannel;

private SyncingNodeManager(
final AsyncRunner asyncRunner,
final EventChannels eventChannels,
final RecentChainData recentChainData,
final BeaconChainUtil chainUtil,
Expand All @@ -94,7 +95,7 @@ private SyncingNodeManager(
this.chainUtil = chainUtil;
this.eth2P2PNetwork = eth2P2PNetwork;
this.syncService = syncService;
this.blockGossipChannel = eventChannels.getPublisher(BlockGossipChannel.class);
this.blockGossipChannel = eventChannels.getPublisher(BlockGossipChannel.class, asyncRunner);
}

@SuppressWarnings("FutureReturnValueIgnored")
Expand Down Expand Up @@ -215,7 +216,7 @@ public static SyncingNodeManager create(
syncService.start().join();

return new SyncingNodeManager(
eventChannels, recentChainData, chainUtil, eth2P2PNetwork, syncService);
asyncRunner, eventChannels, recentChainData, chainUtil, eth2P2PNetwork, syncService);
}

public SafeFuture<Peer> connect(final SyncingNodeManager peer) {
Expand Down Expand Up @@ -250,6 +251,6 @@ public void setSlot(final UInt64 slot) {
}

public void gossipBlock(final SignedBeaconBlock block) {
blockGossipChannel.publishBlock(block);
blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ void publishBlockAndBlobSidecars(
final SignedBeaconBlock block,
final List<BlobSidecar> blobSidecars,
final BlockPublishingPerformance blockPublishingPerformance) {
blockGossipChannel.publishBlock(block);
blobSidecarGossipChannel.publishBlobSidecars(blobSidecars);
blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug();
blobSidecarGossipChannel.publishBlobSidecars(blobSidecars).ifExceptionGetsHereRaiseABug();
blockPublishingPerformance.blockAndBlobSidecarsPublishingInitiated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void publishBlockAndBlobSidecars(
final SignedBeaconBlock block,
final List<BlobSidecar> blobSidecars,
final BlockPublishingPerformance blockPublishingPerformance) {
blockGossipChannel.publishBlock(block);
blockGossipChannel.publishBlock(block).ifExceptionGetsHereRaiseABug();
blockPublishingPerformance.blockPublishingInitiated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -101,7 +102,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
private final RecentChainData recentChainData;
private final ExecutionLayerChannel executionLayer;
private final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier;
private final Consumer<BlobSidecar> blobSidecarGossipPublisher;
private final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher;
private final int maxTrackers;

private final BlockBlobSidecarsTrackerFactory trackerFactory;
Expand Down Expand Up @@ -133,7 +134,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher,
final UInt64 historicalSlotTolerance,
final UInt64 futureSlotTolerance,
final int maxTrackers) {
Expand Down Expand Up @@ -165,7 +166,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher,
final UInt64 historicalSlotTolerance,
final UInt64 futureSlotTolerance,
final int maxTrackers,
Expand Down Expand Up @@ -246,7 +247,7 @@ public synchronized void onNewBlobSidecar(
private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) {
LOG.debug("Publishing recovered blob sidecar {}", blobSidecar::toLogString);
gossipValidatorSupplier.get().markForEquivocation(blobSidecar);
blobSidecarGossipPublisher.accept(blobSidecar);
blobSidecarGossipPublisher.apply(blobSidecar).ifExceptionGetsHereRaiseABug();
}

private void countBlobSidecar(final RemoteOrigin origin) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
Expand Down Expand Up @@ -120,7 +121,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher) {
final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher) {
return createPoolForBlockBlobSidecarsTrackers(
blockImportChannel,
spec,
Expand All @@ -143,7 +144,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher,
final UInt64 historicalBlockTolerance,
final UInt64 futureBlockTolerance,
final int maxTrackers) {
Expand Down Expand Up @@ -172,7 +173,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final Function<BlobSidecar, SafeFuture<Void>> blobSidecarGossipPublisher,
final UInt64 historicalBlockTolerance,
final UInt64 futureBlockTolerance,
final int maxItems,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -81,7 +80,7 @@ public class BlockBlobSidecarsTrackersPoolImplTest {
private final ExecutionLayerChannel executionLayer = mock(ExecutionLayerChannel.class);

@SuppressWarnings("unchecked")
private final Consumer<BlobSidecar> blobSidecarPublisher = mock(Consumer.class);
private final Function<BlobSidecar, SafeFuture<Void>> blobSidecarPublisher = mock(Function.class);

private final BlobSidecarGossipValidator blobSidecarGossipValidator =
mock(BlobSidecarGossipValidator.class);
Expand Down Expand Up @@ -235,7 +234,7 @@ public void onNewBlobSidecar_shouldMarkForEquivocationAndPublishWhenOriginIsLoca
assertBlobSidecarsTrackersCount(3);

verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, times(1)).accept(blobSidecar1);
verify(blobSidecarPublisher, times(1)).apply(blobSidecar1);
}

@Test
Expand All @@ -254,7 +253,7 @@ public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalELAndEquivocating() {
assertBlobSidecarsTrackersCount(1);

verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, times(1)).accept(blobSidecar1);
verify(blobSidecarPublisher, times(1)).apply(blobSidecar1);
}

@Test
Expand All @@ -274,7 +273,7 @@ public void onNewBlobSidecar_shouldNotPublishWhenOriginIsLocalELIsNotCurrentSlot
assertBlobSidecarsTrackersCount(1);

verify(blobSidecarGossipValidator, never()).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, never()).accept(blobSidecar1);
verify(blobSidecarPublisher, never()).apply(blobSidecar1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.bls.BLSKeyGenerator;
import tech.pegasys.teku.bls.BLSKeyPair;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.Waiter;
import tech.pegasys.teku.networking.eth2.Eth2P2PNetworkFactory.Eth2P2PNetworkBuilder;
Expand All @@ -35,7 +37,7 @@
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;

public class AttesterSlashingGossipIntegrationTest {

private final AsyncRunner asyncRunner = DelayedExecutorAsyncRunner.create();
private final List<BLSKeyPair> validatorKeys = BLSKeyGenerator.generateKeyPairs(3);
private final Eth2P2PNetworkFactory networkFactory = new Eth2P2PNetworkFactory();
private final DataStructureUtil dataStructureUtil =
Expand Down Expand Up @@ -91,6 +93,6 @@ public void shouldGossipToPeers() throws Exception {

private NodeManager createNodeManager(final Consumer<Eth2P2PNetworkBuilder> networkBuilder)
throws Exception {
return NodeManager.create(networkFactory, validatorKeys, networkBuilder);
return NodeManager.create(asyncRunner, networkFactory, validatorKeys, networkBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Optional;
import java.util.function.Function;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszData;
import tech.pegasys.teku.infrastructure.ssz.schema.SszSchema;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -76,7 +77,11 @@ public Eth2TopicHandler<T> getTopicHandler() {
}

protected void publishMessage(final T message) {
channel.ifPresent(c -> c.gossip(gossipEncoding.encode(message)));
channel.ifPresent(c -> c.gossip(gossipEncoding.encode(message)).finish(__ -> {}));
}

protected SafeFuture<Void> publishMessageWithFeedback(final T message) {
return channel.map(c -> c.gossip(gossipEncoding.encode(message))).orElse(SafeFuture.COMPLETE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
package tech.pegasys.teku.networking.eth2.gossip;

import java.util.List;
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

public interface BlobSidecarGossipChannel extends VoidReturningChannelInterface {
public interface BlobSidecarGossipChannel extends ChannelInterface {

BlobSidecarGossipChannel NOOP = blobSidecar -> {};
BlobSidecarGossipChannel NOOP = blobSidecar -> SafeFuture.COMPLETE;

default void publishBlobSidecars(final List<BlobSidecar> blobSidecars) {
blobSidecars.forEach(this::publishBlobSidecar);
default SafeFuture<Void> publishBlobSidecars(final List<BlobSidecar> blobSidecars) {
return SafeFuture.allOf(blobSidecars.stream().map(this::publishBlobSidecar));
}

void publishBlobSidecar(BlobSidecar blobSidecar);
SafeFuture<Void> publishBlobSidecar(BlobSidecar blobSidecar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ private BlobSidecarGossipManager(
this.subnetIdToTopicHandler = subnetIdToTopicHandler;
}

public void publishBlobSidecar(final BlobSidecar message) {
public SafeFuture<Void> publishBlobSidecar(final BlobSidecar message) {
final int subnetId = spec.computeSubnetForBlobSidecar(message).intValue();
Optional.ofNullable(subnetIdToChannel.get(subnetId))
.ifPresent(channel -> channel.gossip(gossipEncoding.encode(message)));
return Optional.ofNullable(subnetIdToChannel.get(subnetId))
.map(channel -> channel.gossip(gossipEncoding.encode(message)))
.orElse(SafeFuture.COMPLETE);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public interface BlockGossipChannel extends ChannelInterface {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.networking.eth2.gossip;

import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor;
Expand Down Expand Up @@ -51,8 +52,8 @@ public BlockGossipManager(
debugDataDumper);
}

public void publishBlock(final SignedBeaconBlock message) {
publishMessage(message);
public SafeFuture<Void> publishBlock(final SignedBeaconBlock message) {
return publishMessageWithFeedback(message);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,13 @@ public synchronized void publishAttestation(final ValidatableAttestation attesta
GossipForkSubscriptions::publishAttestation);
}

public synchronized void publishBlock(final SignedBeaconBlock block) {
publishMessage(block.getSlot(), block, "block", GossipForkSubscriptions::publishBlock);
public synchronized SafeFuture<Void> publishBlock(final SignedBeaconBlock block) {
return publishMessageWithFeedback(
block.getSlot(), block, "block", GossipForkSubscriptions::publishBlock);
}

public synchronized void publishBlobSidecar(final BlobSidecar blobSidecar) {
publishMessage(
public synchronized SafeFuture<Void> publishBlobSidecar(final BlobSidecar blobSidecar) {
return publishMessageWithFeedback(
blobSidecar.getSlot(),
blobSidecar,
"blob sidecar",
Expand Down Expand Up @@ -249,15 +250,16 @@ private <T> void publishMessage(
}

private <T> SafeFuture<Void> publishMessageWithFeedback(
final UInt64 slot,
final T message,
final String type,
final BiFunction<GossipForkSubscriptions, T, SafeFuture<Void>> publisher) {
final Optional<GossipForkSubscriptions> gossipForkSubscriptions = getSubscriptionActiveAtSlot(slot)
.filter(this::isActive);

if(gossipForkSubscriptions.isEmpty()) {
LOG.warn("Not publishing {} because no gossip subscriptions are active for slot {}", type, slot);
final UInt64 slot,
final T message,
final String type,
final BiFunction<GossipForkSubscriptions, T, SafeFuture<Void>> publisher) {
final Optional<GossipForkSubscriptions> gossipForkSubscriptions =
getSubscriptionActiveAtSlot(slot).filter(this::isActive);

if (gossipForkSubscriptions.isEmpty()) {
LOG.warn(
"Not publishing {} because no gossip subscriptions are active for slot {}", type, slot);
return SafeFuture.COMPLETE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.networking.eth2.gossip.forks;

import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
Expand All @@ -37,10 +38,10 @@ public interface GossipForkSubscriptions {

void publishAttestation(ValidatableAttestation attestation);

void publishBlock(SignedBeaconBlock block);
SafeFuture<Void> publishBlock(SignedBeaconBlock block);

default void publishBlobSidecar(final BlobSidecar blobSidecar) {
// since Deneb
default SafeFuture<Void> publishBlobSidecar(final BlobSidecar blobSidecar) {
return SafeFuture.COMPLETE;
}

void subscribeToAttestationSubnetId(int subnetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipManager;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor;
Expand Down Expand Up @@ -104,7 +105,7 @@ void addBlobSidecarGossipManager(final ForkInfo forkInfo) {
}

@Override
public void publishBlobSidecar(final BlobSidecar blobSidecar) {
blobSidecarGossipManager.publishBlobSidecar(blobSidecar);
public SafeFuture<Void> publishBlobSidecar(final BlobSidecar blobSidecar) {
return blobSidecarGossipManager.publishBlobSidecar(blobSidecar);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.tuweni.bytes.Bytes32;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.AggregateGossipManager;
import tech.pegasys.teku.networking.eth2.gossip.AttestationGossipManager;
Expand Down Expand Up @@ -235,8 +236,8 @@ public void publishAttestation(final ValidatableAttestation attestation) {
}

@Override
public void publishBlock(final SignedBeaconBlock block) {
blockGossipManager.publishBlock(block);
public SafeFuture<Void> publishBlock(final SignedBeaconBlock block) {
return blockGossipManager.publishBlock(block);
}

@Override
Expand Down
Loading

0 comments on commit e9979e1

Please sign in to comment.