Skip to content

Commit

Permalink
Don't subscribe in AttestationTopicSubscriber if subscription is outd…
Browse files Browse the repository at this point in the history
…ated
  • Loading branch information
zilm13 committed Nov 19, 2024
1 parent 46adf0c commit 1fc8ff4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
Expand All @@ -39,6 +40,7 @@ public class AttestationTopicSubscriber implements SlotEventsChannel {
private final Eth2P2PNetwork eth2P2PNetwork;
private final Spec spec;
private final SettableLabelledGauge subnetSubscriptionsGauge;
private final AtomicReference<UInt64> lastSlot = new AtomicReference<>(null);

public AttestationTopicSubscriber(
final Spec spec,
Expand All @@ -56,6 +58,14 @@ public synchronized void subscribeToCommitteeForAggregation(
aggregationSlot, UInt64.valueOf(committeeIndex), committeesAtSlot);
final UInt64 currentUnsubscriptionSlot = subnetIdToUnsubscribeSlot.getOrDefault(subnetId, ZERO);
final UInt64 unsubscribeSlot = currentUnsubscriptionSlot.max(aggregationSlot);
if (lastSlot.get() != null && unsubscribeSlot.isLessThan(lastSlot.get())) {
LOG.trace(
"Skipping outdated aggregation subnet {} with unsubscribe due at slot {}",
subnetId,
unsubscribeSlot);
return;
}

if (currentUnsubscriptionSlot.equals(ZERO)) {
eth2P2PNetwork.subscribeToAttestationSubnetId(subnetId);
toggleAggregateSubscriptionMetric(subnetId, false);
Expand Down Expand Up @@ -97,6 +107,15 @@ public synchronized void subscribeToPersistentSubnets(

for (SubnetSubscription subnetSubscription : newSubscriptions) {
int subnetId = subnetSubscription.subnetId();
if (lastSlot.get() != null
&& subnetSubscription.unsubscriptionSlot().isLessThan(lastSlot.get())) {
LOG.trace(
"Skipping outdated persistent subnet {} with unsubscribe due at slot {}",
subnetId,
subnetSubscription.unsubscriptionSlot());
continue;
}

shouldUpdateENR = persistentSubnetIdSet.add(subnetId) || shouldUpdateENR;
LOG.trace(
"Subscribing to persistent subnet {} with unsubscribe due at slot {}",
Expand Down Expand Up @@ -127,6 +146,7 @@ public synchronized void subscribeToPersistentSubnets(

@Override
public synchronized void onSlot(final UInt64 slot) {
lastSlot.set(slot);
boolean shouldUpdateENR = false;

final Iterator<Int2ObjectMap.Entry<UInt64>> iterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -47,6 +48,7 @@ public void shouldSubscribeToSubnet() {
final int committeeId = 10;
final int subnetId =
spec.computeSubnetForCommittee(ONE, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT);
subscriber.onSlot(ONE);
subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, ONE);

verify(settableLabelledGaugeMock)
Expand Down Expand Up @@ -155,13 +157,52 @@ public void shouldPreserveLaterSubscriptionPeriodWhenEarlierSlotAdded() {
verify(eth2P2PNetwork).unsubscribeFromAttestationSubnetId(subnetId);
}

@Test
public void shouldNotSubscribeForExpiredAggregationSubnet() {
final int committeeId = 3;
final UInt64 slot = UInt64.valueOf(10);
final int subnetId =
spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT);
// Sanity check second subscription is for the same subnet ID.
assertThat(subnetId)
.isEqualTo(
spec.computeSubnetForCommittee(slot, UInt64.valueOf(committeeId), COMMITTEES_AT_SLOT));

subscriber.onSlot(slot.plus(ONE));
subscriber.subscribeToCommitteeForAggregation(committeeId, COMMITTEES_AT_SLOT, slot);
verifyNoMoreInteractions(settableLabelledGaugeMock);
verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(anyInt());
verify(eth2P2PNetwork, never()).unsubscribeFromAttestationSubnetId(anyInt());
}

@Test
public void shouldNotSubscribeForExpiredPersistentSubnet() {
Set<SubnetSubscription> subnetSubscriptions =
Set.of(
new SubnetSubscription(2, UInt64.valueOf(15)),
new SubnetSubscription(1, UInt64.valueOf(20)));

subscriber.onSlot(UInt64.valueOf(16));
subscriber.subscribeToPersistentSubnets(subnetSubscriptions);

verify(settableLabelledGaugeMock)
.set(1, String.format(AttestationTopicSubscriber.GAUGE_PERSISTENT_SUBNETS_LABEL, 1));
verifyNoMoreInteractions(settableLabelledGaugeMock);

verify(eth2P2PNetwork).setLongTermAttestationSubnetSubscriptions(IntSet.of(1));

verify(eth2P2PNetwork).subscribeToAttestationSubnetId(1);
verify(eth2P2PNetwork, never()).subscribeToAttestationSubnetId(eq(2));
}

@Test
public void shouldSubscribeToNewSubnetsAndUpdateENR_forPersistentSubscriptions() {
Set<SubnetSubscription> subnetSubscriptions =
Set.of(
new SubnetSubscription(1, UInt64.valueOf(20)),
new SubnetSubscription(2, UInt64.valueOf(15)));

subscriber.onSlot(UInt64.valueOf(15));
subscriber.subscribeToPersistentSubnets(subnetSubscriptions);

verify(settableLabelledGaugeMock)
Expand Down

0 comments on commit 1fc8ff4

Please sign in to comment.