Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid registration of non-active validators #7423

Merged
merged 31 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a86e148
Avoid registration of non-active validators
zilm13 Aug 15, 2023
578b171
Check validator statuses in batches
zilm13 Aug 18, 2023
9504635
Merge branch 'master' into fix/active-validator-registration
zilm13 Aug 18, 2023
fe9b2d1
Merge branch 'master' into fix/active-validator-registration
rolfyone Aug 23, 2023
f2e7c48
Merge branch 'master' into fix/active-validator-registration
zilm13 Aug 24, 2023
21d0a9b
Merge branch 'master' into fix/active-validator-registration
zilm13 Aug 31, 2023
b321a89
Merge branch 'master' into fix/active-validator-registration
zilm13 Sep 6, 2023
4495f7c
Cache validator statuses with ValidatorDataProvider and reuse it in v…
zilm13 Sep 7, 2023
49dbc16
Merge branch 'master' into fix/active-validator-registration
zilm13 Sep 7, 2023
5782d24
Fix integration test for PostRegisterValidator
zilm13 Sep 7, 2023
c2cbc48
Merge branch 'master' into fix/active-validator-registration
zilm13 Sep 20, 2023
039ef31
Remove duplicate validatorStatusProvider instance creation
zilm13 Sep 20, 2023
3b5f701
A lot of finals + getting error message refactoring
zilm13 Sep 20, 2023
c53e8a5
Use Subscribers model rather than Channel for new validator statuses
zilm13 Sep 20, 2023
8388e34
tmp broken
zilm13 Sep 24, 2023
ccd505e
Tmp. Mostly working with tests broken
zilm13 Sep 29, 2023
d9788e5
Merge branch 'master' into fix/active-validator-registration
zilm13 Sep 29, 2023
76385c6
Polishing
zilm13 Sep 29, 2023
e9bfdb2
Fixes + new test
zilm13 Sep 29, 2023
6c71df6
Merge branch 'master' into fix/active-validator-registration
zilm13 Sep 29, 2023
eb61ce5
Some polishing
zilm13 Sep 29, 2023
7ddd4f9
Fixed ValidatorStatusProvider was not subscribed to events
zilm13 Sep 29, 2023
96f2cec
Merge branch 'master' into fix/active-validator-registration
zilm13 Oct 2, 2023
e374caa
Log line updated
zilm13 Oct 2, 2023
a4af095
Merge branch 'master' into fix/active-validator-registration
zilm13 Oct 2, 2023
3198bdf
Refactor validator builder public key with override to use one functi…
zilm13 Oct 2, 2023
010c781
Merge branch 'master' into fix/active-validator-registration
zilm13 Oct 6, 2023
e9537bb
Renaming of several methods and classes + test update
zilm13 Oct 6, 2023
9bc7aef
Fix not doing full registration on possible missing events
zilm13 Oct 6, 2023
98d046a
Use cache in validator statuses when possibleMissingEvents and it's t…
zilm13 Oct 6, 2023
b429dc6
Merge branch 'master' into fix/active-validator-registration
zilm13 Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.builder.SignedValidatorRegistration;
import tech.pegasys.teku.spec.datastructures.builder.ValidatorRegistration;
import tech.pegasys.teku.spec.datastructures.genesis.GenesisData;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
Expand Down Expand Up @@ -641,26 +640,8 @@ public SafeFuture<Void> prepareBeaconProposer(
@Override
public SafeFuture<Void> registerValidators(
final SszList<SignedValidatorRegistration> validatorRegistrations) {
final List<BLSPublicKey> validatorIdentifiers =
validatorRegistrations.stream()
.map(SignedValidatorRegistration::getMessage)
.map(ValidatorRegistration::getPublicKey)
.toList();
return getValidatorStatuses(validatorIdentifiers)
.thenCompose(
maybeValidatorStatuses -> {
if (maybeValidatorStatuses.isEmpty()) {
final String errorMessage =
"Couldn't retrieve validator statuses during registering. Most likely the BN is still syncing.";
return SafeFuture.failedFuture(new IllegalStateException(errorMessage));
}
final SszList<SignedValidatorRegistration> applicableValidatorRegistrations =
getApplicableValidatorRegistrations(
validatorRegistrations, maybeValidatorStatuses.get());

return proposersDataManager.updateValidatorRegistrations(
applicableValidatorRegistrations, combinedChainDataClient.getCurrentSlot());
});
return proposersDataManager.updateValidatorRegistrations(
validatorRegistrations, combinedChainDataClient.getCurrentSlot());
}

@Override
Expand Down Expand Up @@ -775,33 +756,6 @@ private Optional<SyncCommitteeDuty> getSyncCommitteeDuty(
state.getValidators().get(validatorIndex).getPublicKey(), validatorIndex, duties));
}

