Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alter ActiveP2pNetwork concept of close to in sync #8853

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private void updateCurrentState() {
currentState = SyncState.SYNCING;
} else if (startingUp) {
currentState = SyncState.START_UP;

} else {
currentState = SyncState.IN_SYNC;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ForkChoiceUtil(
}

public UInt64 getSlotsSinceGenesis(final ReadOnlyStore store, final boolean useUnixTime) {
UInt64 time =
final UInt64 time =
useUnixTime ? UInt64.valueOf(Instant.now().getEpochSecond()) : store.getTimeSeconds();
return getCurrentSlot(time, store.getGenesisTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,17 @@ private synchronized void startup() {
processedAttestationSubscriptionProvider.subscribe(gossipForkManager::publishAttestation);
eventChannels.subscribe(BlockGossipChannel.class, gossipForkManager::publishBlock);
eventChannels.subscribe(BlobSidecarGossipChannel.class, gossipForkManager::publishBlobSidecar);
if (isCloseToInSync()) {
if (recentChainData.isCloseToInSync()) {
startGossip();
}
peerManager.subscribeConnect(peer -> onPeerConnected());
}

private void onPeerConnected() {
if (gossipStarted.get() || state.get() != State.RUNNING) {
return;
}
if (recentChainData.isCloseToInSync()) {
startGossip();
}
}
Expand Down Expand Up @@ -167,36 +177,19 @@ private synchronized void stopGossip() {
}

@Override
public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {
public void onSyncStateChanged(final boolean isOptimistic) {
gossipForkManager.onOptimisticHeadChanged(isOptimistic);

if (state.get() != State.RUNNING) {
return;
}
if (isInSync || isCloseToInSync()) {
if (recentChainData.isCloseToInSync()) {
startGossip();
} else {
stopGossip();
}
}

@VisibleForTesting
boolean isCloseToInSync() {
final Optional<UInt64> currentEpoch = recentChainData.getCurrentEpoch();
if (currentEpoch.isEmpty()) {
return false;
}

final int maxLookaheadEpochs = spec.getSpecConfig(currentEpoch.get()).getMaxSeedLookahead();
final int slotsPerEpoch = spec.slotsPerEpoch(currentEpoch.get());
final int maxLookaheadSlots = slotsPerEpoch * maxLookaheadEpochs;

return recentChainData
.getChainHeadSlotsBehind()
.orElse(UInt64.MAX_VALUE)
.isLessThanOrEqualTo(maxLookaheadSlots);
}

private void setTopicScoringParams() {
gossipUpdateTask =
asyncRunner.runWithFixedDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public interface Eth2P2PNetwork extends P2PNetwork<Eth2Peer> {

void onEpoch(UInt64 epoch);

void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic);
void onSyncStateChanged(final boolean isOptimistic);

void subscribeToAttestationSubnetId(int subnetId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public NoOpEth2P2PNetwork(final Spec spec) {
public void onEpoch(final UInt64 epoch) {}

@Override
public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {}
public void onSyncStateChanged(final boolean isOptimistic) {}

@Override
public void subscribeToAttestationSubnetId(final int subnetId) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager;
import tech.pegasys.teku.networking.eth2.gossip.topics.ProcessedAttestationSubscriptionProvider;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.eth2.peers.Eth2PeerManager;
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetwork;
import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.attestation.ProcessedAttestationListener;
Expand Down Expand Up @@ -81,10 +83,6 @@ public class ActiveEth2P2PNetworkTest {
private Fork altairFork;
private Bytes32 genesisValidatorsRoot;

private final int maxFollowDistanceSlots =
spec.getGenesisSpecConfig().getMaxSeedLookahead()
* spec.slotsPerEpoch(storageSystem.combinedChainDataClient().getCurrentEpoch());

@BeforeEach
public void setup() {
when(discoveryNetwork.start()).thenReturn(SafeFuture.completedFuture(null));
Expand Down Expand Up @@ -190,15 +188,40 @@ public void unsubscribeFromSyncCommitteeSubnetId_shouldUpdateDiscoveryENR() {
}

@Test
void onSyncStateChanged_shouldEnableGossipWhenInSync() {
void shouldStartGossipOnPeerConnect() {
@SuppressWarnings("unchecked")
final ArgumentCaptor<PeerConnectedSubscriber<Eth2Peer>> peerManagerCaptor =
ArgumentCaptor.forClass(PeerConnectedSubscriber.class);
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000));
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(64));

assertThat(network.start()).isCompleted();
// Won't start gossip as chain head is too old
verify(peerManager).subscribeConnect(peerManagerCaptor.capture());

network.onSyncStateChanged(false);
// based on network time we know we're too far behind, so we don't start gossip
verify(gossipForkManager, never()).configureGossipForEpoch(any());

network.onSyncStateChanged(true, false);
// we are still too far behind, so on peer connect gossip is not started
peerManagerCaptor.getValue().onConnected(mock(Eth2Peer.class));
verify(gossipForkManager, never()).configureGossipForEpoch(any());

// Advance the chain
storageSystem.chainUpdater().updateBestBlock(storageSystem.chainUpdater().advanceChain(64));

// on peer connect gossip is started
peerManagerCaptor.getValue().onConnected(mock(Eth2Peer.class));
verify(gossipForkManager).configureGossipForEpoch(any());
}

@Test
void onSyncStateChanged_shouldEnableGossipWhenInSync() {
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(32));

assertThat(network.start()).isCompleted();

network.onSyncStateChanged(false);

// Even though we're a long way behind, start gossip because we believe we're in sync
verify(gossipForkManager).configureGossipForEpoch(any());
Expand All @@ -210,90 +233,56 @@ void onSyncStateChanged_shouldStopGossipWhenTooFarBehindAndNotInSync() {
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000));

assertThat(network.start()).isCompleted();
network.onSyncStateChanged(true, false);
// Even though we're a long way behind, start gossip because we believe we're in sync
verify(gossipForkManager).configureGossipForEpoch(any());
network.onSyncStateChanged(false);
// based on network time we know we're too far behind, so we don't start gossip
verify(gossipForkManager, never()).configureGossipForEpoch(any());

network.onSyncStateChanged(false, false);
verify(gossipForkManager).stopGossip();
network.onSyncStateChanged(false);
verify(gossipForkManager, never()).stopGossip();
}

@Test
void onSyncStateChanged_shouldNotifyForkManagerOfOptimisticSyncState() {
assertThat(network.start()).isCompleted();

network.onSyncStateChanged(false, true);
network.onSyncStateChanged(true);
verify(gossipForkManager).onOptimisticHeadChanged(true);

network.onSyncStateChanged(false, false);
network.onSyncStateChanged(false);
verify(gossipForkManager).onOptimisticHeadChanged(false);

network.onSyncStateChanged(true, true);
network.onSyncStateChanged(true);
verify(gossipForkManager, times(2)).onOptimisticHeadChanged(true);

network.onSyncStateChanged(true, false);
network.onSyncStateChanged(false);
verify(gossipForkManager, times(2)).onOptimisticHeadChanged(false);
}

@Test
void onSyncStateChanged_shouldNotResultInMultipleSubscriptions() {
// Current slot is a long way beyond the chain head
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000));
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(32));

assertThat(network.start()).isCompleted();
// Won't start gossip as chain head is too old
verify(gossipForkManager, never()).configureGossipForEpoch(any());
// verify(gossipForkManager, never()).configureGossipForEpoch(any());

network.onSyncStateChanged(true, false);
network.onSyncStateChanged(false);
verify(gossipForkManager).configureGossipForEpoch(any());
assertThat(subscribers.getSubscriberCount()).isEqualTo(1);
verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any());

network.onSyncStateChanged(false, false);
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(100));
network.onSyncStateChanged(false);
verify(gossipForkManager).stopGossip();

network.onSyncStateChanged(true, false);
verify(gossipForkManager, times(2)).configureGossipForEpoch(any());
network.onSyncStateChanged(false);
verify(gossipForkManager).configureGossipForEpoch(any());
// Can't unsubscribe from these so should only subscribe once
assertThat(subscribers.getSubscriberCount()).isEqualTo(1);
verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any());
}

@Test
void isCloseToInSync_shouldCalculateWhenDistanceOutOfRange() {
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(maxFollowDistanceSlots + 1));
assertThat(network.isCloseToInSync()).isFalse();
}

