Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Oct 16, 2024
1 parent f5790a5 commit 5bc80db
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

package tech.pegasys.teku.networking.eth2.gossip;

import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public interface BlockGossipChannel extends VoidReturningChannelInterface {
void publishBlock(SignedBeaconBlock block);
public interface BlockGossipChannel extends ChannelInterface {
SafeFuture<Void> publishBlock(SignedBeaconBlock block);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
Expand Down Expand Up @@ -246,6 +248,22 @@ private <T> void publishMessage(
slot));
}

private <T> SafeFuture<Void> publishMessageWithFeedback(
final UInt64 slot,
final T message,
final String type,
final BiFunction<GossipForkSubscriptions, T, SafeFuture<Void>> publisher) {
final Optional<GossipForkSubscriptions> gossipForkSubscriptions = getSubscriptionActiveAtSlot(slot)
.filter(this::isActive);

if(gossipForkSubscriptions.isEmpty()) {
LOG.warn("Not publishing {} because no gossip subscriptions are active for slot {}", type, slot);
return SafeFuture.COMPLETE;
}

return publisher.apply(gossipForkSubscriptions.get(), message);
}

public synchronized void subscribeToAttestationSubnetId(final int subnetId) {
if (currentAttestationSubnets.add(subnetId)) {
activeSubscriptions.forEach(
Expand Down

0 comments on commit 5bc80db

Please sign in to comment.