From 1fc8ff4a7dd665029750d7be3c11cd90cbadf063 Mon Sep 17 00:00:00 2001 From: Dmitrii Shmatko Date: Tue, 19 Nov 2024 19:35:43 +0100 Subject: [PATCH] Don't subscribe in AttestationTopicSubscriber if subscription is outdated --- .../subnets/AttestationTopicSubscriber.java | 20 +++++++++ .../AttestationTopicSubscriberTest.java | 41 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriber.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriber.java index ccf785f92c8..fd32d847909 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriber.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriber.java @@ -21,6 +21,7 @@ import it.unimi.dsi.fastutil.ints.IntSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; @@ -39,6 +40,7 @@ public class AttestationTopicSubscriber implements SlotEventsChannel { private final Eth2P2PNetwork eth2P2PNetwork; private final Spec spec; private final SettableLabelledGauge subnetSubscriptionsGauge; + private final AtomicReference lastSlot = new AtomicReference<>(null); public AttestationTopicSubscriber( final Spec spec, @@ -56,6 +58,14 @@ public synchronized void subscribeToCommitteeForAggregation( aggregationSlot, UInt64.valueOf(committeeIndex), committeesAtSlot); final UInt64 currentUnsubscriptionSlot = subnetIdToUnsubscribeSlot.getOrDefault(subnetId, ZERO); final UInt64 unsubscribeSlot = currentUnsubscriptionSlot.max(aggregationSlot); + if (lastSlot.get() != null && unsubscribeSlot.isLessThan(lastSlot.get())) { + LOG.trace( + "Skipping outdated aggregation subnet {} with unsubscribe due at slot {}", + subnetId, + unsubscribeSlot); + return; + } + if (currentUnsubscriptionSlot.equals(ZERO)) { eth2P2PNetwork.subscribeToAttestationSubnetId(subnetId); toggleAggregateSubscriptionMetric(subnetId, false); @@ -97,6 +107,15 @@ public synchronized void subscribeToPersistentSubnets( for (SubnetSubscription subnetSubscription : newSubscriptions) { int subnetId = subnetSubscription.subnetId(); + if (lastSlot.get() != null + && subnetSubscription.unsubscriptionSlot().isLessThan(lastSlot.get())) { + LOG.trace( + "Skipping outdated persistent subnet {} with unsubscribe due at slot {}", + subnetId, + subnetSubscription.unsubscriptionSlot()); + continue; + } + shouldUpdateENR = persistentSubnetIdSet.add(subnetId) || shouldUpdateENR; LOG.trace( "Subscribing to persistent subnet {} with unsubscribe due at slot {}", @@ -127,6 +146,7 @@ public synchronized void subscribeToPersistentSubnets( @Override public synchronized void onSlot(final UInt64 slot) { + lastSlot.set(slot); boolean shouldUpdateENR = false; final Iterator> iterator = diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriberTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriberTest.java index 4ff2241e165..5fbdc1e466a 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriberTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/subnets/AttestationTopicSubscriberTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -47,6 +48,7 @@ public void shouldSubscribeToSubnet() { final int committeeId = 10; final int subnetId = spec.computeSubnetForCommittee(ONE, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT); + subscriber.onSlot(ONE); subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, ONE); verify(settableLabelledGaugeMock) @@ -155,6 +157,44 @@ public void shouldPreserveLaterSubscriptionPeriodWhenEarlierSlotAdded() { verify(eth2P2PNetwork).unsubscribeFromAttestationSubnetId(subnetId); } + @Test + public void shouldNotSubscribeForExpiredAggregationSubnet() { + final int committeeId = 3; + final UInt64 slot = UInt64.valueOf(10); + final int subnetId = + spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT); + // Sanity check second subscription is for the same subnet ID. + assertThat(subnetId) + .isEqualTo( + spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT)); + + subscriber.onSlot(slot.plus(ONE)); + subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, slot); + verifyNoMoreInteractions(settableLabelledGaugeMock); + verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(anyInt()); + verify(eth2P2PNetwork, never()).unsubscribeFromAttestationSubnetId(anyInt()); + } + + @Test + public void shouldNotSubscribeForExpiredPersistentSubnet() { + Set subnetSubscriptions = + Set.of( + new SubnetSubscription(2, UInt64.valueOf(15)), + new SubnetSubscription(1, UInt64.valueOf(20))); + + subscriber.onSlot(UInt64.valueOf(16)); + subscriber.subscribeToPersistentSubnets(subnetSubscriptions); + + verify(settableLabelledGaugeMock) + .set(1, String.format(AttestationTopicSubscriber.GAUGE_PERSISTENT_SUBNETS_LABEL, 1)); + verifyNoMoreInteractions(settableLabelledGaugeMock); + + verify(eth2P2PNetwork).setLongTermAttestationSubnetSubscriptions(IntSet.of(1)); + + verify(eth2P2PNetwork).subscribeToAttestationSubnetId(1); + verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(eq(2)); + } + @Test public void shouldSubscribeToNewSubnetsAndUpdateENR_forPersistentSubscriptions() { Set subnetSubscriptions = @@ -162,6 +202,7 @@ public void shouldSubscribeToNewSubnetsAndUpdateENR_forPersistentSubscriptions() new SubnetSubscription(1, UInt64.valueOf(20)), new SubnetSubscription(2, UInt64.valueOf(15))); + subscriber.onSlot(UInt64.valueOf(15)); subscriber.subscribeToPersistentSubnets(subnetSubscriptions); verify(settableLabelledGaugeMock)