Skip to content

Commit

Permalink
Broadcast validator revamped (Consensys#7689)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Nov 13, 2023
1 parent 291d10f commit 657cc3f
Show file tree
Hide file tree
Showing 20 changed files with 704 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults;
import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
Expand Down Expand Up @@ -59,41 +58,67 @@ public SafeFuture<SendSignedBlockResult> sendSignedBlock(
.thenCompose(
signedBlockContainer ->
gossipAndImportUnblindedSignedBlock(signedBlockContainer, broadcastValidationLevel))
.thenApply(
result -> {
if (result.isSuccessful()) {
LOG.trace(
"Successfully imported proposed block: {}",
maybeBlindedBlockContainer.getSignedBlock().toLogString());
dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot());
return SendSignedBlockResult.success(maybeBlindedBlockContainer.getRoot());
} else if (result.getFailureReason() == FailureReason.BLOCK_IS_FROM_FUTURE) {
LOG.debug(
"Delayed processing proposed block {} because it is from the future",
maybeBlindedBlockContainer.getSignedBlock().toLogString());
dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot());
return SendSignedBlockResult.notImported(result.getFailureReason().name());
} else {
VALIDATOR_LOGGER.proposedBlockImportFailed(
result.getFailureReason().toString(),
maybeBlindedBlockContainer.getSlot(),
maybeBlindedBlockContainer.getRoot(),
result.getFailureCause());
.thenCompose(result -> calculateResult(maybeBlindedBlockContainer, result));
}

return SendSignedBlockResult.notImported(result.getFailureReason().name());
private SafeFuture<SendSignedBlockResult> calculateResult(
final SignedBlockContainer maybeBlindedBlockContainer,
final BlockImportAndBroadcastValidationResults blockImportAndBroadcastValidationResults) {

// broadcast validation can fail earlier than block import.
// The assumption is that in that block import will fail but not as fast
// (there might be the state transition in progress)
// Thus, to let the API return as soon as possible, let's check broadcast validation first.
return blockImportAndBroadcastValidationResults
.broadcastValidationResult()
.thenCompose(
broadcastValidationResult -> {
if (broadcastValidationResult.isFailure()) {
return SafeFuture.completedFuture(
SendSignedBlockResult.rejected(
"Broadcast validation failed: " + broadcastValidationResult.name()));
}

return blockImportAndBroadcastValidationResults
.blockImportResult()
.thenApply(
importResult -> {
if (importResult.isSuccessful()) {
LOG.trace(
"Successfully imported proposed block: {}",
maybeBlindedBlockContainer.getSignedBlock().toLogString());
dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot());
return SendSignedBlockResult.success(
maybeBlindedBlockContainer.getRoot());
}
if (importResult.getFailureReason() == FailureReason.BLOCK_IS_FROM_FUTURE) {
LOG.debug(
"Delayed processing proposed block {} because it is from the future",
maybeBlindedBlockContainer.getSignedBlock().toLogString());
dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot());
return SendSignedBlockResult.notImported(
importResult.getFailureReason().name());
}
VALIDATOR_LOGGER.proposedBlockImportFailed(
importResult.getFailureReason().toString(),
maybeBlindedBlockContainer.getSlot(),
maybeBlindedBlockContainer.getRoot(),
importResult.getFailureCause());

return SendSignedBlockResult.notImported(
importResult.getFailureReason().name());
});
});
}

private SafeFuture<BlockImportResult> gossipAndImportUnblindedSignedBlock(
private SafeFuture<BlockImportAndBroadcastValidationResults> gossipAndImportUnblindedSignedBlock(
final SignedBlockContainer blockContainer,
final BroadcastValidationLevel broadcastValidationLevel) {

if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) {
// when broadcast validation is disabled, we can publish the block immediately and then import
publishBlock(blockContainer);
return importBlock(blockContainer, broadcastValidationLevel)
.thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult);
return importBlock(blockContainer, broadcastValidationLevel);
}

