Skip to content

Commit

Permalink
[ePBS] Block production (part 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Sep 12, 2024
1 parent e9deba9 commit 624d66c
Show file tree
Hide file tree
Showing 17 changed files with 324 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.ExecutionPayloadGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber;
import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager;
import tech.pegasys.teku.spec.Spec;
Expand All @@ -50,6 +51,8 @@
import tech.pegasys.teku.statetransition.attestation.PayloadAttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadHeaderPool;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
Expand All @@ -73,6 +76,8 @@ public class ValidatorApiHandlerIntegrationTest {
// Other dependencies are mocked, but these can be updated as needed
private final SyncStateProvider syncStateProvider = mock(SyncStateTracker.class);
private final BlockFactory blockFactory = mock(BlockFactory.class);
private final ExecutionPayloadHeaderFactory executionPayloadHeaderFactory =
mock(ExecutionPayloadHeaderFactory.class);
private final AggregatingAttestationPool attestationPool = mock(AggregatingAttestationPool.class);
private final AttestationManager attestationManager = mock(AttestationManager.class);
private final PayloadAttestationManager payloadAttestationManager =
Expand All @@ -88,6 +93,8 @@ public class ValidatorApiHandlerIntegrationTest {
mock(BlockBlobSidecarsTrackersPool.class);
private final BlobSidecarGossipChannel blobSidecarGossipChannel =
mock(BlobSidecarGossipChannel.class);
private final ExecutionPayloadGossipChannel executionPayloadGossipChannel =
mock(ExecutionPayloadGossipChannel.class);
private final ChainDataProvider chainDataProvider = mock(ChainDataProvider.class);
private final NodeDataProvider nodeDataProvider = mock(NodeDataProvider.class);
private final NetworkDataProvider networkDataProvider = mock(NetworkDataProvider.class);
Expand All @@ -101,6 +108,10 @@ public class ValidatorApiHandlerIntegrationTest {
mock(SyncCommitteeContributionPool.class);
private final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager =
mock(SyncCommitteeSubscriptionManager.class);
private final ExecutionPayloadHeaderPool executionPayloadHeaderPool =
mock(ExecutionPayloadHeaderPool.class);
private final ExecutionPayloadManager executionPayloadManager =
mock(ExecutionPayloadManager.class);

private final DutyMetrics dutyMetrics = mock(DutyMetrics.class);
private final ValidatorApiHandler handler =
Expand All @@ -111,10 +122,12 @@ public class ValidatorApiHandlerIntegrationTest {
combinedChainDataClient,
syncStateProvider,
blockFactory,
executionPayloadHeaderFactory,
blockImportChannel,
blockGossipChannel,
blockBlobSidecarsTrackersPool,
blobSidecarGossipChannel,
executionPayloadGossipChannel,
attestationPool,
attestationManager,
payloadAttestationManager,
Expand All @@ -128,6 +141,8 @@ public class ValidatorApiHandlerIntegrationTest {
syncCommitteeMessagePool,
syncCommitteeContributionPool,
syncCommitteeSubscriptionManager,
executionPayloadHeaderPool,
executionPayloadManager,
new BlockProductionAndPublishingPerformanceFactory(
new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0, 0, 0));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.coordinator;

import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadHeader;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconStateCache;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerBlockProductionManager;

@SuppressWarnings("unused")
public class ExecutionPayloadHeaderFactory {

private final Spec spec;
private final ExecutionLayerBlockProductionManager executionLayerBlockProductionManager;

public ExecutionPayloadHeaderFactory(
final Spec spec,
final ExecutionLayerBlockProductionManager executionLayerBlockProductionManager) {
this.spec = spec;
this.executionLayerBlockProductionManager = executionLayerBlockProductionManager;
}

// EIP7732 TODO: implement
public SafeFuture<ExecutionPayloadHeader> createUnsignedHeader(
final BeaconState state, final UInt64 slot, final BLSPublicKey builderPublicKey) {
int builderIndex =
BeaconStateCache.getTransitionCaches(state)
.getValidatorIndexCache()
.getValidatorIndex(state, builderPublicKey)
.orElseThrow(
() ->
new IllegalArgumentException(
"There is no index assigned to a builder with a public key "
+ builderPublicKey));
return SafeFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.ExecutionPayloadGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber;
import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager;
import tech.pegasys.teku.spec.Spec;
Expand All @@ -82,6 +83,7 @@
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayloadHeader;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadHeader;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip7732.ExecutionPayloadHeaderEip7732;
import tech.pegasys.teku.spec.datastructures.genesis.GenesisData;
import tech.pegasys.teku.spec.datastructures.metadata.BlockContainerAndMetaData;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
Expand All @@ -103,6 +105,8 @@
import tech.pegasys.teku.statetransition.attestation.PayloadAttestationManager;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadHeaderPool;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
Expand All @@ -117,6 +121,7 @@
import tech.pegasys.teku.validator.coordinator.duties.AttesterDutiesGenerator;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;
import tech.pegasys.teku.validator.coordinator.publisher.BlockPublisher;
import tech.pegasys.teku.validator.coordinator.publisher.ExecutionPayloadPublisher;
import tech.pegasys.teku.validator.coordinator.publisher.MilestoneBasedBlockPublisher;

public class ValidatorApiHandler implements ValidatorApiChannel {
Expand All @@ -141,6 +146,7 @@ public class ValidatorApiHandler implements ValidatorApiChannel {
private final CombinedChainDataClient combinedChainDataClient;
private final SyncStateProvider syncStateProvider;
private final BlockFactory blockFactory;
private final ExecutionPayloadHeaderFactory executionPayloadHeaderFactory;
private final AggregatingAttestationPool attestationPool;
private final AttestationManager attestationManager;
private final PayloadAttestationManager payloadAttestationManager;
Expand All @@ -153,8 +159,10 @@ public class ValidatorApiHandler implements ValidatorApiChannel {
private final SyncCommitteeMessagePool syncCommitteeMessagePool;
private final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager;
private final SyncCommitteeContributionPool syncCommitteeContributionPool;
private final ExecutionPayloadHeaderPool executionPayloadHeaderPool;
private final ProposersDataManager proposersDataManager;
private final BlockPublisher blockPublisher;
private final ExecutionPayloadPublisher executionPayloadPublisher;
private final AttesterDutiesGenerator attesterDutiesGenerator;

public ValidatorApiHandler(
Expand All @@ -164,10 +172,12 @@ public ValidatorApiHandler(
final CombinedChainDataClient combinedChainDataClient,
final SyncStateProvider syncStateProvider,
final BlockFactory blockFactory,
final ExecutionPayloadHeaderFactory executionPayloadHeaderFactory,
final BlockImportChannel blockImportChannel,
final BlockGossipChannel blockGossipChannel,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final BlobSidecarGossipChannel blobSidecarGossipChannel,
final ExecutionPayloadGossipChannel executionPayloadGossipChannel,
final AggregatingAttestationPool attestationPool,
final AttestationManager attestationManager,
final PayloadAttestationManager payloadAttestationManager,
Expand All @@ -181,6 +191,8 @@ public ValidatorApiHandler(
final SyncCommitteeMessagePool syncCommitteeMessagePool,
final SyncCommitteeContributionPool syncCommitteeContributionPool,
final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager,
final ExecutionPayloadHeaderPool executionPayloadHeaderPool,
final ExecutionPayloadManager executionPayloadManager,
final BlockProductionAndPublishingPerformanceFactory
blockProductionAndPublishingPerformanceFactory) {
this.blockProductionAndPublishingPerformanceFactory =
Expand All @@ -191,6 +203,7 @@ public ValidatorApiHandler(
this.combinedChainDataClient = combinedChainDataClient;
this.syncStateProvider = syncStateProvider;
this.blockFactory = blockFactory;
this.executionPayloadHeaderFactory = executionPayloadHeaderFactory;
this.attestationPool = attestationPool;
this.attestationManager = attestationManager;
this.payloadAttestationManager = payloadAttestationManager;
Expand All @@ -203,6 +216,7 @@ public ValidatorApiHandler(
this.syncCommitteeMessagePool = syncCommitteeMessagePool;
this.syncCommitteeContributionPool = syncCommitteeContributionPool;
this.syncCommitteeSubscriptionManager = syncCommitteeSubscriptionManager;
this.executionPayloadHeaderPool = executionPayloadHeaderPool;
this.proposersDataManager = proposersDataManager;
this.blockPublisher =
new MilestoneBasedBlockPublisher(
Expand All @@ -214,6 +228,8 @@ public ValidatorApiHandler(
blobSidecarGossipChannel,
performanceTracker,
dutyMetrics);
this.executionPayloadPublisher =
new ExecutionPayloadPublisher(executionPayloadManager, executionPayloadGossipChannel);
this.attesterDutiesGenerator = new AttesterDutiesGenerator(spec);
}

Expand Down Expand Up @@ -243,6 +259,27 @@ public SafeFuture<Map<BLSPublicKey, Integer>> getValidatorIndices(
});
}

@Override
public SafeFuture<Optional<Map<BLSPublicKey, ValidatorStatus>>> getValidatorStatuses(
final Collection<BLSPublicKey> validatorIdentifiers) {
return isSyncActive()
? SafeFuture.completedFuture(Optional.empty())
: chainDataProvider
.getStateValidators(
"head",
validatorIdentifiers.stream().map(BLSPublicKey::toString).toList(),
new HashSet<>())
.thenApply(
(maybeList) ->
maybeList.map(
list ->
list.getData().stream()
.collect(
toMap(
StateValidatorData::getPublicKey,
StateValidatorData::getStatus))));
}

@Override
public SafeFuture<Optional<AttesterDuties>> getAttestationDuties(
final UInt64 epoch, final IntCollection validatorIndices) {
Expand Down Expand Up @@ -355,29 +392,27 @@ public SafeFuture<Optional<PeerCount>> getPeerCount() {
}

@Override
public SafeFuture<Optional<ExecutionPayloadHeader>> getHeader(final UInt64 slot) {
throw new UnsupportedOperationException("This method is not implemented by the Beacon Node");
}

@Override
public SafeFuture<Optional<Map<BLSPublicKey, ValidatorStatus>>> getValidatorStatuses(
final Collection<BLSPublicKey> validatorIdentifiers) {
return isSyncActive()
? SafeFuture.completedFuture(Optional.empty())
: chainDataProvider
.getStateValidators(
"head",
validatorIdentifiers.stream().map(BLSPublicKey::toString).toList(),
new HashSet<>())
.thenApply(
(maybeList) ->
maybeList.map(
list ->
list.getData().stream()
.collect(
toMap(
StateValidatorData::getPublicKey,
StateValidatorData::getStatus))));
public SafeFuture<Optional<ExecutionPayloadHeader>> getHeader(
final UInt64 slot, final BLSPublicKey builderPublicKey) {
LOG.info(
"Creating unsigned header for slot {} and builder with public key {}",
slot,
builderPublicKey);
if (isSyncActive()) {
return NodeSyncingException.failedFuture();
}
// Not sure the best state to use
final Optional<SafeFuture<BeaconState>> maybeBestState = combinedChainDataClient.getBestState();
if (maybeBestState.isEmpty()) {
return SafeFuture.completedFuture(Optional.empty());
}
return maybeBestState
.get()
.thenCompose(
state ->
executionPayloadHeaderFactory
.createUnsignedHeader(state, slot, builderPublicKey)
.thenApply(Optional::of));
}

@Override
Expand Down Expand Up @@ -624,7 +659,7 @@ public SafeFuture<Optional<SyncCommitteeContribution>> createSyncCommitteeContri

@Override
public SafeFuture<Optional<ExecutionPayloadEnvelope>> getExecutionPayloadEnvelope(
final UInt64 slot, final Bytes32 parentBlockRoot) {
final UInt64 slot, final BLSPublicKey builderPublicKey) {
throw new UnsupportedOperationException("This method is not implemented by the Beacon Node");
}

Expand Down Expand Up @@ -742,7 +777,17 @@ public SafeFuture<List<SubmitDataError>> sendAggregateAndProofs(

@Override
public SafeFuture<Void> sendSignedHeader(final SignedExecutionPayloadHeader signedHeader) {
throw new UnsupportedOperationException("This method is not implemented by the Beacon Node");
return executionPayloadHeaderPool
.addLocal(signedHeader)
.thenAccept(
result -> {
if (result.isNotProcessable()) {
final UInt64 slot =
ExecutionPayloadHeaderEip7732.required(signedHeader.getMessage()).getSlot();
VALIDATOR_LOGGER.producedInvalidBid(
slot, result.getDescription().orElse("Unknown reason"));
}
});
}

private SafeFuture<InternalValidationResult> processAggregateAndProof(
Expand Down Expand Up @@ -788,7 +833,16 @@ public SafeFuture<SendSignedBlockResult> sendSignedBlock(
@Override
public SafeFuture<Void> sendSignedExecutionPayloadEnvelope(
final SignedExecutionPayloadEnvelope signedExecutionPayloadEnvelope) {
throw new UnsupportedOperationException("This method is not implemented by the Beacon Node");
return executionPayloadPublisher
.sendExecutionPayload(signedExecutionPayloadEnvelope)
.thenAccept(
result -> {
if (result.isNotProcessable()) {
VALIDATOR_LOGGER.producedInvalidExecutionPayload(
signedExecutionPayloadEnvelope.getMessage().getBeaconBlockRoot(),
result.getDescription().orElse("Unknown reason"));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.coordinator.publisher;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.ExecutionPayloadGossipChannel;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope;
import tech.pegasys.teku.statetransition.execution.ExecutionPayloadManager;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;

public class ExecutionPayloadPublisher {

private final ExecutionPayloadManager executionPayloadManager;
private final ExecutionPayloadGossipChannel executionPayloadGossipChannel;

public ExecutionPayloadPublisher(
final ExecutionPayloadManager executionPayloadManager,
final ExecutionPayloadGossipChannel executionPayloadGossipChannel) {
this.executionPayloadManager = executionPayloadManager;
this.executionPayloadGossipChannel = executionPayloadGossipChannel;
}

public SafeFuture<InternalValidationResult> sendExecutionPayload(
final SignedExecutionPayloadEnvelope executionPayload) {
return executionPayloadManager
.validateAndImportExecutionPayload(executionPayload, Optional.empty())
.thenPeek(
result -> {
if (result.isAccept()) {
executionPayloadGossipChannel.publishExecutionPayload(executionPayload);
}
});
}
}
Loading

0 comments on commit 624d66c

Please sign in to comment.