Skip to content

Commit

Permalink
alter ActiveP2pNetwork concept of close to in sync (Consensys#8853)
Browse files Browse the repository at this point in the history
 - moved the isCloseToInSync into recentChaindata

Signed-off-by: Paul Harris <[email protected]>
Co-authored-by: Enrico Del Fante <[email protected]>
  • Loading branch information
rolfyone and tbenr authored Dec 1, 2024
1 parent 809ea98 commit 93ab4a8
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ForkChoiceUtil(
}

public UInt64 getSlotsSinceGenesis(final ReadOnlyStore store, final boolean useUnixTime) {
UInt64 time =
final UInt64 time =
useUnixTime ? UInt64.valueOf(Instant.now().getEpochSecond()) : store.getTimeSeconds();
return getCurrentSlot(time, store.getGenesisTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,17 @@ private synchronized void startup() {
processedAttestationSubscriptionProvider.subscribe(gossipForkManager::publishAttestation);
eventChannels.subscribe(BlockGossipChannel.class, gossipForkManager::publishBlock);
eventChannels.subscribe(BlobSidecarGossipChannel.class, gossipForkManager::publishBlobSidecar);
if (isCloseToInSync()) {
if (recentChainData.isCloseToInSync()) {
startGossip();
}
peerManager.subscribeConnect(peer -> onPeerConnected());
}

private void onPeerConnected() {
if (gossipStarted.get() || state.get() != State.RUNNING) {
return;
}
if (recentChainData.isCloseToInSync()) {
startGossip();
}
}
Expand Down Expand Up @@ -167,36 +177,19 @@ private synchronized void stopGossip() {
}

@Override
public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {
public void onSyncStateChanged(final boolean isCloseToInSync, final boolean isOptimistic) {
gossipForkManager.onOptimisticHeadChanged(isOptimistic);

if (state.get() != State.RUNNING) {
return;
}
if (isInSync || isCloseToInSync()) {
if (isCloseToInSync) {
startGossip();
} else {
stopGossip();
}
}

@VisibleForTesting
boolean isCloseToInSync() {
final Optional<UInt64> currentEpoch = recentChainData.getCurrentEpoch();
if (currentEpoch.isEmpty()) {
return false;
}

final int maxLookaheadEpochs = spec.getSpecConfig(currentEpoch.get()).getMaxSeedLookahead();
final int slotsPerEpoch = spec.slotsPerEpoch(currentEpoch.get());
final int maxLookaheadSlots = slotsPerEpoch * maxLookaheadEpochs;

return recentChainData
.getChainHeadSlotsBehind()
.orElse(UInt64.MAX_VALUE)
.isLessThanOrEqualTo(maxLookaheadSlots);
}

private void setTopicScoringParams() {
gossipUpdateTask =
asyncRunner.runWithFixedDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface Eth2P2PNetwork extends P2PNetwork<Eth2Peer> {

void onEpoch(UInt64 epoch);

void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic);
void onSyncStateChanged(final boolean isCloseToInSync, final boolean isOptimistic);

void subscribeToAttestationSubnetId(int subnetId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public NoOpEth2P2PNetwork(final Spec spec) {
public void onEpoch(final UInt64 epoch) {}

@Override
public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {}
public void onSyncStateChanged(final boolean isCloseToInSync, final boolean isOptimistic) {}

@Override
public void subscribeToAttestationSubnetId(final int subnetId) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager;
import tech.pegasys.teku.networking.eth2.gossip.topics.ProcessedAttestationSubscriptionProvider;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.eth2.peers.Eth2PeerManager;
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetwork;
import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.attestation.ProcessedAttestationListener;
Expand Down Expand Up @@ -81,10 +83,6 @@ public class ActiveEth2P2PNetworkTest {
private Fork altairFork;
private Bytes32 genesisValidatorsRoot;

private final int maxFollowDistanceSlots =
spec.getGenesisSpecConfig().getMaxSeedLookahead()
* spec.slotsPerEpoch(storageSystem.combinedChainDataClient().getCurrentEpoch());

@BeforeEach
public void setup() {
when(discoveryNetwork.start()).thenReturn(SafeFuture.completedFuture(null));
Expand Down Expand Up @@ -190,14 +188,39 @@ public void unsubscribeFromSyncCommitteeSubnetId_shouldUpdateDiscoveryENR() {
}

@Test
void onSyncStateChanged_shouldEnableGossipWhenInSync() {
void shouldStartGossipOnPeerConnect() {
@SuppressWarnings("unchecked")
final ArgumentCaptor<PeerConnectedSubscriber<Eth2Peer>> peerManagerCaptor =
ArgumentCaptor.forClass(PeerConnectedSubscriber.class);
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000));
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(64));

assertThat(network.start()).isCompleted();
// Won't start gossip as chain head is too old
verify(peerManager).subscribeConnect(peerManagerCaptor.capture());

network.onSyncStateChanged(false, false);
// based on network time we know we're too far behind, so we don't start gossip
verify(gossipForkManager, never()).configureGossipForEpoch(any());

// we are still too far behind, so on peer connect gossip is not started
peerManagerCaptor.getValue().onConnected(mock(Eth2Peer.class));
verify(gossipForkManager, never()).configureGossipForEpoch(any());

// Advance the chain
storageSystem.chainUpdater().updateBestBlock(storageSystem.chainUpdater().advanceChain(64));

// on peer connect gossip is started
peerManagerCaptor.getValue().onConnected(mock(Eth2Peer.class));
verify(gossipForkManager).configureGossipForEpoch(any());
}

@Test
void onSyncStateChanged_shouldEnableGossipWhenInSync() {
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(32));

assertThat(network.start()).isCompleted();

network.onSyncStateChanged(true, false);

// Even though we're a long way behind, start gossip because we believe we're in sync
Expand Down Expand Up @@ -259,41 +282,6 @@ void onSyncStateChanged_shouldNotResultInMultipleSubscriptions() {
verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any());
}

@Test
void isCloseToInSync_shouldCalculateWhenDistanceOutOfRange() {
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(maxFollowDistanceSlots + 1));
assertThat(network.isCloseToInSync()).isFalse();
}

@Test
void isCloseToInSync_shouldCalculateWhenDistanceInRange() {
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(maxFollowDistanceSlots));
assertThat(network.isCloseToInSync()).isTrue();
}

@Test
void isCloseToInSync_shouldReturnFalseWhenEmptyCurrentEpoch() {
final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault();
final RecentChainData recentChainData = storageSystem.recentChainData();
final ActiveEth2P2PNetwork network =
new ActiveEth2P2PNetwork(
spec,
asyncRunner,
discoveryNetwork,
peerManager,
gossipForkManager,
eventChannels,
recentChainData,
attestationSubnetService,
syncCommitteeSubnetService,
gossipEncoding,
gossipConfigurator,
processedAttestationSubscriptionProvider,
true);

assertThat(network.isCloseToInSync()).isFalse();
}

@SuppressWarnings("unchecked")
private ArgumentCaptor<Iterable<Integer>> subnetIdCaptor() {
return ArgumentCaptor.forClass(Iterable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,8 @@ public void initSyncService() {

// p2pNetwork subscription so gossip can be enabled and disabled appropriately
syncService.subscribeToSyncStateChangesAndUpdate(
state -> p2pNetwork.onSyncStateChanged(state.isInSync(), state.isOptimistic()));
state ->
p2pNetwork.onSyncStateChanged(recentChainData.isCloseToInSync(), state.isOptimistic()));
}

protected void initOperationsReOrgManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void processEpochPrecompute(final UInt64 epoch) {
}

private void processSlotWhileSyncing(final SyncState currentSyncState) {
UInt64 slot = nodeSlot.getValue();
final UInt64 slot = nodeSlot.getValue();
this.forkChoiceTrigger.onSlotStartedWhileSyncing(slot);
if (currentSyncState == SyncState.AWAITING_EL) {
eventLog.syncEventAwaitingEL(slot, recentChainData.getHeadSlot(), p2pNetwork.getPeerCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -41,6 +42,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
Expand All @@ -58,6 +60,7 @@
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconStateCache;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.spec.logic.common.util.BeaconStateUtil;
import tech.pegasys.teku.storage.api.ChainHeadChannel;
import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel;
Expand Down Expand Up @@ -194,6 +197,27 @@ public UInt64 computeTimeAtSlot(final UInt64 slot) {
return genesisTime.plus(slot.times(spec.getSecondsPerSlot(slot)));
}

@VisibleForTesting
boolean isCloseToInSync(final UInt64 currentTimeSeconds) {
final SpecVersion specVersion = spec.getGenesisSpec();
final MiscHelpers miscHelpers = specVersion.miscHelpers();
final SpecConfig specConfig = specVersion.getConfig();
final UInt64 networkSlot = miscHelpers.computeSlotAtTime(getGenesisTime(), currentTimeSeconds);

final int maxLookaheadEpochs = specConfig.getMaxSeedLookahead();
final int slotsPerEpoch = specVersion.getSlotsPerEpoch();
final int maxLookaheadSlots = slotsPerEpoch * maxLookaheadEpochs;

return networkSlot.minusMinZero(getHeadSlot()).isLessThanOrEqualTo(maxLookaheadSlots);
}

public boolean isCloseToInSync() {
if (store == null) {
return false;
}
return isCloseToInSync(store.getTimeInSeconds());
}

public Optional<GenesisData> getGenesisData() {
return genesisData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,45 @@ public void getBlockRootBySlotWithHeadRoot_forUnknownHeadRoot() {
assertThat(recentChainData.getBlockRootInEffectBySlot(bestBlock.getSlot(), headRoot)).isEmpty();
}

@Test
public void isCloseToInSync_preGenesis() {
initPreGenesis();
assertThat(recentChainData.isCloseToInSync()).isFalse();
}

@Test
public void isCloseToSync_belowBoundary() {
initPostGenesis();
final SpecConfig specConfig = spec.getGenesisSpecConfig();
final int seconds =
specConfig.getMaxSeedLookahead()
* specConfig.getSlotsPerEpoch()
* specConfig.getSecondsPerSlot();
assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds - 1))).isTrue();
}

@Test
public void isCloseToSync_atBoundary() {
initPostGenesis();
final SpecConfig specConfig = spec.getGenesisSpecConfig();
final int seconds =
specConfig.getMaxSeedLookahead()
* specConfig.getSlotsPerEpoch()
* specConfig.getSecondsPerSlot();
assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds))).isTrue();
}

@Test
public void isCloseToSync_aboveBoundary() {
initPostGenesis();
final SpecConfig specConfig = spec.getGenesisSpecConfig();
final int seconds =
specConfig.getMaxSeedLookahead()
* specConfig.getSlotsPerEpoch()
* specConfig.getSecondsPerSlot();
assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds + 8))).isFalse();
}

@Test
public void getBlockRootBySlotWithHeadRoot_withForkRoot() {
initPostGenesis();
Expand Down

0 comments on commit 93ab4a8

Please sign in to comment.