Skip to content

Commit

Permalink
Avoid registration of non-active validators (#7423)
Browse files Browse the repository at this point in the history
* Cache validator statuses with ValidatorDataProvider and reuse it in validator registrations

* Use Subscribers model rather than Channel for new validator statuses

* Refactor validator builder public key with override to use one function in all places

* Fix not doing full registration on possible missing events

* Use cache in validator statuses when possibleMissingEvents and it's the same epoch

---------

Co-authored-by: Paul Harris <[email protected]>
  • Loading branch information
zilm13 and rolfyone authored Oct 10, 2023
1 parent e47d202 commit 35f9747
Show file tree
Hide file tree
Showing 22 changed files with 1,611 additions and 1,111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.builder.SignedValidatorRegistration;
import tech.pegasys.teku.spec.datastructures.builder.ValidatorRegistration;
import tech.pegasys.teku.spec.datastructures.genesis.GenesisData;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
Expand Down Expand Up @@ -641,26 +640,8 @@ public SafeFuture<Void> prepareBeaconProposer(
@Override
public SafeFuture<Void> registerValidators(
final SszList<SignedValidatorRegistration> validatorRegistrations) {
final List<BLSPublicKey> validatorIdentifiers =
validatorRegistrations.stream()
.map(SignedValidatorRegistration::getMessage)
.map(ValidatorRegistration::getPublicKey)
.toList();
return getValidatorStatuses(validatorIdentifiers)
.thenCompose(
maybeValidatorStatuses -> {
if (maybeValidatorStatuses.isEmpty()) {
final String errorMessage =
"Couldn't retrieve validator statuses during registering. Most likely the BN is still syncing.";
return SafeFuture.failedFuture(new IllegalStateException(errorMessage));
}
final SszList<SignedValidatorRegistration> applicableValidatorRegistrations =
getApplicableValidatorRegistrations(
validatorRegistrations, maybeValidatorStatuses.get());

return proposersDataManager.updateValidatorRegistrations(
applicableValidatorRegistrations, combinedChainDataClient.getCurrentSlot());
});
return proposersDataManager.updateValidatorRegistrations(
validatorRegistrations, combinedChainDataClient.getCurrentSlot());
}

@Override
Expand Down Expand Up @@ -775,33 +756,6 @@ private Optional<SyncCommitteeDuty> getSyncCommitteeDuty(
state.getValidators().get(validatorIndex).getPublicKey(), validatorIndex, duties));
}

private SszList<SignedValidatorRegistration> getApplicableValidatorRegistrations(
final SszList<SignedValidatorRegistration> validatorRegistrations,
final Map<BLSPublicKey, ValidatorStatus> validatorStatuses) {
final List<SignedValidatorRegistration> applicableValidatorRegistrations =
validatorRegistrations.stream()
.filter(
signedValidatorRegistration -> {
final BLSPublicKey validatorIdentifier =
signedValidatorRegistration.getMessage().getPublicKey();
final boolean unknownOrHasExited =
Optional.ofNullable(validatorStatuses.get(validatorIdentifier))
.map(ValidatorStatus::hasExited)
.orElse(true);
if (unknownOrHasExited) {
LOG.debug(
"Validator {} is unknown or has exited. It will be skipped for registering.",
validatorIdentifier.toAbbreviatedString());
}
return !unknownOrHasExited;
})
.toList();
if (validatorRegistrations.size() == applicableValidatorRegistrations.size()) {
return validatorRegistrations;
}
return validatorRegistrations.getSchema().createFromElements(applicableValidatorRegistrations);
}