// when broadcast validation is enabled, we need to wait for the validation to complete before
Expand All @@ -104,7 +129,7 @@ private SafeFuture<BlockImportResult> gossipAndImportUnblindedSignedBlock(
importBlock(blockContainer, broadcastValidationLevel);

blockImportAndBroadcastValidationResults
.thenCompose(results -> results.broadcastValidationResult().orElseThrow())
.thenCompose(BlockImportAndBroadcastValidationResults::broadcastValidationResult)
.thenAccept(
broadcastValidationResult -> {
if (broadcastValidationResult == BroadcastValidationResult.SUCCESS) {
Expand All @@ -124,8 +149,7 @@ private SafeFuture<BlockImportResult> gossipAndImportUnblindedSignedBlock(
blockContainer.getSlot(),
err));

return blockImportAndBroadcastValidationResults.thenCompose(
BlockImportAndBroadcastValidationResults::blockImportResult);
return blockImportAndBroadcastValidationResults;
}

abstract SafeFuture<BlockImportAndBroadcastValidationResults> importBlock(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,6 @@ private SafeFuture<BlockImportAndBroadcastValidationResults> prepareBlockImportR
final BlockImportResult blockImportResult) {
return SafeFuture.completedFuture(
new BlockImportAndBroadcastValidationResults(
SafeFuture.completedFuture(blockImportResult), Optional.empty()));
SafeFuture.completedFuture(blockImportResult)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;

import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
Expand All @@ -32,7 +31,7 @@
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults;
import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
Expand Down Expand Up @@ -68,8 +67,7 @@ public void setUp() {
SafeFuture.completedFuture(
new BlockImportAndBroadcastValidationResults(
SafeFuture.completedFuture(
BlockImportResult.successful(signedBlockContents.getSignedBlock())),
Optional.empty())));
BlockImportResult.successful(signedBlockContents.getSignedBlock())))));

assertThatSafeFuture(
blockPublisher.sendSignedBlock(
Expand All @@ -81,10 +79,8 @@ public void setUp() {
}

@Test
public void
sendSignedBlock_shouldImportImmediatelyAndWaitToPublishWhenBroadcastValidationIsSpecified() {
final Optional<SafeFuture<BroadcastValidationResult>> validationResult =
Optional.of(new SafeFuture<>());
public void sendSignedBlock_shouldWaitToPublishWhenBroadcastValidationIsSpecified() {
final SafeFuture<BroadcastValidationResult> validationResult = new SafeFuture<>();
when(blockPublisher.importBlock(
signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION))
.thenReturn(
Expand All @@ -94,25 +90,27 @@ public void setUp() {
BlockImportResult.successful(signedBlockContents.getSignedBlock())),
validationResult)));

assertThatSafeFuture(
blockPublisher.sendSignedBlock(
signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION))
.isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot()));
final SafeFuture<SendSignedBlockResult> sendSignedBlockResult =
blockPublisher.sendSignedBlock(
signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION);

assertThatSafeFuture(sendSignedBlockResult).isNotCompleted();

verify(blockPublisher)
.importBlock(signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION);

verify(blockPublisher, never()).publishBlock(signedBlockContents);

validationResult.get().complete(BroadcastValidationResult.SUCCESS);
validationResult.complete(BroadcastValidationResult.SUCCESS);

verify(blockPublisher).publishBlock(signedBlockContents);
assertThatSafeFuture(sendSignedBlockResult)
.isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot()));
}

@Test
public void sendSignedBlock_shouldImportImmediatelyAndNotPublishWhenBroadcastValidationFails() {
final Optional<SafeFuture<BroadcastValidationResult>> validationResult =
Optional.of(new SafeFuture<>());
public void sendSignedBlock_shouldNotPublishWhenBroadcastValidationFails() {
final SafeFuture<BroadcastValidationResult> validationResult = new SafeFuture<>();
when(blockPublisher.importBlock(
signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION))
.thenReturn(
Expand All @@ -122,19 +120,25 @@ public void sendSignedBlock_shouldImportImmediatelyAndNotPublishWhenBroadcastVal
BlockImportResult.successful(signedBlockContents.getSignedBlock())),
validationResult)));

assertThatSafeFuture(
blockPublisher.sendSignedBlock(
signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION))
.isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot()));
final SafeFuture<SendSignedBlockResult> sendSignedBlockResult =
blockPublisher.sendSignedBlock(
signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION);

assertThatSafeFuture(sendSignedBlockResult).isNotCompleted();

verify(blockPublisher)
.importBlock(signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION);

verify(blockPublisher, never()).publishBlock(signedBlockContents);

validationResult.get().complete(BroadcastValidationResult.CONSENSUS_FAILURE);
validationResult.complete(BroadcastValidationResult.CONSENSUS_FAILURE);

