From 5bc80dbf80ebfa72cb99547e8efbd2ab3c3f4acf Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 16 Oct 2024 12:22:58 +0200 Subject: [PATCH] tmp --- .../eth2/gossip/BlockGossipChannel.java | 6 ++++-- .../eth2/gossip/forks/GossipForkManager.java | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java index 9c892af9219..3e746aab012 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipChannel.java @@ -13,9 +13,11 @@ package tech.pegasys.teku.networking.eth2.gossip; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.events.ChannelInterface; import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -public interface BlockGossipChannel extends VoidReturningChannelInterface { - void publishBlock(SignedBeaconBlock block); +public interface BlockGossipChannel extends ChannelInterface { + SafeFuture publishBlock(SignedBeaconBlock block); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 3220469395c..77a98b4a3b1 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -26,9 +26,11 @@ import java.util.Set; import java.util.TreeMap; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; @@ -246,6 +248,22 @@ private void publishMessage( slot)); } + private SafeFuture publishMessageWithFeedback( + final UInt64 slot, + final T message, + final String type, + final BiFunction> publisher) { + final Optional gossipForkSubscriptions = getSubscriptionActiveAtSlot(slot) + .filter(this::isActive); + + if(gossipForkSubscriptions.isEmpty()) { + LOG.warn("Not publishing {} because no gossip subscriptions are active for slot {}", type, slot); + return SafeFuture.COMPLETE; + } + + return publisher.apply(gossipForkSubscriptions.get(), message); + } + public synchronized void subscribeToAttestationSubnetId(final int subnetId) { if (currentAttestationSubnets.add(subnetId)) { activeSubscriptions.forEach(