Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow pending attestations to trigger recent block fetcher #8907

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final BlockImporter blockImporter;
private final BlobSidecarManager blobSidecarManager;
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
Expand All @@ -87,6 +89,7 @@ public DefaultSyncServiceFactory(
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
Expand All @@ -105,6 +108,7 @@ public DefaultSyncServiceFactory(
this.blockImporter = blockImporter;
this.blobSidecarManager = blobSidecarManager;
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
Expand All @@ -126,6 +130,7 @@ public SyncService create(final EventChannels eventChannels) {
RecentBlocksFetchService.create(
asyncRunner,
pendingBlocks,
pendingAttestations,
blockBlobSidecarsTrackersPool,
forwardSyncService,
fetchTaskFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.util.PendingPool;
Expand All @@ -37,33 +38,38 @@ public class RecentBlocksFetchService

private final ForwardSync forwardSync;
private final PendingPool<SignedBeaconBlock> pendingBlockPool;
private final PendingPool<ValidatableAttestation> pendingAttestationsPool;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final FetchTaskFactory fetchTaskFactory;
private final Subscribers<BlockSubscriber> blockSubscribers = Subscribers.create(true);

RecentBlocksFetchService(
final AsyncRunner asyncRunner,
final PendingPool<SignedBeaconBlock> pendingBlockPool,
final PendingPool<ValidatableAttestation> pendingAttestationsPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final int maxConcurrentRequests) {
super(asyncRunner, maxConcurrentRequests);
this.forwardSync = forwardSync;
this.pendingBlockPool = pendingBlockPool;
this.pendingAttestationsPool = pendingAttestationsPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.fetchTaskFactory = fetchTaskFactory;
}

public static RecentBlocksFetchService create(
final AsyncRunner asyncRunner,
final PendingPool<SignedBeaconBlock> pendingBlocksPool,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory) {
return new RecentBlocksFetchService(
asyncRunner,
pendingBlocksPool,
pendingAttestations,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
Expand Down Expand Up @@ -137,6 +143,8 @@ public void onBlockImported(final SignedBeaconBlock block, final boolean executi
}

private void setupSubscribers() {
pendingAttestationsPool.subscribeRequiredBlockRoot(this::requestRecentBlock);
pendingAttestationsPool.subscribeRequiredBlockRootDropped(this::cancelRecentBlockRequest);
pendingBlockPool.subscribeRequiredBlockRoot(this::requestRecentBlock);
pendingBlockPool.subscribeRequiredBlockRootDropped(this::cancelRecentBlockRequest);
blockBlobSidecarsTrackersPool.subscribeRequiredBlockRoot(this::requestRecentBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
Expand All @@ -52,6 +53,10 @@ public class RecentBlocksFetchServiceTest {
@SuppressWarnings("unchecked")
private final PendingPool<SignedBeaconBlock> pendingBlockPool = mock(PendingPool.class);

@SuppressWarnings("unchecked")
private final PendingPool<ValidatableAttestation> pendingAttestationsPool =
mock(PendingPool.class);

private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool =
mock(BlockBlobSidecarsTrackersPool.class);

Expand All @@ -74,6 +79,7 @@ public void setup() {
new RecentBlocksFetchService(
asyncRunner,
pendingBlockPool,
pendingAttestationsPool,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannelStub;
Expand Down Expand Up @@ -140,6 +141,8 @@ public static SyncingNodeManager create(
final PoolFactory poolFactory = new PoolFactory(new NoOpMetricsSystem());
final PendingPool<SignedBeaconBlock> pendingBlocks =
poolFactory.createPendingPoolForBlocks(spec);
final PendingPool<ValidatableAttestation> pendingAttestations =
poolFactory.createPendingPoolForAttestations(spec);
final FutureItems<SignedBeaconBlock> futureBlocks =
FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks");
final Map<Bytes32, BlockImportResult> invalidBlockRoots = LimitedMap.createSynchronizedLRU(500);
Expand Down Expand Up @@ -206,6 +209,7 @@ public static SyncingNodeManager create(
RecentBlocksFetchService.create(
asyncRunner,
pendingBlocks,
pendingAttestations,
BlockBlobSidecarsTrackersPool.NOOP,
syncService,
fetchBlockTaskFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public class BeaconChainController extends Service implements BeaconChainControl
protected volatile WeakSubjectivityValidator weakSubjectivityValidator;
protected volatile PerformanceTracker performanceTracker;
protected volatile PendingPool<SignedBeaconBlock> pendingBlocks;
protected volatile PendingPool<ValidatableAttestation> pendingAttestations;
protected volatile BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
protected volatile Map<Bytes32, BlockImportResult> invalidBlockRoots;
protected volatile CoalescingChainHeadChannel coalescingChainHeadChannel;
Expand Down Expand Up @@ -1049,8 +1050,7 @@ protected void initSignatureVerificationService() {
}

protected void initAttestationManager() {
final PendingPool<ValidatableAttestation> pendingAttestations =
poolFactory.createPendingPoolForAttestations(spec);
pendingAttestations = poolFactory.createPendingPoolForAttestations(spec);
final FutureItems<ValidatableAttestation> futureAttestations =
FutureItems.create(
ValidatableAttestation::getEarliestSlotForForkChoiceProcessing,
Expand Down Expand Up @@ -1292,6 +1292,7 @@ protected SyncServiceFactory createSyncServiceFactory() {
blockImporter,
blobSidecarManager,
pendingBlocks,
pendingAttestations,
blockBlobSidecarsTrackersPool,
beaconConfig.eth2NetworkConfig().getStartupTargetPeerCount(),
signatureVerificationService,
Expand Down
Loading