verify(blockPublisher, never()).publishBlock(signedBlockContents);
assertThatSafeFuture(sendSignedBlockResult)
.isCompletedWithValue(
SendSignedBlockResult.rejected(
"Broadcast validation failed: "
+ BroadcastValidationResult.CONSENSUS_FAILURE.name()));
}

private static class BlockPublisherTest extends AbstractBlockPublisher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.TickProcessor;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.storage.protoarray.ForkChoiceStrategy;
Expand Down Expand Up @@ -341,7 +342,7 @@ private void applyBlock(
block.getSlot(),
block.getParentRoot());
final SafeFuture<BlockImportResult> result =
forkChoice.onBlock(block, Optional.empty(), Optional.empty(), executionLayer);
forkChoice.onBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP, executionLayer);
assertThat(result).isCompleted();
final BlockImportResult importResult = safeJoin(result);
assertThat(importResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public interface BlockImportResult {
BlockImportResult FAILED_DESCENDANT_OF_INVALID_BLOCK =
new FailedBlockImportResult(FailureReason.DESCENDANT_OF_INVALID_BLOCK, Optional.empty());

BlockImportResult FAILED_BROADCAST_VALIDATION =
new FailedBlockImportResult(FailureReason.FAILED_BROADCAST_VALIDATION, Optional.empty());

static BlockImportResult failedDataAvailabilityCheckInvalid(final Optional<Throwable> cause) {
return new FailedBlockImportResult(FailureReason.FAILED_DATA_AVAILABILITY_CHECK_INVALID, cause);
}
Expand Down Expand Up @@ -80,10 +83,9 @@ enum FailureReason {
FAILED_EXECUTION_PAYLOAD_EXECUTION,
FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING,
DESCENDANT_OF_INVALID_BLOCK,
FAILED_BROADCAST_GOSSIP_VALIDATION,
FAILED_BROADCAST_EQUIVOCATION_VALIDATION,
FAILED_DATA_AVAILABILITY_CHECK_INVALID,
FAILED_DATA_AVAILABILITY_CHECK_NOT_AVAILABLE,
FAILED_BROADCAST_VALIDATION,
INTERNAL_ERROR // A catch-all category for unexpected errors (bugs)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

package tech.pegasys.teku.statetransition.block;

import java.util.Optional;
import com.google.common.annotations.VisibleForTesting;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult;

public interface BlockImportChannel extends ChannelInterface {

Expand All @@ -28,5 +28,13 @@ SafeFuture<BlockImportAndBroadcastValidationResults> importBlock(

record BlockImportAndBroadcastValidationResults(
SafeFuture<BlockImportResult> blockImportResult,
Optional<SafeFuture<BroadcastValidationResult>> broadcastValidationResult) {}
SafeFuture<BroadcastValidationResult> broadcastValidationResult) {

/** only used in tests */
@VisibleForTesting
public BlockImportAndBroadcastValidationResults(
final SafeFuture<BlockImportResult> blockImportResult) {
this(blockImportResult, SafeFuture.completedFuture(BroadcastValidationResult.SUCCESS));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoice;
import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator;
import tech.pegasys.teku.storage.client.RecentChainData;
import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityValidator;

Expand Down Expand Up @@ -81,14 +82,14 @@ public BlockImporter(

@CheckReturnValue
public SafeFuture<BlockImportResult> importBlock(final SignedBeaconBlock block) {
return importBlock(block, Optional.empty(), Optional.empty());
return importBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP);
}

@CheckReturnValue
public SafeFuture<BlockImportResult> importBlock(
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance,
final Optional<SafeFuture<BlockImportResult>> consensusValidationListener) {
final BlockBroadcastValidator blockBroadcastValidator) {

final Optional<Boolean> knownOptimistic = recentChainData.isBlockOptimistic(block.getRoot());
if (knownOptimistic.isPresent()) {
Expand All @@ -107,7 +108,7 @@ public SafeFuture<BlockImportResult> importBlock(
.thenCompose(
__ ->
forkChoice.onBlock(
block, blockImportPerformance, consensusValidationListener, executionLayer))
block, blockImportPerformance, blockBroadcastValidator, executionLayer))
.thenApply(
result -> {
if (!result.isSuccessful()) {
Expand Down
Loading

0 comments on commit 657cc3f

Please sign in to comment.