private SszList<SignedValidatorRegistration> getApplicableValidatorRegistrations(
final SszList<SignedValidatorRegistration> validatorRegistrations,
final Map<BLSPublicKey, ValidatorStatus> validatorStatuses) {
final List<SignedValidatorRegistration> applicableValidatorRegistrations =
validatorRegistrations.stream()
.filter(
signedValidatorRegistration -> {
final BLSPublicKey validatorIdentifier =
signedValidatorRegistration.getMessage().getPublicKey();
final boolean unknownOrHasExited =
Optional.ofNullable(validatorStatuses.get(validatorIdentifier))
.map(ValidatorStatus::hasExited)
.orElse(true);
if (unknownOrHasExited) {
LOG.debug(
"Validator {} is unknown or has exited. It will be skipped for registering.",
validatorIdentifier.toAbbreviatedString());
}
return !unknownOrHasExited;
})
.toList();
if (validatorRegistrations.size() == applicableValidatorRegistrations.size()) {
return validatorRegistrations;
}
return validatorRegistrations.getSchema().createFromElements(applicableValidatorRegistrations);
}

private List<ProposerDuty> getProposalSlotsForEpoch(final BeaconState state, final UInt64 epoch) {
final UInt64 epochStartSlot = spec.computeStartSlotAtEpoch(epoch);
final UInt64 startSlot = epochStartSlot.max(GENESIS_SLOT.increment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import tech.pegasys.teku.api.ChainDataProvider;
import tech.pegasys.teku.api.NodeDataProvider;
Expand All @@ -62,7 +60,6 @@
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFutureAssert;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.ssz.SszMutableList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -1059,67 +1056,6 @@ void registerValidators_shouldUpdateRegistrations() {
verify(proposersDataManager).updateValidatorRegistrations(validatorRegistrations, ONE);
}

@Test
void registerValidators_shouldIgnoreExitedAndUnknownValidators() {
final int numOfValidatorRegistrationsAttempted = ValidatorStatus.values().length + 2;

final SszList<SignedValidatorRegistration> validatorRegistrations =
dataStructureUtil.randomSignedValidatorRegistrations(numOfValidatorRegistrationsAttempted);

final Map<BLSPublicKey, ValidatorStatus> knownValidators =
IntStream.range(0, ValidatorStatus.values().length)
.mapToObj(
statusIdx ->
Map.entry(
validatorRegistrations.get(statusIdx).getMessage().getPublicKey(),
ValidatorStatus.values()[statusIdx]))
.collect(Collectors.toUnmodifiableMap(Entry::getKey, Entry::getValue));

final List<BLSPublicKey> exitedOrUnknownKeys =
IntStream.range(
ValidatorStatus.exited_unslashed.ordinal(), numOfValidatorRegistrationsAttempted)
.mapToObj(
statusOrdinal ->
validatorRegistrations.get(statusOrdinal).getMessage().getPublicKey())
.collect(Collectors.toUnmodifiableList());

setupValidatorsState(validatorRegistrations, ValidatorStatus.values().length, knownValidators);

when(chainDataClient.getCurrentSlot()).thenReturn(ONE);

final SafeFuture<Void> result = validatorApiHandler.registerValidators(validatorRegistrations);

assertThat(result).isCompleted();

@SuppressWarnings("unchecked")
final ArgumentCaptor<SszList<SignedValidatorRegistration>> argumentCaptor =
ArgumentCaptor.forClass(SszList.class);

verify(proposersDataManager).updateValidatorRegistrations(argumentCaptor.capture(), eq(ONE));

final SszList<SignedValidatorRegistration> capturedRegistrations = argumentCaptor.getValue();

assertThat(capturedRegistrations)
.hasSize(5)
.map(signedRegistration -> signedRegistration.getMessage().getPublicKey())
.doesNotContainAnyElementsOf(exitedOrUnknownKeys);
}

@Test
void registerValidators_shouldReportErrorIfCannotRetrieveValidatorStatuses() {
final SszList<SignedValidatorRegistration> validatorRegistrations =
dataStructureUtil.randomSignedValidatorRegistrations(4);

when(chainDataProvider.getStateValidators(eq("head"), any(), any()))
.thenReturn(SafeFuture.completedFuture(Optional.empty()));

final SafeFuture<Void> result = validatorApiHandler.registerValidators(validatorRegistrations);

SafeFutureAssert.assertThatSafeFuture(result)
.isCompletedExceptionallyWithMessage(
"Couldn't retrieve validator statuses during registering. Most likely the BN is still syncing.");
}

@Test
public void checkValidatorsDoppelganger_ShouldReturnEmptyResult()
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.http.HttpStatusCodes.SC_BAD_REQUEST;
Expand All @@ -23,13 +24,20 @@
import static tech.pegasys.teku.spec.schemas.ApiSchemas.SIGNED_VALIDATOR_REGISTRATIONS_SCHEMA;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import okhttp3.Response;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import tech.pegasys.teku.api.response.v1.beacon.ValidatorStatus;
import tech.pegasys.teku.beaconrestapi.AbstractDataBackedRestAPIIntegrationTest;
import tech.pegasys.teku.beaconrestapi.handlers.v1.validator.PostRegisterValidator;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.json.JsonUtil;
import tech.pegasys.teku.infrastructure.ssz.SszList;
Expand All @@ -44,6 +52,17 @@ public class PostRegisterValidatorTest extends AbstractDataBackedRestAPIIntegrat
void setup() {
startRestAPIAtGenesis(SpecMilestone.BELLATRIX);
dataStructureUtil = new DataStructureUtil(spec);
when(validatorApiChannel.getValidatorStatuses(anyCollection()))
.thenAnswer(
args -> {
final Collection<BLSPublicKey> publicKeys = args.getArgument(0);
final Map<BLSPublicKey, ValidatorStatus> validatorStatuses =
publicKeys.stream()
.collect(
Collectors.toMap(
Function.identity(), __ -> ValidatorStatus.active_ongoing));
return SafeFuture.completedFuture(Optional.of(validatorStatuses));
});
}

@Test
Expand All @@ -52,7 +71,7 @@ void shouldReturnOk() throws IOException {
dataStructureUtil.randomSignedValidatorRegistrations(10);
when(validatorApiChannel.registerValidators(request)).thenReturn(SafeFuture.COMPLETE);

try (Response response =
try (final Response response =
post(
PostRegisterValidator.ROUTE,
JsonUtil.serialize(
Expand All @@ -69,7 +88,25 @@ void shouldReturnServerErrorWhenThereIsAnExceptionWhileRegistering() throws IOEx
when(validatorApiChannel.registerValidators(request))
.thenReturn(SafeFuture.failedFuture(new IllegalStateException("oopsy")));

try (Response response =
try (final Response response =
post(
PostRegisterValidator.ROUTE,
JsonUtil.serialize(
request, SIGNED_VALIDATOR_REGISTRATIONS_SCHEMA.getJsonTypeDefinition()))) {

assertThat(response.code()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.body().string()).isEqualTo("{\"code\":500,\"message\":\"oopsy\"}");
}
}

@Test
void shouldReturnServerErrorWhenThereIsAnExceptionWhileGettingStatuses() throws IOException {
final SszList<SignedValidatorRegistration> request =
dataStructureUtil.randomSignedValidatorRegistrations(10);
when(validatorApiChannel.getValidatorStatuses(anyCollection()))
.thenReturn(SafeFuture.failedFuture(new IllegalStateException("oopsy")));

try (final Response response =
post(
PostRegisterValidator.ROUTE,
JsonUtil.serialize(
Expand All @@ -84,7 +121,7 @@ void shouldReturnServerErrorWhenThereIsAnExceptionWhileRegistering() throws IOEx
@ValueSource(strings = {"[{}]", "{}", "invalid"})
void shouldReturnBadRequest(final String body) throws IOException {
when(validatorApiChannel.registerValidators(any())).thenReturn(SafeFuture.COMPLETE);
try (Response response = post(PostRegisterValidator.ROUTE, body)) {
try (final Response response = post(PostRegisterValidator.ROUTE, body)) {
assertThat(response.code()).isEqualTo(SC_BAD_REQUEST);
}
verifyNoInteractions(validatorApiChannel);
Expand All @@ -93,7 +130,7 @@ void shouldReturnBadRequest(final String body) throws IOException {
@Test
void shouldHandleEmptyRequest() throws IOException {
when(validatorApiChannel.registerValidators(any())).thenReturn(SafeFuture.COMPLETE);
try (Response response = post(PostRegisterValidator.ROUTE, "[]")) {
try (final Response response = post(PostRegisterValidator.ROUTE, "[]")) {
assertThat(response.code()).isEqualTo(SC_OK);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import tech.pegasys.teku.api.DataProvider;
import tech.pegasys.teku.api.ValidatorDataProvider;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.restapi.endpoints.AsyncApiResponse;
import tech.pegasys.teku.infrastructure.restapi.endpoints.EndpointMetadata;
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiEndpoint;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void handleRequest(final RestApiRequest request) throws JsonProcessingExc
(__, error) -> {
if (error != null) {
return AsyncApiResponse.respondWithError(
SC_INTERNAL_SERVER_ERROR, error.getMessage());
SC_INTERNAL_SERVER_ERROR, ExceptionUtil.getRootCauseMessage(error));
}
return AsyncApiResponse.respondOk(null);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public SafeFuture<Optional<Attestation>> createAggregate(
}

public SafeFuture<List<SubmitDataError>> sendAggregateAndProofs(
List<SignedAggregateAndProof> aggregateAndProofs) {
final List<SignedAggregateAndProof> aggregateAndProofs) {
return validatorApiChannel.sendAggregateAndProofs(aggregateAndProofs);
}

Expand Down Expand Up @@ -294,13 +294,42 @@ public SafeFuture<Void> sendContributionAndProofs(
}

public SafeFuture<Void> prepareBeaconProposer(
List<BeaconPreparableProposer> beaconPreparableProposers) {
final List<BeaconPreparableProposer> beaconPreparableProposers) {
return validatorApiChannel.prepareBeaconProposer(beaconPreparableProposers);
}

public SafeFuture<Void> registerValidators(
SszList<SignedValidatorRegistration> validatorRegistrations) {
return validatorApiChannel.registerValidators(validatorRegistrations);
final SszList<SignedValidatorRegistration> validatorRegistrations) {
return validatorApiChannel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have 2 doubts here:

1- putting filtering here means that it will only be applied when registering from a remote VC, and it will not be applied in single process.
I see this ValidatorDataProvider as a very thin layer. We should not have business logic unless we really want to differentiate between in process and remote VC handling.

2- if we filter upfront it means that validators in unknown status will not go in our BN data structure. Which means that, if VC doesn't keep calling the registration endpoint, the validator could become known at some point in future. It is an edge case but in theory we loose this info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for 2-. It is totally irrelevant considering what we are trying to achieve with this PR :)

.getValidatorStatuses(
validatorRegistrations.stream()
.map(registration -> registration.getMessage().getPublicKey())
.toList())
.thenComposeChecked(
maybeValidatorStatuses -> {
if (maybeValidatorStatuses.isEmpty()) {
final String errorMessage =
"Couldn't retrieve validator statuses during registering. Most likely the BN is still syncing.";
return SafeFuture.failedFuture(new IllegalStateException(errorMessage));
}

final List<SignedValidatorRegistration> activeAndPendingValidatorRegistrations =
validatorRegistrations.stream()
.filter(
registration ->
Optional.ofNullable(
maybeValidatorStatuses
.get()
.get(registration.getMessage().getPublicKey()))
.map(status -> !status.hasExited())
.orElse(false))
.toList();

return validatorApiChannel.registerValidators(
validatorRegistrations
.getSchema()
.createFromElements(activeAndPendingValidatorRegistrations));
});
}

public boolean isPhase0Slot(final UInt64 slot) {
Expand Down
Loading