diff --git a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessChannel.java b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessChannel.java index 5f81beab75b..a9effabfa3f 100644 --- a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessChannel.java +++ b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessChannel.java @@ -25,5 +25,5 @@ public interface BeaconNodeReadinessChannel extends VoidReturningChannelInterfac void onFailoverNodeNotReady(RemoteValidatorApiChannel failoverNotReady); - void onPrimaryNodeBackReady(); + void onPrimaryNodeReady(); } diff --git a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManager.java b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManager.java index bf92de59b56..689e1414bd1 100644 --- a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManager.java +++ b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManager.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.validator.remote; import com.google.common.collect.Maps; +import com.google.common.collect.Streams; import java.time.Duration; import java.util.Comparator; import java.util.Iterator; @@ -73,6 +74,10 @@ public boolean isReady(final RemoteValidatorApiChannel beaconNodeApi) { return readinessStatus.isReady(); } + public ReadinessStatus getReadinessStatus(final RemoteValidatorApiChannel beaconNodeApi) { + return readinessStatusCache.getOrDefault(beaconNodeApi, ReadinessStatus.READY); + } + public int getReadinessStatusWeight(final RemoteValidatorApiChannel beaconNodeApi) { return readinessStatusCache.getOrDefault(beaconNodeApi, ReadinessStatus.READY).weight; } @@ -143,7 +148,8 @@ private SafeFuture performReadinessCheckAgainstAllNodes() { final SafeFuture primaryReadinessCheck = performPrimaryReadinessCheck(); final Stream> failoverReadinessChecks = failoverBeaconNodeApis.stream().map(this::performFailoverReadinessCheck); - return SafeFuture.allOf(primaryReadinessCheck, SafeFuture.allOf(failoverReadinessChecks)); + return SafeFuture.allOf( + Streams.concat(Stream.of(primaryReadinessCheck), failoverReadinessChecks)); } private SafeFuture performFailoverReadinessCheck(final RemoteValidatorApiChannel failover) { @@ -192,9 +198,10 @@ private void processReadyResult(final boolean isPrimaryNode) { if (!isPrimaryNode) { return; } + // Filtering of duplicates if needed happens on receiver's side + beaconNodeReadinessChannel.onPrimaryNodeReady(); if (latestPrimaryNodeReadiness.compareAndSet(false, true)) { validatorLogger.primaryBeaconNodeIsBackAndReady(); - beaconNodeReadinessChannel.onPrimaryNodeBackReady(); } } diff --git a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapter.java b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapter.java index c555e0bd88a..3515253d90a 100644 --- a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapter.java +++ b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapter.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.validator.remote.eventsource; import static java.util.Collections.emptyMap; +import static tech.pegasys.teku.validator.remote.BeaconNodeReadinessManager.ReadinessStatus.READY; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -134,7 +135,7 @@ public void onFailoverNodeNotReady(final RemoteValidatorApiChannel failoverNotIn } @Override - public void onPrimaryNodeBackReady() { + public void onPrimaryNodeReady() { if (!currentEventStreamHasSameEndpoint(primaryBeaconNodeApi)) { switchBackToPrimaryEventStream(); } @@ -181,6 +182,12 @@ private synchronized boolean switchToFailoverEventStreamIfAvailable() { if (failoverBeaconNodeApis.isEmpty()) { return false; } + // No need to change anything if current node is READY + if (beaconNodeReadinessManager + .getReadinessStatus(currentBeaconNodeUsedForEventStreaming) + .equals(READY)) { + return false; + } return findReadyFailoverAndSwitch(); } diff --git a/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManagerTest.java b/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManagerTest.java index 81b0210ef77..57fa7961369 100644 --- a/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManagerTest.java +++ b/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/BeaconNodeReadinessManagerTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -128,7 +129,8 @@ public void retrievesReadinessAndPublishesToAChannel() { assertThat(beaconNodeReadinessManager.isReady(beaconNodeApi)).isTrue(); verify(validatorLogger).primaryBeaconNodeIsBackAndReady(); - verify(beaconNodeReadinessChannel).onPrimaryNodeBackReady(); + // call it every time we check, channel will filter it + verify(beaconNodeReadinessChannel, times(2)).onPrimaryNodeReady(); } @Test diff --git a/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapterTest.java b/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapterTest.java index 912b6c32916..df1a5b0b39f 100644 --- a/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapterTest.java +++ b/validator/remote/src/test/java/tech/pegasys/teku/validator/remote/eventsource/EventSourceBeaconChainEventAdapterTest.java @@ -16,6 +16,7 @@ import static java.util.Collections.emptyMap; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -33,9 +34,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import tech.pegasys.teku.api.response.v1.EventType; +import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.logging.ValidatorLogger; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.validator.api.ValidatorTimingChannel; +import tech.pegasys.teku.validator.api.required.SyncingStatus; import tech.pegasys.teku.validator.beaconnode.BeaconChainEventAdapter; import tech.pegasys.teku.validator.remote.BeaconNodeReadinessManager; import tech.pegasys.teku.validator.remote.RemoteValidatorApiChannel; @@ -97,6 +100,45 @@ public void performsPrimaryReadinessCheckWhenFailoverNotReadyAndNoOtherFailovers verify(beaconNodeReadinessManager).performPrimaryReadinessCheck(); } + @Test + public void doNotSwitchToFailoverWhenCurrentBeaconNodeIsReady() { + final BeaconNodeReadinessManager beaconNodeReadinessManager = + mock(BeaconNodeReadinessManager.class); + final RemoteValidatorApiChannel primaryNode = mock(RemoteValidatorApiChannel.class); + final RemoteValidatorApiChannel failover1 = mock(RemoteValidatorApiChannel.class); + final RemoteValidatorApiChannel failover2 = mock(RemoteValidatorApiChannel.class); + final EventSourceBeaconChainEventAdapter eventSourceBeaconChainEventAdapter = + new EventSourceBeaconChainEventAdapter( + beaconNodeReadinessManager, + primaryNode, + List.of(failover1, failover2), + mock(OkHttpClient.class), + mock(ValidatorLogger.class), + mock(BeaconChainEventAdapter.class), + mock(ValidatorTimingChannel.class), + metricsSystemMock, + true, + false, + mock(Spec.class)); + + eventSourceBeaconChainEventAdapter.currentBeaconNodeUsedForEventStreaming = failover1; + + when(beaconNodeReadinessManager.getReadinessStatus(failover1)) + .thenReturn(BeaconNodeReadinessManager.ReadinessStatus.READY); + final SafeFuture someFuture = new SafeFuture<>(); + when(primaryNode.getSyncingStatus()).thenReturn(someFuture); + eventSourceBeaconChainEventAdapter.onFailoverNodeNotReady(failover1); + + verify(beaconNodeReadinessManager).getReadinessStatus(failover1); + // Shouldn't try failover2 when failover1 is good + verify(beaconNodeReadinessManager, never()).getReadinessStatus(failover2); + verify(beaconNodeReadinessManager, never()).getReadinessStatusWeight(failover2); + verify(beaconNodeReadinessManager, never()).isReady(any()); + + // But will try to return to primaryNode when it's possible + verify(beaconNodeReadinessManager).performPrimaryReadinessCheck(); + } + private EventSourceBeaconChainEventAdapter initEventSourceBeaconChainEventAdapter( final boolean shutdownWhenValidatorSlashedEnabled) { return new EventSourceBeaconChainEventAdapter(