private List<ProposerDuty> getProposalSlotsForEpoch(final BeaconState state, final UInt64 epoch) {
final UInt64 epochStartSlot = spec.computeStartSlotAtEpoch(epoch);
final UInt64 startSlot = epochStartSlot.max(GENESIS_SLOT.increment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import tech.pegasys.teku.api.ChainDataProvider;
import tech.pegasys.teku.api.NodeDataProvider;
Expand All @@ -62,7 +60,6 @@
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFutureAssert;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.ssz.SszMutableList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -1059,67 +1056,6 @@ void registerValidators_shouldUpdateRegistrations() {
verify(proposersDataManager).updateValidatorRegistrations(validatorRegistrations, ONE);
}

@Test
void registerValidators_shouldIgnoreExitedAndUnknownValidators() {
final int numOfValidatorRegistrationsAttempted = ValidatorStatus.values().length + 2;

final SszList<SignedValidatorRegistration> validatorRegistrations =
dataStructureUtil.randomSignedValidatorRegistrations(numOfValidatorRegistrationsAttempted);

final Map<BLSPublicKey, ValidatorStatus> knownValidators =
IntStream.range(0, ValidatorStatus.values().length)
.mapToObj(
statusIdx ->
Map.entry(
validatorRegistrations.get(statusIdx).getMessage().getPublicKey(),
ValidatorStatus.values()[statusIdx]))
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));

final List<BLSPublicKey> exitedOrUnknownKeys =
IntStream.range(
ValidatorStatus.exited_unslashed.ordinal(), numOfValidatorRegistrationsAttempted)
.mapToObj(
statusOrdinal ->
validatorRegistrations.get(statusOrdinal).getMessage().getPublicKey())
.collect(Collectors.toUnmodifiableList());

setupValidatorsState(validatorRegistrations, ValidatorStatus.values().length, knownValidators);

when(chainDataClient.getCurrentSlot()).thenReturn(ONE);

final SafeFuture<Void> result = validatorApiHandler.registerValidators(validatorRegistrations);

assertThat(result).isCompleted();

@SuppressWarnings("unchecked")
final ArgumentCaptor<SszList<SignedValidatorRegistration>> argumentCaptor =
ArgumentCaptor.forClass(SszList.class);

verify(proposersDataManager).updateValidatorRegistrations(argumentCaptor.capture(), eq(ONE));

final SszList<SignedValidatorRegistration> capturedRegistrations = argumentCaptor.getValue();

assertThat(capturedRegistrations)
.hasSize(5)
.map(signedRegistration -> signedRegistration.getMessage().getPublicKey())
.doesNotContainAnyElementsOf(exitedOrUnknownKeys);
}

@Test
void registerValidators_shouldReportErrorIfCannotRetrieveValidatorStatuses() {
final SszList<SignedValidatorRegistration> validatorRegistrations =
dataStructureUtil.randomSignedValidatorRegistrations(4);

when(chainDataProvider.getStateValidators(eq("head"), any(), any()))
.thenReturn(SafeFuture.completedFuture(Optional.empty()));

final SafeFuture<Void> result = validatorApiHandler.registerValidators(validatorRegistrations);

SafeFutureAssert.assertThatSafeFuture(result)
.isCompletedExceptionallyWithMessage(
"Couldn't retrieve validator statuses during registering. Most likely the BN is still syncing.");
}

@Test
public void checkValidatorsDoppelganger_ShouldReturnEmptyResult()
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_BAD_REQUEST;
Expand All @@ -23,13 +24,20 @@
import static tech.pegasys.teku.spec.schemas.ApiSchemas.SIGNED_VALIDATOR_REGISTRATIONS_SCHEMA;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import okhttp3.Response;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import tech.pegasys.teku.api.response.v1.beacon.ValidatorStatus;
import tech.pegasys.teku.beaconrestapi.AbstractDataBackedRestAPIIntegrationTest;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostRegisterValidator;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.json.JsonUtil;
import tech.pegasys.teku.infrastructure.ssz.SszList;
Expand All @@ -44,6 +52,17 @@ public class PostRegisterValidatorTest extends AbstractDataBackedRestAPIIntegrat
void setup() {
startRestAPIAtGenesis(SpecMilestone.BELLATRIX);
dataStructureUtil = new DataStructureUtil(spec);
when(validatorApiChannel.getValidatorStatuses(anyCollection()))
.thenAnswer(
args -> {
final Collection<BLSPublicKey> publicKeys = args.getArgument(0);
final Map<BLSPublicKey, ValidatorStatus> validatorStatuses =
publicKeys.stream()
.collect(
Collectors.toMap(
Function.identity(), __ -> ValidatorStatus.active_ongoing));
return SafeFuture.completedFuture(Optional.of(validatorStatuses));
});
}