@Test
void isCloseToInSync_shouldCalculateWhenDistanceInRange() {
storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(maxFollowDistanceSlots));
assertThat(network.isCloseToInSync()).isTrue();
}

@Test
void isCloseToInSync_shouldReturnFalseWhenEmptyCurrentEpoch() {
final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault();
final RecentChainData recentChainData = storageSystem.recentChainData();
final ActiveEth2P2PNetwork network =
new ActiveEth2P2PNetwork(
spec,
asyncRunner,
discoveryNetwork,
peerManager,
gossipForkManager,
eventChannels,
recentChainData,
attestationSubnetService,
syncCommitteeSubnetService,
gossipEncoding,
gossipConfigurator,
processedAttestationSubscriptionProvider,
true);

assertThat(network.isCloseToInSync()).isFalse();
}

@SuppressWarnings("unchecked")
private ArgumentCaptor<Iterable<Integer>> subnetIdCaptor() {
return ArgumentCaptor.forClass(Iterable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ public void initSyncService() {

// p2pNetwork subscription so gossip can be enabled and disabled appropriately
syncService.subscribeToSyncStateChangesAndUpdate(
state -> p2pNetwork.onSyncStateChanged(state.isInSync(), state.isOptimistic()));
state -> p2pNetwork.onSyncStateChanged(state.isOptimistic()));
}

protected void initOperationsReOrgManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void processEpochPrecompute(final UInt64 epoch) {
}

private void processSlotWhileSyncing(final SyncState currentSyncState) {
UInt64 slot = nodeSlot.getValue();
final UInt64 slot = nodeSlot.getValue();
this.forkChoiceTrigger.onSlotStartedWhileSyncing(slot);
if (currentSyncState == SyncState.AWAITING_EL) {
eventLog.syncEventAwaitingEL(slot, recentChainData.getHeadSlot(), p2pNetwork.getPeerCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -41,6 +42,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
Expand All @@ -58,6 +60,7 @@
import tech.pegasys.teku.spec.datastructures.state.ForkInfo;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconStateCache;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.spec.logic.common.util.BeaconStateUtil;
import tech.pegasys.teku.storage.api.ChainHeadChannel;
import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel;
Expand Down Expand Up @@ -194,6 +197,27 @@ public UInt64 computeTimeAtSlot(final UInt64 slot) {
return genesisTime.plus(slot.times(spec.getSecondsPerSlot(slot)));
}

@VisibleForTesting
boolean isCloseToInSync(final UInt64 currentTimeSeconds) {
final SpecVersion specVersion = spec.getGenesisSpec();
final MiscHelpers miscHelpers = specVersion.miscHelpers();
final SpecConfig specConfig = specVersion.getConfig();
final UInt64 networkSlot = miscHelpers.computeSlotAtTime(getGenesisTime(), currentTimeSeconds);

final int maxLookaheadEpochs = specConfig.getMaxSeedLookahead();
final int slotsPerEpoch = specVersion.getSlotsPerEpoch();
final int maxLookaheadSlots = slotsPerEpoch * maxLookaheadEpochs;

return networkSlot.minusMinZero(getHeadSlot()).isLessThanOrEqualTo(maxLookaheadSlots);
}

public boolean isCloseToInSync() {
if (store == null) {
return false;
}
return isCloseToInSync(store.getTimeInSeconds());
}

public Optional<GenesisData> getGenesisData() {
return genesisData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,45 @@ public void getBlockRootBySlotWithHeadRoot_forUnknownHeadRoot() {
assertThat(recentChainData.getBlockRootInEffectBySlot(bestBlock.getSlot(), headRoot)).isEmpty();
}

@Test
public void isCloseToInSync_preGenesis() {
initPreGenesis();
assertThat(recentChainData.isCloseToInSync()).isFalse();
}

@Test
public void isCloseToSync_belowBoundary() {
initPostGenesis();
final SpecConfig specConfig = spec.getGenesisSpecConfig();
final int seconds =
specConfig.getMaxSeedLookahead()
* specConfig.getSlotsPerEpoch()
* specConfig.getSecondsPerSlot();
assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds - 1))).isTrue();
}

@Test
public void isCloseToSync_atBoundary() {
initPostGenesis();
final SpecConfig specConfig = spec.getGenesisSpecConfig();
final int seconds =
specConfig.getMaxSeedLookahead()
* specConfig.getSlotsPerEpoch()
* specConfig.getSecondsPerSlot();
assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds))).isTrue();
}

@Test
public void isCloseToSync_aboveBoundary() {
initPostGenesis();
final SpecConfig specConfig = spec.getGenesisSpecConfig();
final int seconds =
specConfig.getMaxSeedLookahead()
* specConfig.getSlotsPerEpoch()
* specConfig.getSecondsPerSlot();
assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds + 8))).isFalse();
}

@Test
public void getBlockRootBySlotWithHeadRoot_withForkRoot() {
initPostGenesis();
Expand Down