diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java index fca729ce337..c30caa1a952 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisher.java @@ -20,11 +20,10 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer; import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; -import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason; import tech.pegasys.teku.statetransition.block.BlockImportChannel; import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults; -import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult; import tech.pegasys.teku.validator.api.SendSignedBlockResult; import tech.pegasys.teku.validator.coordinator.BlockFactory; import tech.pegasys.teku.validator.coordinator.DutyMetrics; @@ -59,41 +58,67 @@ public SafeFuture sendSignedBlock( .thenCompose( signedBlockContainer -> gossipAndImportUnblindedSignedBlock(signedBlockContainer, broadcastValidationLevel)) - .thenApply( - result -> { - if (result.isSuccessful()) { - LOG.trace( - "Successfully imported proposed block: {}", - maybeBlindedBlockContainer.getSignedBlock().toLogString()); - dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot()); - return SendSignedBlockResult.success(maybeBlindedBlockContainer.getRoot()); - } else if (result.getFailureReason() == FailureReason.BLOCK_IS_FROM_FUTURE) { - LOG.debug( - "Delayed processing proposed block {} because it is from the future", - maybeBlindedBlockContainer.getSignedBlock().toLogString()); - dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot()); - return SendSignedBlockResult.notImported(result.getFailureReason().name()); - } else { - VALIDATOR_LOGGER.proposedBlockImportFailed( - result.getFailureReason().toString(), - maybeBlindedBlockContainer.getSlot(), - maybeBlindedBlockContainer.getRoot(), - result.getFailureCause()); + .thenCompose(result -> calculateResult(maybeBlindedBlockContainer, result)); + } - return SendSignedBlockResult.notImported(result.getFailureReason().name()); + private SafeFuture calculateResult( + final SignedBlockContainer maybeBlindedBlockContainer, + final BlockImportAndBroadcastValidationResults blockImportAndBroadcastValidationResults) { + + // broadcast validation can fail earlier than block import. + // The assumption is that in that block import will fail but not as fast + // (there might be the state transition in progress) + // Thus, to let the API return as soon as possible, let's check broadcast validation first. + return blockImportAndBroadcastValidationResults + .broadcastValidationResult() + .thenCompose( + broadcastValidationResult -> { + if (broadcastValidationResult.isFailure()) { + return SafeFuture.completedFuture( + SendSignedBlockResult.rejected( + "Broadcast validation failed: " + broadcastValidationResult.name())); } + + return blockImportAndBroadcastValidationResults + .blockImportResult() + .thenApply( + importResult -> { + if (importResult.isSuccessful()) { + LOG.trace( + "Successfully imported proposed block: {}", + maybeBlindedBlockContainer.getSignedBlock().toLogString()); + dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot()); + return SendSignedBlockResult.success( + maybeBlindedBlockContainer.getRoot()); + } + if (importResult.getFailureReason() == FailureReason.BLOCK_IS_FROM_FUTURE) { + LOG.debug( + "Delayed processing proposed block {} because it is from the future", + maybeBlindedBlockContainer.getSignedBlock().toLogString()); + dutyMetrics.onBlockPublished(maybeBlindedBlockContainer.getSlot()); + return SendSignedBlockResult.notImported( + importResult.getFailureReason().name()); + } + VALIDATOR_LOGGER.proposedBlockImportFailed( + importResult.getFailureReason().toString(), + maybeBlindedBlockContainer.getSlot(), + maybeBlindedBlockContainer.getRoot(), + importResult.getFailureCause()); + + return SendSignedBlockResult.notImported( + importResult.getFailureReason().name()); + }); }); } - private SafeFuture gossipAndImportUnblindedSignedBlock( + private SafeFuture gossipAndImportUnblindedSignedBlock( final SignedBlockContainer blockContainer, final BroadcastValidationLevel broadcastValidationLevel) { if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) { // when broadcast validation is disabled, we can publish the block immediately and then import publishBlock(blockContainer); - return importBlock(blockContainer, broadcastValidationLevel) - .thenCompose(BlockImportAndBroadcastValidationResults::blockImportResult); + return importBlock(blockContainer, broadcastValidationLevel); } // when broadcast validation is enabled, we need to wait for the validation to complete before @@ -104,7 +129,7 @@ private SafeFuture gossipAndImportUnblindedSignedBlock( importBlock(blockContainer, broadcastValidationLevel); blockImportAndBroadcastValidationResults - .thenCompose(results -> results.broadcastValidationResult().orElseThrow()) + .thenCompose(BlockImportAndBroadcastValidationResults::broadcastValidationResult) .thenAccept( broadcastValidationResult -> { if (broadcastValidationResult == BroadcastValidationResult.SUCCESS) { @@ -124,8 +149,7 @@ private SafeFuture gossipAndImportUnblindedSignedBlock( blockContainer.getSlot(), err)); - return blockImportAndBroadcastValidationResults.thenCompose( - BlockImportAndBroadcastValidationResults::blockImportResult); + return blockImportAndBroadcastValidationResults; } abstract SafeFuture importBlock( diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java index 052562ce711..bd23f4d2e64 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java @@ -1242,6 +1242,6 @@ private SafeFuture prepareBlockImportR final BlockImportResult blockImportResult) { return SafeFuture.completedFuture( new BlockImportAndBroadcastValidationResults( - SafeFuture.completedFuture(blockImportResult), Optional.empty())); + SafeFuture.completedFuture(blockImportResult))); } } diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java index 652c14c95f1..90f0b4ee468 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/publisher/AbstractBlockPublisherTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.when; import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; -import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -32,7 +31,7 @@ import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.block.BlockImportChannel; import tech.pegasys.teku.statetransition.block.BlockImportChannel.BlockImportAndBroadcastValidationResults; -import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult; import tech.pegasys.teku.validator.api.SendSignedBlockResult; import tech.pegasys.teku.validator.coordinator.BlockFactory; import tech.pegasys.teku.validator.coordinator.DutyMetrics; @@ -68,8 +67,7 @@ public void setUp() { SafeFuture.completedFuture( new BlockImportAndBroadcastValidationResults( SafeFuture.completedFuture( - BlockImportResult.successful(signedBlockContents.getSignedBlock())), - Optional.empty()))); + BlockImportResult.successful(signedBlockContents.getSignedBlock()))))); assertThatSafeFuture( blockPublisher.sendSignedBlock( @@ -81,10 +79,8 @@ public void setUp() { } @Test - public void - sendSignedBlock_shouldImportImmediatelyAndWaitToPublishWhenBroadcastValidationIsSpecified() { - final Optional> validationResult = - Optional.of(new SafeFuture<>()); + public void sendSignedBlock_shouldWaitToPublishWhenBroadcastValidationIsSpecified() { + final SafeFuture validationResult = new SafeFuture<>(); when(blockPublisher.importBlock( signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION)) .thenReturn( @@ -94,25 +90,27 @@ public void setUp() { BlockImportResult.successful(signedBlockContents.getSignedBlock())), validationResult))); - assertThatSafeFuture( - blockPublisher.sendSignedBlock( - signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION)) - .isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot())); + final SafeFuture sendSignedBlockResult = + blockPublisher.sendSignedBlock( + signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION); + + assertThatSafeFuture(sendSignedBlockResult).isNotCompleted(); verify(blockPublisher) .importBlock(signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION); verify(blockPublisher, never()).publishBlock(signedBlockContents); - validationResult.get().complete(BroadcastValidationResult.SUCCESS); + validationResult.complete(BroadcastValidationResult.SUCCESS); verify(blockPublisher).publishBlock(signedBlockContents); + assertThatSafeFuture(sendSignedBlockResult) + .isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot())); } @Test - public void sendSignedBlock_shouldImportImmediatelyAndNotPublishWhenBroadcastValidationFails() { - final Optional> validationResult = - Optional.of(new SafeFuture<>()); + public void sendSignedBlock_shouldNotPublishWhenBroadcastValidationFails() { + final SafeFuture validationResult = new SafeFuture<>(); when(blockPublisher.importBlock( signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION)) .thenReturn( @@ -122,19 +120,25 @@ public void sendSignedBlock_shouldImportImmediatelyAndNotPublishWhenBroadcastVal BlockImportResult.successful(signedBlockContents.getSignedBlock())), validationResult))); - assertThatSafeFuture( - blockPublisher.sendSignedBlock( - signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION)) - .isCompletedWithValue(SendSignedBlockResult.success(signedBlockContents.getRoot())); + final SafeFuture sendSignedBlockResult = + blockPublisher.sendSignedBlock( + signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION); + + assertThatSafeFuture(sendSignedBlockResult).isNotCompleted(); verify(blockPublisher) .importBlock(signedBlockContents, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION); verify(blockPublisher, never()).publishBlock(signedBlockContents); - validationResult.get().complete(BroadcastValidationResult.CONSENSUS_FAILURE); + validationResult.complete(BroadcastValidationResult.CONSENSUS_FAILURE); verify(blockPublisher, never()).publishBlock(signedBlockContents); + assertThatSafeFuture(sendSignedBlockResult) + .isCompletedWithValue( + SendSignedBlockResult.rejected( + "Broadcast validation failed: " + + BroadcastValidationResult.CONSENSUS_FAILURE.name())); } private static class BlockPublisherTest extends AbstractBlockPublisher { diff --git a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java index 0934d468b2a..ed34df3cd08 100644 --- a/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java +++ b/eth-reference-tests/src/referenceTest/java/tech/pegasys/teku/reference/phase0/forkchoice/ForkChoiceTestExecutor.java @@ -71,6 +71,7 @@ import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator; import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.TickProcessor; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.protoarray.ForkChoiceStrategy; @@ -341,7 +342,7 @@ private void applyBlock( block.getSlot(), block.getParentRoot()); final SafeFuture result = - forkChoice.onBlock(block, Optional.empty(), Optional.empty(), executionLayer); + forkChoice.onBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP, executionLayer); assertThat(result).isCompleted(); final BlockImportResult importResult = safeJoin(result); assertThat(importResult) diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/statetransition/results/BlockImportResult.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/statetransition/results/BlockImportResult.java index 3068888add6..6c2e3532282 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/statetransition/results/BlockImportResult.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/statetransition/results/BlockImportResult.java @@ -34,6 +34,9 @@ public interface BlockImportResult { BlockImportResult FAILED_DESCENDANT_OF_INVALID_BLOCK = new FailedBlockImportResult(FailureReason.DESCENDANT_OF_INVALID_BLOCK, Optional.empty()); + BlockImportResult FAILED_BROADCAST_VALIDATION = + new FailedBlockImportResult(FailureReason.FAILED_BROADCAST_VALIDATION, Optional.empty()); + static BlockImportResult failedDataAvailabilityCheckInvalid(final Optional cause) { return new FailedBlockImportResult(FailureReason.FAILED_DATA_AVAILABILITY_CHECK_INVALID, cause); } @@ -80,10 +83,9 @@ enum FailureReason { FAILED_EXECUTION_PAYLOAD_EXECUTION, FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING, DESCENDANT_OF_INVALID_BLOCK, - FAILED_BROADCAST_GOSSIP_VALIDATION, - FAILED_BROADCAST_EQUIVOCATION_VALIDATION, FAILED_DATA_AVAILABILITY_CHECK_INVALID, FAILED_DATA_AVAILABILITY_CHECK_NOT_AVAILABLE, + FAILED_BROADCAST_VALIDATION, INTERNAL_ERROR // A catch-all category for unexpected errors (bugs) } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java index 60f88a2b03a..784ac6f7adb 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImportChannel.java @@ -13,13 +13,13 @@ package tech.pegasys.teku.statetransition.block; -import java.util.Optional; +import com.google.common.annotations.VisibleForTesting; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.events.ChannelInterface; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; -import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult; public interface BlockImportChannel extends ChannelInterface { @@ -28,5 +28,13 @@ SafeFuture importBlock( record BlockImportAndBroadcastValidationResults( SafeFuture blockImportResult, - Optional> broadcastValidationResult) {} + SafeFuture broadcastValidationResult) { + + /** only used in tests */ + @VisibleForTesting + public BlockImportAndBroadcastValidationResults( + final SafeFuture blockImportResult) { + this(blockImportResult, SafeFuture.completedFuture(BroadcastValidationResult.SUCCESS)); + } + } } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java index 786e626ab59..40c3fa8d54c 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockImporter.java @@ -38,6 +38,7 @@ import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.weaksubjectivity.WeakSubjectivityValidator; @@ -81,14 +82,14 @@ public BlockImporter( @CheckReturnValue public SafeFuture importBlock(final SignedBeaconBlock block) { - return importBlock(block, Optional.empty(), Optional.empty()); + return importBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP); } @CheckReturnValue public SafeFuture importBlock( final SignedBeaconBlock block, final Optional blockImportPerformance, - final Optional> consensusValidationListener) { + final BlockBroadcastValidator blockBroadcastValidator) { final Optional knownOptimistic = recentChainData.isBlockOptimistic(block.getRoot()); if (knownOptimistic.isPresent()) { @@ -107,7 +108,7 @@ public SafeFuture importBlock( .thenCompose( __ -> forkChoice.onBlock( - block, blockImportPerformance, consensusValidationListener, executionLayer)) + block, blockImportPerformance, blockBroadcastValidator, executionLayer)) .thenApply( result -> { if (!result.isSuccessful()) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java index 37642cd41eb..0014e030763 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/block/BlockManager.java @@ -34,8 +34,8 @@ import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool; import tech.pegasys.teku.statetransition.util.FutureItems; import tech.pegasys.teku.statetransition.util.PendingPool; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; import tech.pegasys.teku.statetransition.validation.BlockValidator; -import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; import tech.pegasys.teku.statetransition.validation.ValidationResultCode; import tech.pegasys.teku.storage.client.RecentChainData; @@ -111,29 +111,19 @@ public SafeFuture importBlock( final SignedBeaconBlock block, final BroadcastValidationLevel broadcastValidationLevel) { LOG.trace("Preparing to import block: {}", block::toLogString); - // NO broadcast validation, import the old way - if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) { - return SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - doImportBlock(block, Optional.empty(), Optional.empty()), Optional.empty())); - } + final BlockBroadcastValidator blockBroadcastValidator = + blockValidator.initiateBroadcastValidation(block, broadcastValidationLevel); - final SafeFuture consensusValidationListener = new SafeFuture<>(); final SafeFuture importResult = - doImportBlock(block, Optional.empty(), Optional.of(consensusValidationListener)); - - // we want a future that completes as soon as the consensus validation is done, or intercept any - // early import results\exceptions happening before the consensus validation is completed - final SafeFuture consensusValidationResultOrImportFailure = - consensusValidationListener.or(importResult); + doImportBlock(block, Optional.empty(), blockBroadcastValidator); - final SafeFuture broadcastValidationResult = - blockValidator.validateBroadcast( - block, broadcastValidationLevel, consensusValidationResultOrImportFailure); + // we want to intercept any early import exceptions happening before the consensus validation is + // completed + blockBroadcastValidator.attachToBlockImport(importResult); return SafeFuture.completedFuture( new BlockImportAndBroadcastValidationResults( - importResult, Optional.of(broadcastValidationResult))); + importResult, blockBroadcastValidator.getResult())); } @SuppressWarnings("FutureReturnValueIgnored") @@ -163,7 +153,7 @@ public SafeFuture validateAndImportBlock( result -> { if (result.code().equals(ValidationResultCode.ACCEPT) || result.code().equals(ValidationResultCode.SAVE_FOR_FUTURE)) { - doImportBlock(block, blockImportPerformance, Optional.empty()) + doImportBlock(block, blockImportPerformance, BlockBroadcastValidator.NOOP) .finish(err -> LOG.error("Failed to process received block.", err)); } }); @@ -214,18 +204,19 @@ public void onBlockValidated(SignedBeaconBlock block) { } private void importBlockIgnoringResult(final SignedBeaconBlock block) { - doImportBlock(block, Optional.empty(), Optional.empty()).ifExceptionGetsHereRaiseABug(); + doImportBlock(block, Optional.empty(), BlockBroadcastValidator.NOOP) + .ifExceptionGetsHereRaiseABug(); } private SafeFuture doImportBlock( final SignedBeaconBlock block, final Optional blockImportPerformance, - final Optional> consensusValidationListener) { + final BlockBroadcastValidator blockBroadcastValidator) { return handleInvalidBlock(block) .or(() -> handleKnownBlock(block)) .orElseGet( () -> - handleBlockImport(block, blockImportPerformance, consensusValidationListener) + handleBlockImport(block, blockImportPerformance, blockBroadcastValidator) .thenPeek( result -> lateBlockImportCheck(blockImportPerformance, block, result))) .thenPeek( @@ -272,13 +263,13 @@ private Optional> handleKnownBlock(final SignedBea private SafeFuture handleBlockImport( final SignedBeaconBlock block, final Optional blockImportPerformance, - final Optional> consensusValidationListener) { + final BlockBroadcastValidator blockBroadcastValidator) { onBlockValidated(block); blobSidecarPool.onNewBlock(block); return blockImporter - .importBlock(block, blockImportPerformance, consensusValidationListener) + .importBlock(block, blockImportPerformance, blockBroadcastValidator) .thenPeek( result -> { if (result.isSuccessful()) { @@ -332,7 +323,17 @@ private SafeFuture handleBlockImport( LOG.warn("Unable to import block {} due to invalid data", block.toLogString()); blobSidecarPool.removeAllForBlock(block.getRoot()); break; - default: + case FAILED_BROADCAST_VALIDATION: + LOG.warn( + "Unable to import block {} due to failed broadcast validation", + block.toLogString()); + break; + // let's avoid default: so we don't forget to explicitly handle new cases + case DOES_NOT_DESCEND_FROM_LATEST_FINALIZED, + FAILED_STATE_TRANSITION, + FAILED_WEAK_SUBJECTIVITY_CHECKS, + DESCENDANT_OF_INVALID_BLOCK, + INTERNAL_ERROR: LOG.trace( "Unable to import block for reason {}: {}", result.getFailureReason(), diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java index dff5eb1f1a2..a3039e55773 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java @@ -69,6 +69,7 @@ import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImportPerformance; import tech.pegasys.teku.statetransition.validation.AttestationStateSelector; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.protoarray.DeferredVotes; @@ -199,7 +200,7 @@ public SafeFuture processHead() { public SafeFuture onBlock( final SignedBeaconBlock block, final Optional blockImportPerformance, - final Optional> consensusValidationListener, + final BlockBroadcastValidator blockBroadcastValidator, final ExecutionLayerChannel executionLayer) { return recentChainData .retrieveStateAtSlot(new SlotAndBlockRoot(block.getSlot(), block.getParentRoot())) @@ -210,7 +211,7 @@ public SafeFuture onBlock( block, blockSlotState, blockImportPerformance, - consensusValidationListener, + blockBroadcastValidator, executionLayer)); } @@ -388,7 +389,7 @@ private SafeFuture onBlock( final SignedBeaconBlock block, final Optional blockSlotState, final Optional blockImportPerformance, - final Optional> consensusValidationListener, + final BlockBroadcastValidator blockBroadcastValidator, final ExecutionLayerChannel executionLayer) { if (blockSlotState.isEmpty()) { return SafeFuture.completedFuture(BlockImportResult.FAILED_UNKNOWN_PARENT); @@ -436,36 +437,47 @@ private SafeFuture onBlock( } blockImportPerformance.ifPresent(BlockImportPerformance::postStateCreated); - final SafeFuture blobSidecarsAvailabilityResult = + final SafeFuture blobSidecarsAvailabilityFuture = blobSidecarsAvailabilityChecker .getAvailabilityCheckResult() .thenPeek( - result -> - consensusValidationListener.ifPresent( - voidSafeFuture -> { - // consensus validation is completed when DA check is completed - if (result.isSuccess()) { - voidSafeFuture.complete(BlockImportResult.successful(block)); - } - })); - - return payloadExecutor - .getExecutionResult() - .thenPeek( - __ -> blockImportPerformance.ifPresent(BlockImportPerformance::executionResultReceived)) - .thenCombineAsync( - blobSidecarsAvailabilityResult, - (payloadResult, blobSidecarsAndValidationResult) -> - importBlockAndState( - block, - blockSlotState.get(), - blockImportPerformance, - forkChoiceUtil, - indexedAttestationCache, - postState, - payloadResult, - blobSidecarsAndValidationResult), - forkChoiceExecutor); + result -> { + // consensus validation is completed when DA check is completed + if (result.isSuccess()) { + blockBroadcastValidator.onConsensusValidationSucceeded(); + } + }); + + final SafeFuture payloadValidationFuture = + payloadExecutor + .getExecutionResult() + .thenPeek( + __ -> + blockImportPerformance.ifPresent( + BlockImportPerformance::executionResultReceived)); + + return blockBroadcastValidator + .getResult() + .thenCompose( + broadcastValidationResult -> { + if (broadcastValidationResult.isFailure()) { + return SafeFuture.completedFuture(BlockImportResult.FAILED_BROADCAST_VALIDATION); + } + + return payloadValidationFuture.thenCombineAsync( + blobSidecarsAvailabilityFuture, + (payloadResult, blobSidecarsAndValidationResult) -> + importBlockAndState( + block, + blockSlotState.get(), + blockImportPerformance, + forkChoiceUtil, + indexedAttestationCache, + postState, + payloadResult, + blobSidecarsAndValidationResult), + forkChoiceExecutor); + }); } private BlockImportResult importBlockAndState( diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidator.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidator.java new file mode 100644 index 00000000000..7410104223a --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidator.java @@ -0,0 +1,68 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.validation; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; +import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; + +public interface BlockBroadcastValidator { + + BlockBroadcastValidator NOOP = + new BlockBroadcastValidator() { + private static final SafeFuture SUCCESS = + SafeFuture.completedFuture(BroadcastValidationResult.SUCCESS); + + @Override + public void onConsensusValidationSucceeded() {} + + @Override + public void attachToBlockImport(final SafeFuture blockImportResult) {} + + @Override + public SafeFuture getResult() { + return SUCCESS; + } + }; + + static BlockBroadcastValidator create( + final SignedBeaconBlock block, + final BlockGossipValidator blockGossipValidator, + final BroadcastValidationLevel broadcastValidationLevel) { + if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) { + return NOOP; + } + return BlockBroadcastValidatorImpl.create( + block, blockGossipValidator, broadcastValidationLevel); + } + + void onConsensusValidationSucceeded(); + + void attachToBlockImport(SafeFuture blockImportResult); + + SafeFuture getResult(); + + enum BroadcastValidationResult { + SUCCESS, + GOSSIP_FAILURE, + CONSENSUS_FAILURE, + FINAL_EQUIVOCATION_FAILURE; + + // is success + public boolean isFailure() { + return this != SUCCESS; + } + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidatorImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidatorImpl.java new file mode 100644 index 00000000000..858a75ac42e --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidatorImpl.java @@ -0,0 +1,147 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.validation; + +import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.GOSSIP; +import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.NOT_REQUIRED; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.CONSENSUS_FAILURE; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.FINAL_EQUIVOCATION_FAILURE; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.GOSSIP_FAILURE; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.SUCCESS; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; +import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; + +public class BlockBroadcastValidatorImpl implements BlockBroadcastValidator { + private final BlockGossipValidator blockGossipValidator; + private final BroadcastValidationLevel broadcastValidationLevel; + private final SafeFuture consensusValidationSuccessResult; + private final SafeFuture broadcastValidationResult; + + private BlockBroadcastValidatorImpl( + final BlockGossipValidator blockGossipValidator, + final BroadcastValidationLevel broadcastValidationLevel) { + this.blockGossipValidator = blockGossipValidator; + this.broadcastValidationLevel = broadcastValidationLevel; + this.consensusValidationSuccessResult = new SafeFuture<>(); + this.broadcastValidationResult = new SafeFuture<>(); + } + + public static BlockBroadcastValidatorImpl create( + final SignedBeaconBlock block, + final BlockGossipValidator blockGossipValidator, + final BroadcastValidationLevel broadcastValidationLevel) { + final BlockBroadcastValidatorImpl blockBroadcastValidator = + new BlockBroadcastValidatorImpl(blockGossipValidator, broadcastValidationLevel); + blockBroadcastValidator.buildValidationPipeline(block); + return blockBroadcastValidator; + } + + @Override + public void onConsensusValidationSucceeded() { + consensusValidationSuccessResult.complete(true); + } + + @Override + public void attachToBlockImport(final SafeFuture blockImportResult) { + switch (broadcastValidationLevel) { + case NOT_REQUIRED, GOSSIP: + // GOSSIP validation isn't dependent on block import result, + // so not propagating exceptions to consensusValidationSuccessResult allow blocks\blobs + // to be published even in case block import fails before gossip validation completes + return; + case CONSENSUS, CONSENSUS_AND_EQUIVOCATION: + // Any successful block import will be considered as a consensus validation success, but + // more importantly we propagate exceptions to the consensus validation, thus we capture any + // early block import failures + blockImportResult + .thenApply(BlockImportResult::isSuccessful) + .propagateTo(consensusValidationSuccessResult); + } + } + + @Override + public SafeFuture getResult() { + return broadcastValidationResult; + } + + private void buildValidationPipeline(final SignedBeaconBlock block) { + // validateBroadcast should not be called at all but let's cover the case for safety + if (broadcastValidationLevel == NOT_REQUIRED) { + broadcastValidationResult.complete(SUCCESS); + consensusValidationSuccessResult.cancel(true); + return; + } + + // GOSSIP only validation + SafeFuture validationPipeline = + blockGossipValidator + .validate(block, true) + .thenApply( + gossipValidationResult -> { + if (gossipValidationResult.isAccept()) { + return SUCCESS; + } + return GOSSIP_FAILURE; + }); + + if (broadcastValidationLevel == GOSSIP) { + validationPipeline.propagateTo(broadcastValidationResult); + consensusValidationSuccessResult.cancel(true); + return; + } + + // GOSSIP and CONSENSUS validation + validationPipeline = + validationPipeline.thenCompose( + broadcastValidationResult -> { + if (broadcastValidationResult != SUCCESS) { + // forward gossip validation failure + return SafeFuture.completedFuture(broadcastValidationResult); + } + return consensusValidationSuccessResult.thenApply( + consensusValidationSuccess -> { + if (consensusValidationSuccess) { + return SUCCESS; + } + return CONSENSUS_FAILURE; + }); + }); + + if (broadcastValidationLevel == BroadcastValidationLevel.CONSENSUS) { + validationPipeline.propagateTo(broadcastValidationResult); + return; + } + + // GOSSIP, CONSENSUS and additional EQUIVOCATION validation + validationPipeline + .thenApply( + broadcastValidationResult -> { + if (broadcastValidationResult != SUCCESS) { + // forward gossip or consensus validation failure + return broadcastValidationResult; + } + + // perform final equivocation validation + if (blockGossipValidator.blockIsFirstBlockWithValidSignatureForSlot(block)) { + return SUCCESS; + } + + return FINAL_EQUIVOCATION_FAILURE; + }) + .propagateTo(broadcastValidationResult); + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockValidator.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockValidator.java index 7c430455370..fca77880c42 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockValidator.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/validation/BlockValidator.java @@ -16,7 +16,6 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; -import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; public class BlockValidator { @@ -30,74 +29,8 @@ public SafeFuture validateGossip(final SignedBeaconBlo return blockGossipValidator.validate(block, false); } - public SafeFuture validateBroadcast( - final SignedBeaconBlock block, - final BroadcastValidationLevel broadcastValidationLevel, - final SafeFuture consensusValidationResult) { - - // validateBroadcast should not be called at all but let's cover the case for safety - if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) { - return SafeFuture.completedFuture(BroadcastValidationResult.SUCCESS); - } - - // GOSSIP only validation - SafeFuture validationPipeline = - blockGossipValidator - .validate(block, true) - .thenApply( - gossipValidationResult -> { - if (gossipValidationResult.isAccept()) { - return BroadcastValidationResult.SUCCESS; - } - return BroadcastValidationResult.GOSSIP_FAILURE; - }); - - if (broadcastValidationLevel == BroadcastValidationLevel.GOSSIP) { - return validationPipeline; - } - - // GOSSIP and CONSENSUS validation - validationPipeline = - validationPipeline.thenCompose( - broadcastValidationResult -> { - if (broadcastValidationResult != BroadcastValidationResult.SUCCESS) { - // forward gossip validation failure - return SafeFuture.completedFuture(broadcastValidationResult); - } - return consensusValidationResult.thenApply( - consensusValidation -> { - if (consensusValidation.isSuccessful()) { - return BroadcastValidationResult.SUCCESS; - } - return BroadcastValidationResult.CONSENSUS_FAILURE; - }); - }); - - if (broadcastValidationLevel == BroadcastValidationLevel.CONSENSUS) { - return validationPipeline; - } - - // GOSSIP, CONSENSUS and additional EQUIVOCATION validation - return validationPipeline.thenApply( - broadcastValidationResult -> { - if (broadcastValidationResult != BroadcastValidationResult.SUCCESS) { - // forward gossip or consensus validation failure - return broadcastValidationResult; - } - - // perform final equivocation validation - if (blockGossipValidator.blockIsFirstBlockWithValidSignatureForSlot(block)) { - return BroadcastValidationResult.SUCCESS; - } - - return BroadcastValidationResult.FINAL_EQUIVOCATION_FAILURE; - }); - } - - public enum BroadcastValidationResult { - SUCCESS, - GOSSIP_FAILURE, - CONSENSUS_FAILURE, - FINAL_EQUIVOCATION_FAILURE + public BlockBroadcastValidator initiateBroadcastValidation( + final SignedBeaconBlock block, final BroadcastValidationLevel validationLevel) { + return BlockBroadcastValidator.create(block, blockGossipValidator, validationLevel); } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/DataUnavailableBlockPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/DataUnavailableBlockPoolTest.java index f4908e9f328..289619625f4 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/DataUnavailableBlockPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/blobs/DataUnavailableBlockPoolTest.java @@ -66,13 +66,11 @@ void setUp() { when(blockManager.importBlock(block1, NOT_REQUIRED)) .thenReturn( SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - block1ImportResult, Optional.empty()))); + new BlockImportAndBroadcastValidationResults(block1ImportResult))); when(blockManager.importBlock(block2, NOT_REQUIRED)) .thenReturn( SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - block2ImportResult, Optional.empty()))); + new BlockImportAndBroadcastValidationResults(block2ImportResult))); } @Test diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java index cb5572e0282..c52d9d67b78 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/BlockManagerTest.java @@ -15,7 +15,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -45,8 +44,7 @@ import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.PROCESSED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.TRANSACTION_COMMITTED_EVENT_LABEL; import static tech.pegasys.teku.statetransition.block.BlockImportPerformance.TRANSACTION_PREPARED_EVENT_LABEL; -import static tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult.CONSENSUS_FAILURE; -import static tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult.SUCCESS; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.SUCCESS; import java.util.ArrayList; import java.util.Collections; @@ -98,6 +96,8 @@ import tech.pegasys.teku.statetransition.util.FutureItems; import tech.pegasys.teku.statetransition.util.PendingPool; import tech.pegasys.teku.statetransition.util.PoolFactory; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult; import tech.pegasys.teku.statetransition.validation.BlockValidator; import tech.pegasys.teku.statetransition.validation.InternalValidationResult; import tech.pegasys.teku.storage.client.ChainHead; @@ -134,6 +134,8 @@ public class BlockManagerTest { private ExecutionLayerChannelStub executionLayer; private final BlockValidator blockValidator = mock(BlockValidator.class); + private final BlockBroadcastValidator blockBroadcastValidator = + mock(BlockBroadcastValidator.class); private BlockImporter blockImporter; private BlockManager blockManager; @@ -207,6 +209,9 @@ private void setupWithSpec(final Spec spec) { assertThat(blockManager.start()).isCompleted(); when(blobSidecarManager.createAvailabilityChecker(any())) .thenReturn(BlobSidecarsAvailabilityChecker.NOOP); + when(blockValidator.initiateBroadcastValidation(any(), any())) + .thenReturn(blockBroadcastValidator); + when(blockBroadcastValidator.getResult()).thenReturn(SafeFuture.completedFuture(SUCCESS)); } private void forwardBlockImportedNotificationsTo(final BlockManager blockManager) { @@ -340,7 +345,7 @@ public void onGossipedBlock_retryIfParentWasUnknownButIsNowAvailable() { pendingBlocks, futureBlocks, invalidBlockRoots, - mock(BlockValidator.class), + blockValidator, timeProvider, eventLogger, Optional.empty(), @@ -357,15 +362,14 @@ public void onGossipedBlock_retryIfParentWasUnknownButIsNowAvailable() { localChain.chainBuilder().generateBlockAtSlot(nextNextSlot).getBlock(); final SafeFuture blockImportResult = new SafeFuture<>(); - when(blockImporter.importBlock(nextNextBlock, Optional.empty(), Optional.empty())) + when(blockImporter.importBlock(eq(nextNextBlock), eq(Optional.empty()), any())) .thenReturn(blockImportResult) .thenReturn(new SafeFuture<>()); incrementSlot(); incrementSlot(); assertThatBlockImport(nextNextBlock).isNotCompleted(); - ignoreFuture( - verify(blockImporter).importBlock(nextNextBlock, Optional.empty(), Optional.empty())); + ignoreFuture(verify(blockImporter).importBlock(eq(nextNextBlock), eq(Optional.empty()), any())); // Before nextNextBlock imports, it's parent becomes available when(localRecentChainData.containsBlock(nextNextBlock.getParentRoot())).thenReturn(true); @@ -374,7 +378,7 @@ public void onGossipedBlock_retryIfParentWasUnknownButIsNowAvailable() { blockImportResult.complete(BlockImportResult.FAILED_UNKNOWN_PARENT); ignoreFuture( verify(blockImporter, times(2)) - .importBlock(nextNextBlock, Optional.empty(), Optional.empty())); + .importBlock(eq(nextNextBlock), eq(Optional.empty()), any())); assertThat(pendingBlocks.contains(nextNextBlock)).isFalse(); } @@ -652,17 +656,14 @@ public void onValidateAndImportBlock_shouldEarlyRejectInvalidBlocks() { } @Test - void onImportBlock_shouldImportWithBroadcastValidationCompletedWileStillImporting() { + void onImportBlock_shouldImportWithBroadcastValidationCompletedWhileStillImporting() { final UInt64 nextSlot = GENESIS_SLOT.plus(UInt64.ONE); final SignedBeaconBlock nextBlock = localChain.chainBuilder().generateBlockAtSlot(nextSlot).getBlock(); incrementSlot(); - when(blockValidator.validateBroadcast( - eq(nextBlock), - eq(GOSSIP), - argThat(importResult -> safeJoin(importResult).isSuccessful()))) - .thenReturn(SafeFuture.completedFuture(SUCCESS)); + when(blockValidator.initiateBroadcastValidation(eq(nextBlock), eq(GOSSIP))) + .thenReturn(BlockBroadcastValidator.NOOP); // let's delay EL so importResult SafeFuture doesn't complete final SafeFuture payloadStatusSafeFuture = new SafeFuture<>(); @@ -677,9 +678,7 @@ void onImportBlock_shouldImportWithBroadcastValidationCompletedWileStillImportin assertThatSafeFuture(blockImportAndBroadcastValidationResults.blockImportResult()) .isNotCompleted(); assertThatSafeFuture( - blockImportAndBroadcastValidationResults - .broadcastValidationResult() - .orElseThrow()) + blockImportAndBroadcastValidationResults.broadcastValidationResult()) .isCompletedWithValue(SUCCESS); return true; }); @@ -701,39 +700,33 @@ void onImportBlock_shouldImportWithBroadcastValidationCompletedWileStillImportin @Test void onImportBlock_shouldNotImportWithBroadcastValidationWhenImportFails() { + final UInt64 nextSlot = GENESIS_SLOT.plus(UInt64.ONE); + final SignedBeaconBlock nextBlock = + localChain.chainBuilder().generateBlockAtSlot(nextSlot).getBlock(); + incrementSlot(); - final SignedBeaconBlock invalidBlock = - localChain - .chainBuilder() - .generateBlockAtSlot(incrementSlot(), BlockOptions.create().setWrongProposer(true)) - .getBlock(); + final BlockBroadcastValidator blockBroadcastValidator = mock(BlockBroadcastValidator.class); + when(blockValidator.initiateBroadcastValidation(eq(nextBlock), eq(GOSSIP))) + .thenReturn(blockBroadcastValidator); + + when(blockBroadcastValidator.getResult()) + .thenReturn(SafeFuture.completedFuture(BroadcastValidationResult.GOSSIP_FAILURE)); - when(blockValidator.validateBroadcast( - eq(invalidBlock), - eq(BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION), - // expecting consensus import state transition failure - argThat( - importResult -> - safeJoin(importResult) - .getFailureReason() - .equals(FailureReason.FAILED_STATE_TRANSITION)))) - .thenReturn(SafeFuture.completedFuture(CONSENSUS_FAILURE)); - - assertThatBlockImportAndBroadcastValidationResults( - invalidBlock, BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION) + assertThatBlockImportAndBroadcastValidationResults(nextBlock, GOSSIP) .isCompletedWithValueMatching( blockImportAndBroadcastValidationResults -> { assertThatSafeFuture(blockImportAndBroadcastValidationResults.blockImportResult()) .isCompletedWithValueMatching( result -> - result.getFailureReason().equals(FailureReason.FAILED_STATE_TRANSITION)); + result + .getFailureReason() + .equals(FailureReason.FAILED_BROADCAST_VALIDATION)); assertThatSafeFuture( - blockImportAndBroadcastValidationResults - .broadcastValidationResult() - .orElseThrow()) - .isCompletedWithValue(CONSENSUS_FAILURE); + blockImportAndBroadcastValidationResults.broadcastValidationResult()) + .isCompletedWithValue(BroadcastValidationResult.GOSSIP_FAILURE); return true; }); + assertThat(invalidBlockRoots).doesNotContainKeys(nextBlock.getRoot()); } @Test @@ -1044,6 +1037,7 @@ void onDeneb_shouldNotStoreBlockWhenBlobSidecarsIsNotAvailable() { assertThat(localRecentChainData.getBlobSidecars(block1.getSlotAndBlockRoot())).isEmpty(); assertThat(localRecentChainData.retrieveEarliestBlobSidecarSlot()) .isCompletedWithValueMatching(Optional::isEmpty); + assertThat(invalidBlockRoots).doesNotContainKeys(block1.getRoot()); } @Test @@ -1157,7 +1151,6 @@ private void assertValidateAndImportBlockRejectWithoutValidation(final SignedBea assertThat(blockManager.validateAndImportBlock(block, Optional.empty())) .isCompletedWithValueMatching(InternalValidationResult::isReject); verify(blockValidator, never()).validateGossip(eq(block)); - verify(blockValidator, never()).validateBroadcast(any(), any(), any()); } private void assertImportBlockSuccessfully(final SignedBeaconBlock block) { @@ -1180,7 +1173,6 @@ private UInt64 incrementSlot() { private BlockManager setupBlockManagerWithMockRecentChainData( final RecentChainData localRecentChainData, final boolean isChainHeadOptimistic) { - when(localRecentChainData.isChainHeadOptimistic()).thenReturn(isChainHeadOptimistic); return new BlockManager( localRecentChainData, @@ -1189,7 +1181,7 @@ private BlockManager setupBlockManagerWithMockRecentChainData( pendingBlocks, futureBlocks, invalidBlockRoots, - mock(BlockValidator.class), + blockValidator, timeProvider, eventLogger, Optional.empty(), diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/FailedExecutionPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/FailedExecutionPoolTest.java index 5445e7e3198..5cf7ca002ef 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/FailedExecutionPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/block/FailedExecutionPoolTest.java @@ -26,7 +26,6 @@ import java.io.InterruptedIOException; import java.net.SocketTimeoutException; import java.time.Duration; -import java.util.Optional; import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; @@ -321,8 +320,7 @@ void shouldStopRetryingBlockWhenImportThrowsExceptionInsteadOfReturningFailedFut .thenReturn( SafeFuture.completedFuture( new BlockImportAndBroadcastValidationResults( - SafeFuture.completedFuture(BlockImportResult.successful(block2)), - Optional.empty()))); + SafeFuture.completedFuture(BlockImportResult.successful(block2))))); failurePool.addFailedBlock(block); failurePool.addFailedBlock(block2); @@ -339,8 +337,7 @@ private void withImportResult(final BlockImportResult result) { when(blockManager.importBlock(any(), any())) .thenReturn( SafeFuture.completedFuture( - new BlockImportAndBroadcastValidationResults( - SafeFuture.completedFuture(result), Optional.empty()))); + new BlockImportAndBroadcastValidationResults(SafeFuture.completedFuture(result)))); } private static InterruptedIOException timeoutException() { diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTest.java index d14d01f671e..0b662b00964 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -94,6 +95,8 @@ import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice.OptimisticHeadSubscriber; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceUpdatedResultSubscriber.ForkChoiceUpdatedResultNotification; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult; import tech.pegasys.teku.storage.api.TrackingChainHeadChannel.ReorgEvent; import tech.pegasys.teku.storage.client.ChainHead; import tech.pegasys.teku.storage.client.ChainUpdater; @@ -121,7 +124,8 @@ class ForkChoiceTest { private final OptimisticHeadSubscriber optimisticSyncStateTracker = mock(OptimisticHeadSubscriber.class); private ExecutionLayerChannelStub executionLayer; - private Optional> consensusValidationResult = Optional.empty(); + private final BlockBroadcastValidator blockBroadcastValidator = + mock(BlockBroadcastValidator.class); private final MergeTransitionBlockValidator transitionBlockValidator = mock(MergeTransitionBlockValidator.class); @@ -176,6 +180,8 @@ private void setupWithSpec(final Spec unmockedSpec) { setForkChoiceNotifierForkChoiceUpdatedResult(PayloadStatus.VALID); when(transitionBlockValidator.verifyAncestorTransitionBlock(any())) .thenReturn(SafeFuture.completedFuture(PayloadValidationResult.VALID)); + when(blockBroadcastValidator.getResult()) + .thenReturn(SafeFuture.completedFuture(BroadcastValidationResult.SUCCESS)); forkChoice.subscribeToOptimisticHeadChangesAndUpdate(optimisticSyncStateTracker); @@ -231,7 +237,8 @@ void onBlock_shouldFailIfBlobsAreNotAvailable() { when(blobSidecarsAvailabilityChecker.getAvailabilityCheckResult()) .thenReturn(SafeFuture.completedFuture(BlobSidecarsAndValidationResult.NOT_AVAILABLE)); - importBlockWithError(blockAndState, FailureReason.FAILED_DATA_AVAILABILITY_CHECK_NOT_AVAILABLE); + importBlockAndAssertFailure( + blockAndState, FailureReason.FAILED_DATA_AVAILABILITY_CHECK_NOT_AVAILABLE); verify(blobSidecarManager).createAvailabilityChecker(blockAndState.getBlock()); verify(blobSidecarsAvailabilityChecker).initiateDataAvailabilityCheck(); @@ -241,16 +248,34 @@ void onBlock_shouldFailIfBlobsAreNotAvailable() { @Test void onBlock_consensusValidationShouldNotResolveWhenDataAvailabilityFails() { setupWithSpec(TestSpecFactory.createMinimalDeneb()); - consensusValidationResult = Optional.of(new SafeFuture<>()); final SignedBlockAndState blockAndState = chainBuilder.generateBlockAtSlot(ONE); storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(blockAndState.getSlot()); when(blobSidecarsAvailabilityChecker.getAvailabilityCheckResult()) .thenReturn(SafeFuture.completedFuture(BlobSidecarsAndValidationResult.NOT_AVAILABLE)); - importBlockWithError(blockAndState, FailureReason.FAILED_DATA_AVAILABILITY_CHECK_NOT_AVAILABLE); + importBlockAndAssertFailure( + blockAndState, FailureReason.FAILED_DATA_AVAILABILITY_CHECK_NOT_AVAILABLE); - assertThatSafeFuture(consensusValidationResult.get()).isNotDone(); + verify(blockBroadcastValidator, never()).onConsensusValidationSucceeded(); + + verify(blobSidecarManager).createAvailabilityChecker(blockAndState.getBlock()); + verify(blobSidecarsAvailabilityChecker).initiateDataAvailabilityCheck(); + verify(blobSidecarsAvailabilityChecker).getAvailabilityCheckResult(); + } + + @Test + void onBlock_shouldFailWhenBroadcastValidationFails() { + setupWithSpec(TestSpecFactory.createMinimalDeneb()); + final SignedBlockAndState blockAndState = chainBuilder.generateBlockAtSlot(ONE); + storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(blockAndState.getSlot()); + + when(blockBroadcastValidator.getResult()) + .thenReturn(SafeFuture.completedFuture(BroadcastValidationResult.GOSSIP_FAILURE)); + + importBlockAndAssertFailure(blockAndState, FailureReason.FAILED_BROADCAST_VALIDATION); + + verify(blockBroadcastValidator).onConsensusValidationSucceeded(); verify(blobSidecarManager).createAvailabilityChecker(blockAndState.getBlock()); verify(blobSidecarsAvailabilityChecker).initiateDataAvailabilityCheck(); @@ -260,23 +285,21 @@ void onBlock_consensusValidationShouldNotResolveWhenDataAvailabilityFails() { @Test void onBlock_consensusValidationShouldNotResolveWhenEarlyFails() { setupWithSpec(TestSpecFactory.createMinimalDeneb()); - consensusValidationResult = Optional.of(new SafeFuture<>()); final List signedBlockAndStates = chainBuilder.generateBlocksUpToSlot(2); final SignedBlockAndState wrongBlockAndState = signedBlockAndStates.get(signedBlockAndStates.size() - 1); storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(wrongBlockAndState.getSlot()); - importBlockWithError(wrongBlockAndState, FailureReason.UNKNOWN_PARENT); + importBlockAndAssertFailure(wrongBlockAndState, FailureReason.UNKNOWN_PARENT); - assertThatSafeFuture(consensusValidationResult.get()).isNotDone(); + verify(blockBroadcastValidator, never()).onConsensusValidationSucceeded(); } @Test void onBlock_consensusValidationShouldNotResolveWhenStateTransitionFails() throws StateTransitionException { setupWithSpec(TestSpecFactory.createMinimalDeneb()); - consensusValidationResult = Optional.of(new SafeFuture<>()); final SignedBlockAndState blockAndState = chainBuilder.generateBlockAtSlot(ONE); storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(blockAndState.getSlot()); @@ -285,17 +308,20 @@ void onBlock_consensusValidationShouldNotResolveWhenStateTransitionFails() when(blockProcessor.processAndValidateBlock(any(), any(), any(), any())) .thenThrow(new StateTransitionException("error!")); - importBlockWithError(blockAndState, FailureReason.FAILED_STATE_TRANSITION); + importBlockAndAssertFailure(blockAndState, FailureReason.FAILED_STATE_TRANSITION); - assertThatSafeFuture(consensusValidationResult.get()).isNotDone(); + verify(blockBroadcastValidator, never()).onConsensusValidationSucceeded(); } @Test void onBlock_consensusValidationShouldReturnRegardlessExecutionPayloadValidation() { setupWithSpec(TestSpecFactory.createMinimalDeneb()); - consensusValidationResult = Optional.of(new SafeFuture<>()); final SignedBlockAndState blockAndState = chainBuilder.generateBlockAtSlot(ONE); importBlock(blockAndState); + reset(blockBroadcastValidator); + + when(blockBroadcastValidator.getResult()) + .thenReturn(SafeFuture.completedFuture(BroadcastValidationResult.SUCCESS)); // let's prepare a mocked EL with lazy newPayload executionLayer = mock(ExecutionLayerChannelStub.class); @@ -307,8 +333,7 @@ void onBlock_consensusValidationShouldReturnRegardlessExecutionPayloadValidation importBlockNoResultCheck(chainBuilder.generateNextBlock()); // successful consensus check prior to EL validation - assertThatSafeFuture(consensusValidationResult.get()) - .isCompletedWithValueMatching(BlockImportResult::isSuccessful); + verify(blockBroadcastValidator).onConsensusValidationSucceeded(); assertThatSafeFuture(importResult).isNotDone(); @@ -340,7 +365,10 @@ void onBlock_shouldImmediatelyMakeChildOfCurrentHeadTheNewHead() { storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(blockAndState.getSlot()); final SafeFuture importResult = forkChoice.onBlock( - blockAndState.getBlock(), Optional.empty(), Optional.empty(), executionLayer); + blockAndState.getBlock(), + Optional.empty(), + BlockBroadcastValidator.NOOP, + executionLayer); assertBlockImportedSuccessfully(importResult, false); assertThat(recentChainData.getHeadBlock().map(MinimalBeaconBlockSummary::getRoot)) @@ -358,7 +386,7 @@ void onBlock_shouldNotTriggerReorgWhenSelectingChildOfChainHeadWhenForkChoiceSlo final SignedBlockAndState blockAndState = chainBuilder.generateBlockAtSlot(ONE); final SafeFuture importResult = forkChoice.onBlock( - blockAndState.getBlock(), Optional.empty(), Optional.empty(), executionLayer); + blockAndState.getBlock(), Optional.empty(), blockBroadcastValidator, executionLayer); assertBlockImportedSuccessfully(importResult, false); assertThat(recentChainData.getHeadBlock().map(MinimalBeaconBlockSummary::getRoot)) @@ -665,7 +693,10 @@ void onBlock_shouldSendForkChoiceUpdatedNotification() { storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(blockAndState.getSlot()); final SafeFuture importResult = forkChoice.onBlock( - blockAndState.getBlock(), Optional.empty(), Optional.empty(), executionLayer); + blockAndState.getBlock(), + Optional.empty(), + BlockBroadcastValidator.NOOP, + executionLayer); assertBlockImportedSuccessfully(importResult, false); assertForkChoiceUpdateNotification(blockAndState, false); @@ -677,7 +708,8 @@ void onBlock_shouldNotOptimisticallyImportRecentMergeBlock() { // make EL returning SYNCING executionLayer.setPayloadStatus(PayloadStatus.SYNCING); - importBlockWithError(epoch4Block, FailureReason.FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING); + importBlockAndAssertFailure( + epoch4Block, FailureReason.FAILED_EXECUTION_PAYLOAD_EXECUTION_SYNCING); } @Test @@ -691,7 +723,7 @@ void onBlock_shouldNotOptimisticallyImportOnELFailure() { // generate block which finalize epoch 2 final SignedBlockAndState epoch4Block = chainBuilder.generateBlockAtSlot(slotToImport); - importBlockWithError(epoch4Block, FailureReason.FAILED_EXECUTION_PAYLOAD_EXECUTION); + importBlockAndAssertFailure(epoch4Block, FailureReason.FAILED_EXECUTION_PAYLOAD_EXECUTION); } @Test @@ -706,7 +738,8 @@ void onBlock_shouldNotOptimisticallyImportInvalidExecutionPayload() { executionLayer.setPayloadStatus(PayloadStatus.invalid(Optional.empty(), Optional.empty())); storageSystem.chainUpdater().setCurrentSlot(slotToImport.increment()); - importBlockWithError(chainBuilder.generateNextBlock(), FailureReason.FAILED_STATE_TRANSITION); + importBlockAndAssertFailure( + chainBuilder.generateNextBlock(), FailureReason.FAILED_STATE_TRANSITION); } @Test @@ -741,7 +774,7 @@ void onBlock_shouldChangeForkChoiceForLatestValidHashOnInvalidExecutionPayload() Optional.of(maybeValidBlock.getExecutionBlockHash().get()), Optional.empty())); storageSystem.chainUpdater().setCurrentSlot(latestOptimisticBlock.getSlot().increment()); SignedBlockAndState invalidBlock = chainBuilder.generateNextBlock(); - importBlockWithError(invalidBlock, FailureReason.FAILED_STATE_TRANSITION); + importBlockAndAssertFailure(invalidBlock, FailureReason.FAILED_STATE_TRANSITION); assertThat(forkChoice.processHead(invalidBlock.getSlot())).isCompleted(); assertHeadIsOptimistic(maybeValidBlock); @@ -933,7 +966,10 @@ void onBlock_shouldUseLatestValidHashFromForkChoiceUpdated() { setForkChoiceNotifierForkChoiceUpdatedResult(PayloadStatus.SYNCING); final SafeFuture result = forkChoice.onBlock( - blockAndState.getBlock(), Optional.empty(), Optional.empty(), executionLayer); + blockAndState.getBlock(), + Optional.empty(), + BlockBroadcastValidator.NOOP, + executionLayer); assertBlockImportedSuccessfully(result, true); assertForkChoiceUpdateNotification(blockAndState, true); @@ -1281,14 +1317,14 @@ private void assertBlockImportedSuccessfully( private SafeFuture importBlockNoResultCheck(final SignedBlockAndState block) { storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(block.getSlot()); return forkChoice.onBlock( - block.getBlock(), Optional.empty(), consensusValidationResult, executionLayer); + block.getBlock(), Optional.empty(), blockBroadcastValidator, executionLayer); } private void importBlock(final SignedBlockAndState block) { storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(block.getSlot()); final SafeFuture result = forkChoice.onBlock( - block.getBlock(), Optional.empty(), consensusValidationResult, executionLayer); + block.getBlock(), Optional.empty(), blockBroadcastValidator, executionLayer); assertBlockImportedSuccessfully(result, false); } @@ -1296,7 +1332,7 @@ private void importBlockOptimistically(final SignedBlockAndState block) { storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(block.getSlot()); final SafeFuture result = forkChoice.onBlock( - block.getBlock(), Optional.empty(), consensusValidationResult, executionLayer); + block.getBlock(), Optional.empty(), blockBroadcastValidator, executionLayer); assertBlockImportedSuccessfully(result, true); } @@ -1307,12 +1343,12 @@ private void assertBlockImportFailure( assertThat(result.getFailureReason()).isEqualTo(failureReason); } - private void importBlockWithError( + private void importBlockAndAssertFailure( final SignedBlockAndState block, final FailureReason failureReason) { storageSystem.chainUpdater().advanceCurrentSlotToAtLeast(block.getSlot()); final SafeFuture result = forkChoice.onBlock( - block.getBlock(), Optional.empty(), consensusValidationResult, executionLayer); + block.getBlock(), Optional.empty(), blockBroadcastValidator, executionLayer); assertBlockImportFailure(result, failureReason); } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidatorTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidatorTest.java new file mode 100644 index 00000000000..77a65241836 --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockBroadcastValidatorTest.java @@ -0,0 +1,212 @@ +/* + * Copyright Consensys Software Inc., 2023 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.validation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION; +import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.GOSSIP; +import static tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel.NOT_REQUIRED; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.CONSENSUS_FAILURE; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.FINAL_EQUIVOCATION_FAILURE; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.GOSSIP_FAILURE; +import static tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator.BroadcastValidationResult.SUCCESS; + +import java.util.Arrays; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; +import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; +import tech.pegasys.teku.spec.util.DataStructureUtil; + +public class BlockBroadcastValidatorTest { + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + private final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); + + private final BlockGossipValidator blockGossipValidator = mock(BlockGossipValidator.class); + + private BlockBroadcastValidator blockBroadcastValidator; + + final SafeFuture blockImportResult = new SafeFuture<>(); + + @Test + public void shouldReturnSuccessWhenValidationIsGossipAndGossipValidationReturnsAccept() { + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + + prepareBlockBroadcastValidator(GOSSIP); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(SUCCESS)); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verifyNoMoreInteractions(blockGossipValidator); + } + + @Test + public void + shouldReturnSuccessWhenValidationIsGossipAndGossipValidationReturnsAcceptEvenWhenBlockImportFails() { + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + + blockImportResult.completeExceptionally(new RuntimeException("error")); + + prepareBlockBroadcastValidator(GOSSIP); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(SUCCESS)); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verifyNoMoreInteractions(blockGossipValidator); + } + + @ParameterizedTest + @MethodSource("provideBroadcastValidationsAndGossipFailures") + public void shouldReturnGossipFailureImmediatelyWhenGossipValidationIsNotAccept( + final BroadcastValidationLevel broadcastValidation, + final InternalValidationResult internalValidationResult) { + + if (broadcastValidation == NOT_REQUIRED) { + prepareBlockBroadcastValidator(broadcastValidation); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(SUCCESS)); + verifyNoMoreInteractions(blockGossipValidator); + return; + } + + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(internalValidationResult)); + + prepareBlockBroadcastValidator(broadcastValidation); + + // consensus validation success should not affect the result + blockBroadcastValidator.onConsensusValidationSucceeded(); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(GOSSIP_FAILURE)); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verifyNoMoreInteractions(blockGossipValidator); + } + + @ParameterizedTest + @EnumSource( + value = BroadcastValidationLevel.class, + names = {"CONSENSUS", "CONSENSUS_AND_EQUIVOCATION"}) + public void shouldReturnConsensusFailureImmediatelyWhenConsensusValidationIsNotSuccessful( + final BroadcastValidationLevel broadcastValidation) { + + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + + prepareBlockBroadcastValidator(broadcastValidation); + + blockImportResult.complete( + BlockImportResult.failedStateTransition(new RuntimeException("error"))); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(CONSENSUS_FAILURE)); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verifyNoMoreInteractions(blockGossipValidator); + } + + @ParameterizedTest + @EnumSource( + value = BroadcastValidationLevel.class, + names = {"CONSENSUS", "CONSENSUS_AND_EQUIVOCATION"}) + public void shouldReturnConsensusFailureImmediatelyWhenConsensusCompleteExceptionally( + final BroadcastValidationLevel broadcastValidation) { + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + + prepareBlockBroadcastValidator(broadcastValidation); + + blockImportResult.completeExceptionally(new RuntimeException("error")); + + assertThat(blockBroadcastValidator.getResult()).isCompletedExceptionally(); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verifyNoMoreInteractions(blockGossipValidator); + } + + @Test + public void shouldReturnSuccessWhenSecondEquivocationCheckIsValidated() { + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + + when(blockGossipValidator.blockIsFirstBlockWithValidSignatureForSlot(eq(block))) + .thenReturn(Boolean.TRUE); + + prepareBlockBroadcastValidator(CONSENSUS_AND_EQUIVOCATION); + + assertThat(blockBroadcastValidator.getResult()).isNotDone(); + + blockBroadcastValidator.onConsensusValidationSucceeded(); + + // any subsequent failures won't affect the result + blockImportResult.completeExceptionally(new RuntimeException("error")); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(SUCCESS)); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verify(blockGossipValidator).blockIsFirstBlockWithValidSignatureForSlot(eq(block)); + verifyNoMoreInteractions(blockGossipValidator); + } + + @Test + public void shouldReturnFinalEquivocationFailureWhenSecondEquivocationCheckFails() { + when(blockGossipValidator.validate(eq(block), eq(true))) + .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); + + when(blockGossipValidator.blockIsFirstBlockWithValidSignatureForSlot(eq(block))) + .thenReturn(Boolean.FALSE); + + prepareBlockBroadcastValidator(CONSENSUS_AND_EQUIVOCATION); + + blockBroadcastValidator.onConsensusValidationSucceeded(); + + assertThat(blockBroadcastValidator.getResult()) + .isCompletedWithValueMatching(result -> result.equals(FINAL_EQUIVOCATION_FAILURE)); + verify(blockGossipValidator).validate(eq(block), eq(true)); + verify(blockGossipValidator).blockIsFirstBlockWithValidSignatureForSlot(eq(block)); + verifyNoMoreInteractions(blockGossipValidator); + } + + private static Stream provideBroadcastValidationsAndGossipFailures() { + return Arrays.stream(BroadcastValidationLevel.values()) + .flatMap( + broadcastValidation -> + Stream.of( + Arguments.of(broadcastValidation, InternalValidationResult.IGNORE), + Arguments.of(broadcastValidation, InternalValidationResult.SAVE_FOR_FUTURE))); + } + + private void prepareBlockBroadcastValidator( + final BroadcastValidationLevel broadcastValidationLevel) { + blockBroadcastValidator = + BlockBroadcastValidator.create(block, blockGossipValidator, broadcastValidationLevel); + + blockBroadcastValidator.attachToBlockImport(blockImportResult); + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockValidatorTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockValidatorTest.java index 35272ab3521..a04297ea88d 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockValidatorTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/validation/BlockValidatorTest.java @@ -20,21 +20,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.util.Arrays; -import java.util.stream.Stream; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.MethodSource; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; -import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel; -import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; import tech.pegasys.teku.spec.util.DataStructureUtil; -import tech.pegasys.teku.statetransition.validation.BlockValidator.BroadcastValidationResult; public class BlockValidatorTest { private final Spec spec = TestSpecFactory.createMinimalPhase0(); @@ -44,8 +35,6 @@ public class BlockValidatorTest { private final BlockValidator blockValidator = new BlockValidator(blockGossipValidator); - final SafeFuture consensusValidationResult = new SafeFuture<>(); - @Test public void shouldExposeGossipValidation() { final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); @@ -58,144 +47,4 @@ public void shouldExposeGossipValidation() { verify(blockGossipValidator).validate(eq(block), eq(false)); verifyNoMoreInteractions(blockGossipValidator); } - - @Test - public void shouldReturnSuccessWhenValidationIsGossipAndGossipValidationReturnsAccept() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); - - when(blockGossipValidator.validate(eq(block), eq(true))) - .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); - - assertThat( - blockValidator.validateBroadcast( - block, BroadcastValidationLevel.GOSSIP, consensusValidationResult)) - .isCompletedWithValueMatching(result -> result.equals(BroadcastValidationResult.SUCCESS)); - verify(blockGossipValidator).validate(eq(block), eq(true)); - verifyNoMoreInteractions(blockGossipValidator); - } - - @ParameterizedTest - @MethodSource("provideBroadcastValidationsAndGossipFailures") - public void shouldReturnGossipFailureImmediatelyWhenGossipValidationIsNotAccept( - final BroadcastValidationLevel broadcastValidation, - final InternalValidationResult internalValidationResult) { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); - - if (broadcastValidation == BroadcastValidationLevel.NOT_REQUIRED) { - assertThat( - blockValidator.validateBroadcast( - block, broadcastValidation, consensusValidationResult)) - .isCompletedWithValueMatching(result -> result.equals(BroadcastValidationResult.SUCCESS)); - verifyNoMoreInteractions(blockGossipValidator); - return; - } - - when(blockGossipValidator.validate(eq(block), eq(true))) - .thenReturn(SafeFuture.completedFuture(internalValidationResult)); - - assertThat( - blockValidator.validateBroadcast(block, broadcastValidation, consensusValidationResult)) - .isCompletedWithValueMatching( - result -> result.equals(BroadcastValidationResult.GOSSIP_FAILURE)); - verify(blockGossipValidator).validate(eq(block), eq(true)); - verifyNoMoreInteractions(blockGossipValidator); - } - - @ParameterizedTest - @EnumSource( - value = BroadcastValidationLevel.class, - names = {"CONSENSUS", "CONSENSUS_AND_EQUIVOCATION"}) - public void shouldReturnConsensusFailureImmediatelyWhenConsensusValidationIsNotSuccessful( - final BroadcastValidationLevel broadcastValidation) { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); - - when(blockGossipValidator.validate(eq(block), eq(true))) - .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); - - consensusValidationResult.complete( - BlockImportResult.failedStateTransition(new RuntimeException("error"))); - - assertThat( - blockValidator.validateBroadcast(block, broadcastValidation, consensusValidationResult)) - .isCompletedWithValueMatching( - result -> result.equals(BroadcastValidationResult.CONSENSUS_FAILURE)); - verify(blockGossipValidator).validate(eq(block), eq(true)); - verifyNoMoreInteractions(blockGossipValidator); - } - - @ParameterizedTest - @EnumSource( - value = BroadcastValidationLevel.class, - names = {"CONSENSUS", "CONSENSUS_AND_EQUIVOCATION"}) - public void shouldReturnConsensusFailureImmediatelyWhenConsensusCompleteExceptionally( - final BroadcastValidationLevel broadcastValidation) { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); - - when(blockGossipValidator.validate(eq(block), eq(true))) - .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); - - consensusValidationResult.completeExceptionally(new RuntimeException("error")); - - assertThat( - blockValidator.validateBroadcast(block, broadcastValidation, consensusValidationResult)) - .isCompletedExceptionally(); - verify(blockGossipValidator).validate(eq(block), eq(true)); - verifyNoMoreInteractions(blockGossipValidator); - } - - @Test - public void shouldReturnSuccessWhenSecondEquivocationCheckIsValidated() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); - - when(blockGossipValidator.validate(eq(block), eq(true))) - .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); - - consensusValidationResult.complete(BlockImportResult.successful(block)); - - when(blockGossipValidator.blockIsFirstBlockWithValidSignatureForSlot(eq(block))) - .thenReturn(Boolean.valueOf(true)); - - assertThat( - blockValidator.validateBroadcast( - block, - BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, - consensusValidationResult)) - .isCompletedWithValueMatching(result -> result.equals(BroadcastValidationResult.SUCCESS)); - verify(blockGossipValidator).validate(eq(block), eq(true)); - verify(blockGossipValidator).blockIsFirstBlockWithValidSignatureForSlot(eq(block)); - verifyNoMoreInteractions(blockGossipValidator); - } - - @Test - public void shouldReturnFinalEquivocationFailureWhenSecondEquivocationCheckFails() { - final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); - - when(blockGossipValidator.validate(eq(block), eq(true))) - .thenReturn(SafeFuture.completedFuture(InternalValidationResult.ACCEPT)); - - consensusValidationResult.complete(BlockImportResult.successful(block)); - - when(blockGossipValidator.blockIsFirstBlockWithValidSignatureForSlot(eq(block))) - .thenReturn(Boolean.valueOf(false)); - - assertThat( - blockValidator.validateBroadcast( - block, - BroadcastValidationLevel.CONSENSUS_AND_EQUIVOCATION, - consensusValidationResult)) - .isCompletedWithValueMatching( - result -> result.equals(BroadcastValidationResult.FINAL_EQUIVOCATION_FAILURE)); - verify(blockGossipValidator).validate(eq(block), eq(true)); - verify(blockGossipValidator).blockIsFirstBlockWithValidSignatureForSlot(eq(block)); - verifyNoMoreInteractions(blockGossipValidator); - } - - private static Stream provideBroadcastValidationsAndGossipFailures() { - return Arrays.stream(BroadcastValidationLevel.values()) - .flatMap( - broadcastValidation -> - Stream.of( - Arguments.of(broadcastValidation, InternalValidationResult.IGNORE), - Arguments.of(broadcastValidation, InternalValidationResult.SAVE_FOR_FUTURE))); - } } diff --git a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/BeaconChainUtil.java b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/BeaconChainUtil.java index 3ceca1bdc96..61ac89930b2 100644 --- a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/BeaconChainUtil.java +++ b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/BeaconChainUtil.java @@ -50,6 +50,7 @@ import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator; import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; import tech.pegasys.teku.storage.client.ChainHead; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.store.UpdatableStore.StoreTransaction; @@ -204,7 +205,7 @@ private SignedBeaconBlock createAndImportBlockAtSlot( .onBlock( block, Optional.empty(), - Optional.empty(), + BlockBroadcastValidator.NOOP, new ExecutionLayerChannelStub(spec, false, Optional.empty())) .join(); if (!importResult.isSuccessful()) { diff --git a/fork-choice-tests/src/integration-test/java/tech/pegasys/teku/forkChoiceTests/ForkChoiceIntegrationTest.java b/fork-choice-tests/src/integration-test/java/tech/pegasys/teku/forkChoiceTests/ForkChoiceIntegrationTest.java index 63169c85fac..f81dd08ab40 100644 --- a/fork-choice-tests/src/integration-test/java/tech/pegasys/teku/forkChoiceTests/ForkChoiceIntegrationTest.java +++ b/fork-choice-tests/src/integration-test/java/tech/pegasys/teku/forkChoiceTests/ForkChoiceIntegrationTest.java @@ -56,6 +56,7 @@ import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator; import tech.pegasys.teku.statetransition.forkchoice.StubForkChoiceNotifier; +import tech.pegasys.teku.statetransition.validation.BlockBroadcastValidator; import tech.pegasys.teku.storage.client.MemoryOnlyRecentChainData; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.store.UpdatableStore; @@ -287,7 +288,7 @@ private boolean processBlock(ForkChoice fc, SignedBeaconBlock block) { fc.onBlock( block, Optional.empty(), - Optional.empty(), + BlockBroadcastValidator.NOOP, new ExecutionLayerChannelStub(SPEC, false, Optional.empty())) .join(); return blockImportResult.isSuccessful();