@Test
Expand All @@ -52,7 +71,7 @@ void shouldReturnOk() throws IOException {
dataStructureUtil.randomSignedValidatorRegistrations(10);
when(validatorApiChannel.registerValidators(request)).thenReturn(SafeFuture.COMPLETE);

try (Response response =
try (final Response response =
post(
PostRegisterValidator.ROUTE,
JsonUtil.serialize(
Expand All @@ -69,7 +88,25 @@ void shouldReturnServerErrorWhenThereIsAnExceptionWhileRegistering() throws IOEx
when(validatorApiChannel.registerValidators(request))
.thenReturn(SafeFuture.failedFuture(new IllegalStateException("oopsy")));

try (Response response =
try (final Response response =
post(
PostRegisterValidator.ROUTE,
JsonUtil.serialize(
request, SIGNED_VALIDATOR_REGISTRATIONS_SCHEMA.getJsonTypeDefinition()))) {

assertThat(response.code()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.body().string()).isEqualTo("{\"code\":500,\"message\":\"oopsy\"}");
}
}

@Test
void shouldReturnServerErrorWhenThereIsAnExceptionWhileGettingStatuses() throws IOException {
final SszList<SignedValidatorRegistration> request =
dataStructureUtil.randomSignedValidatorRegistrations(10);
when(validatorApiChannel.getValidatorStatuses(anyCollection()))
.thenReturn(SafeFuture.failedFuture(new IllegalStateException("oopsy")));

try (final Response response =
post(
PostRegisterValidator.ROUTE,
JsonUtil.serialize(
Expand All @@ -84,7 +121,7 @@ void shouldReturnServerErrorWhenThereIsAnExceptionWhileRegistering() throws IOEx
@ValueSource(strings = {"[{}]", "{}", "invalid"})
void shouldReturnBadRequest(final String body) throws IOException {
when(validatorApiChannel.registerValidators(any())).thenReturn(SafeFuture.COMPLETE);
try (Response response = post(PostRegisterValidator.ROUTE, body)) {
try (final Response response = post(PostRegisterValidator.ROUTE, body)) {
assertThat(response.code()).isEqualTo(SC_BAD_REQUEST);
}
verifyNoInteractions(validatorApiChannel);
Expand All @@ -93,7 +130,7 @@ void shouldReturnBadRequest(final String body) throws IOException {
@Test
void shouldHandleEmptyRequest() throws IOException {
when(validatorApiChannel.registerValidators(any())).thenReturn(SafeFuture.COMPLETE);
try (Response response = post(PostRegisterValidator.ROUTE, "[]")) {
try (final Response response = post(PostRegisterValidator.ROUTE, "[]")) {
assertThat(response.code()).isEqualTo(SC_OK);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import tech.pegasys.teku.api.DataProvider;
import tech.pegasys.teku.api.ValidatorDataProvider;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.restapi.endpoints.AsyncApiResponse;
import tech.pegasys.teku.infrastructure.restapi.endpoints.EndpointMetadata;
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiEndpoint;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void handleRequest(final RestApiRequest request) throws JsonProcessingExc
(__, error) -> {
if (error != null) {
return AsyncApiResponse.respondWithError(
SC_INTERNAL_SERVER_ERROR, error.getMessage());
SC_INTERNAL_SERVER_ERROR, ExceptionUtil.getRootCauseMessage(error));
}
return AsyncApiResponse.respondOk(null);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public SafeFuture<Optional<Attestation>> createAggregate(
}

public SafeFuture<List<SubmitDataError>> sendAggregateAndProofs(
List<SignedAggregateAndProof> aggregateAndProofs) {
final List<SignedAggregateAndProof> aggregateAndProofs) {
return validatorApiChannel.sendAggregateAndProofs(aggregateAndProofs);
}

Expand Down Expand Up @@ -294,13 +294,42 @@ public SafeFuture<Void> sendContributionAndProofs(
}

public SafeFuture<Void> prepareBeaconProposer(
List<BeaconPreparableProposer> beaconPreparableProposers) {
final List<BeaconPreparableProposer> beaconPreparableProposers) {
return validatorApiChannel.prepareBeaconProposer(beaconPreparableProposers);
}

public SafeFuture<Void> registerValidators(
SszList<SignedValidatorRegistration> validatorRegistrations) {
return validatorApiChannel.registerValidators(validatorRegistrations);
final SszList<SignedValidatorRegistration> validatorRegistrations) {
return validatorApiChannel
.getValidatorStatuses(
validatorRegistrations.stream()
.map(registration -> registration.getMessage().getPublicKey())
.toList())
.thenComposeChecked(
maybeValidatorStatuses -> {
if (maybeValidatorStatuses.isEmpty()) {
final String errorMessage =
"Couldn't retrieve validator statuses during registering. Most likely the BN is still syncing.";
return SafeFuture.failedFuture(new IllegalStateException(errorMessage));
}

final List<SignedValidatorRegistration> activeAndPendingValidatorRegistrations =
validatorRegistrations.stream()
.filter(
registration ->
Optional.ofNullable(
maybeValidatorStatuses
.get()
.get(registration.getMessage().getPublicKey()))
.map(status -> !status.hasExited())
.orElse(false))
.toList();

return validatorApiChannel.registerValidators(
validatorRegistrations
.getSchema()
.createFromElements(activeAndPendingValidatorRegistrations));
});
}

public boolean isPhase0Slot(final UInt64 slot) {
Expand Down
Loading

0 comments on commit 35f9747

Please sign in to comment.