Skip to content

Commit

Permalink
Improve failover robustness (#8405)
Browse files Browse the repository at this point in the history
* Improve failover robustness

* Update primary node ready callback name as per feedback

* Add test on failovers
  • Loading branch information
zilm13 authored Jun 27, 2024
1 parent d5056b0 commit 51a0488
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ public interface BeaconNodeReadinessChannel extends VoidReturningChannelInterfac

void onFailoverNodeNotReady(RemoteValidatorApiChannel failoverNotReady);

void onPrimaryNodeBackReady();
void onPrimaryNodeReady();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.validator.remote;

import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.time.Duration;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -73,6 +74,10 @@ public boolean isReady(final RemoteValidatorApiChannel beaconNodeApi) {
return readinessStatus.isReady();
}

public ReadinessStatus getReadinessStatus(final RemoteValidatorApiChannel beaconNodeApi) {
return readinessStatusCache.getOrDefault(beaconNodeApi, ReadinessStatus.READY);
}

public int getReadinessStatusWeight(final RemoteValidatorApiChannel beaconNodeApi) {
return readinessStatusCache.getOrDefault(beaconNodeApi, ReadinessStatus.READY).weight;
}
Expand Down Expand Up @@ -143,7 +148,8 @@ private SafeFuture<Void> performReadinessCheckAgainstAllNodes() {
final SafeFuture<Void> primaryReadinessCheck = performPrimaryReadinessCheck();
final Stream<SafeFuture<?>> failoverReadinessChecks =
failoverBeaconNodeApis.stream().map(this::performFailoverReadinessCheck);
return SafeFuture.allOf(primaryReadinessCheck, SafeFuture.allOf(failoverReadinessChecks));
return SafeFuture.allOf(
Streams.concat(Stream.of(primaryReadinessCheck), failoverReadinessChecks));
}

private SafeFuture<Void> performFailoverReadinessCheck(final RemoteValidatorApiChannel failover) {
Expand Down Expand Up @@ -192,9 +198,10 @@ private void processReadyResult(final boolean isPrimaryNode) {
if (!isPrimaryNode) {
return;
}
// Filtering of duplicates if needed happens on receiver's side
beaconNodeReadinessChannel.onPrimaryNodeReady();
if (latestPrimaryNodeReadiness.compareAndSet(false, true)) {
validatorLogger.primaryBeaconNodeIsBackAndReady();
beaconNodeReadinessChannel.onPrimaryNodeBackReady();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.validator.remote.eventsource;

import static java.util.Collections.emptyMap;
import static tech.pegasys.teku.validator.remote.BeaconNodeReadinessManager.ReadinessStatus.READY;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void onFailoverNodeNotReady(final RemoteValidatorApiChannel failoverNotIn
}

@Override
public void onPrimaryNodeBackReady() {
public void onPrimaryNodeReady() {
if (!currentEventStreamHasSameEndpoint(primaryBeaconNodeApi)) {
switchBackToPrimaryEventStream();
}
Expand Down Expand Up @@ -181,6 +182,12 @@ private synchronized boolean switchToFailoverEventStreamIfAvailable() {
if (failoverBeaconNodeApis.isEmpty()) {
return false;
}
// No need to change anything if current node is READY
if (beaconNodeReadinessManager
.getReadinessStatus(currentBeaconNodeUsedForEventStreaming)
.equals(READY)) {
return false;
}
return findReadyFailoverAndSwitch();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -128,7 +129,8 @@ public void retrievesReadinessAndPublishesToAChannel() {
assertThat(beaconNodeReadinessManager.isReady(beaconNodeApi)).isTrue();

verify(validatorLogger).primaryBeaconNodeIsBackAndReady();
verify(beaconNodeReadinessChannel).onPrimaryNodeBackReady();
// call it every time we check, channel will filter it
verify(beaconNodeReadinessChannel, times(2)).onPrimaryNodeReady();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static java.util.Collections.emptyMap;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -33,9 +34,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import tech.pegasys.teku.api.response.v1.EventType;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.logging.ValidatorLogger;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.validator.api.ValidatorTimingChannel;
import tech.pegasys.teku.validator.api.required.SyncingStatus;
import tech.pegasys.teku.validator.beaconnode.BeaconChainEventAdapter;
import tech.pegasys.teku.validator.remote.BeaconNodeReadinessManager;
import tech.pegasys.teku.validator.remote.RemoteValidatorApiChannel;
Expand Down Expand Up @@ -97,6 +100,45 @@ public void performsPrimaryReadinessCheckWhenFailoverNotReadyAndNoOtherFailovers
verify(beaconNodeReadinessManager).performPrimaryReadinessCheck();
}

@Test
public void doNotSwitchToFailoverWhenCurrentBeaconNodeIsReady() {
final BeaconNodeReadinessManager beaconNodeReadinessManager =
mock(BeaconNodeReadinessManager.class);
final RemoteValidatorApiChannel primaryNode = mock(RemoteValidatorApiChannel.class);
final RemoteValidatorApiChannel failover1 = mock(RemoteValidatorApiChannel.class);
final RemoteValidatorApiChannel failover2 = mock(RemoteValidatorApiChannel.class);
final EventSourceBeaconChainEventAdapter eventSourceBeaconChainEventAdapter =
new EventSourceBeaconChainEventAdapter(
beaconNodeReadinessManager,
primaryNode,
List.of(failover1, failover2),
mock(OkHttpClient.class),
mock(ValidatorLogger.class),
mock(BeaconChainEventAdapter.class),
mock(ValidatorTimingChannel.class),
metricsSystemMock,
true,
false,
mock(Spec.class));

eventSourceBeaconChainEventAdapter.currentBeaconNodeUsedForEventStreaming = failover1;

when(beaconNodeReadinessManager.getReadinessStatus(failover1))
.thenReturn(BeaconNodeReadinessManager.ReadinessStatus.READY);
final SafeFuture<SyncingStatus> someFuture = new SafeFuture<>();
when(primaryNode.getSyncingStatus()).thenReturn(someFuture);
eventSourceBeaconChainEventAdapter.onFailoverNodeNotReady(failover1);

verify(beaconNodeReadinessManager).getReadinessStatus(failover1);
// Shouldn't try failover2 when failover1 is good
verify(beaconNodeReadinessManager, never()).getReadinessStatus(failover2);
verify(beaconNodeReadinessManager, never()).getReadinessStatusWeight(failover2);
verify(beaconNodeReadinessManager, never()).isReady(any());

// But will try to return to primaryNode when it's possible
verify(beaconNodeReadinessManager).performPrimaryReadinessCheck();
}

private EventSourceBeaconChainEventAdapter initEventSourceBeaconChainEventAdapter(
final boolean shutdownWhenValidatorSlashedEnabled) {
return new EventSourceBeaconChainEventAdapter(
Expand Down

0 comments on commit 51a0488

Please sign in to comment.