diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 6379bfd4308..0c77cad2dab 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -35,6 +35,7 @@ import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; @@ -67,6 +68,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory { private final BlockImporter blockImporter; private final BlobSidecarManager blobSidecarManager; private final PendingPool pendingBlocks; + private final PendingPool pendingAttestations; private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; private final int getStartupTargetPeerCount; private final AsyncBLSSignatureVerifier signatureVerifier; @@ -87,6 +89,7 @@ public DefaultSyncServiceFactory( final BlockImporter blockImporter, final BlobSidecarManager blobSidecarManager, final PendingPool pendingBlocks, + final PendingPool pendingAttestations, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final int getStartupTargetPeerCount, final SignatureVerificationService signatureVerifier, @@ -105,6 +108,7 @@ public DefaultSyncServiceFactory( this.blockImporter = blockImporter; this.blobSidecarManager = blobSidecarManager; this.pendingBlocks = pendingBlocks; + this.pendingAttestations = pendingAttestations; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; this.getStartupTargetPeerCount = getStartupTargetPeerCount; this.signatureVerifier = signatureVerifier; @@ -126,6 +130,7 @@ public SyncService create(final EventChannels eventChannels) { RecentBlocksFetchService.create( asyncRunner, pendingBlocks, + pendingAttestations, blockBlobSidecarsTrackersPool, forwardSyncService, fetchTaskFactory); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java index 1fe3354dc28..bc3359dfa86 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java @@ -23,6 +23,7 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; import tech.pegasys.teku.statetransition.util.PendingPool; @@ -37,6 +38,7 @@ public class RecentBlocksFetchService private final ForwardSync forwardSync; private final PendingPool pendingBlockPool; + private final PendingPool pendingAttestationsPool; private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; private final FetchTaskFactory fetchTaskFactory; private final Subscribers blockSubscribers = Subscribers.create(true); @@ -44,6 +46,7 @@ public class RecentBlocksFetchService RecentBlocksFetchService( final AsyncRunner asyncRunner, final PendingPool pendingBlockPool, + final PendingPool pendingAttestationsPool, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final ForwardSync forwardSync, final FetchTaskFactory fetchTaskFactory, @@ -51,6 +54,7 @@ public class RecentBlocksFetchService super(asyncRunner, maxConcurrentRequests); this.forwardSync = forwardSync; this.pendingBlockPool = pendingBlockPool; + this.pendingAttestationsPool = pendingAttestationsPool; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; this.fetchTaskFactory = fetchTaskFactory; } @@ -58,12 +62,14 @@ public class RecentBlocksFetchService public static RecentBlocksFetchService create( final AsyncRunner asyncRunner, final PendingPool pendingBlocksPool, + final PendingPool pendingAttestations, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final ForwardSync forwardSync, final FetchTaskFactory fetchTaskFactory) { return new RecentBlocksFetchService( asyncRunner, pendingBlocksPool, + pendingAttestations, blockBlobSidecarsTrackersPool, forwardSync, fetchTaskFactory, @@ -137,6 +143,8 @@ public void onBlockImported(final SignedBeaconBlock block, final boolean executi } private void setupSubscribers() { + pendingAttestationsPool.subscribeRequiredBlockRoot(this::requestRecentBlock); + pendingAttestationsPool.subscribeRequiredBlockRootDropped(this::cancelRecentBlockRequest); pendingBlockPool.subscribeRequiredBlockRoot(this::requestRecentBlock); pendingBlockPool.subscribeRequiredBlockRootDropped(this::cancelRecentBlockRequest); blockBlobSidecarsTrackersPool.subscribeRequiredBlockRoot(this::requestRecentBlock); diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java index 06a67a98ce1..9acf89f0804 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchServiceTest.java @@ -39,6 +39,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; @@ -52,6 +53,10 @@ public class RecentBlocksFetchServiceTest { @SuppressWarnings("unchecked") private final PendingPool pendingBlockPool = mock(PendingPool.class); + @SuppressWarnings("unchecked") + private final PendingPool pendingAttestationsPool = + mock(PendingPool.class); + private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool = mock(BlockBlobSidecarsTrackersPool.class); @@ -74,6 +79,7 @@ public void setup() { new RecentBlocksFetchService( asyncRunner, pendingBlockPool, + pendingAttestationsPool, blockBlobSidecarsTrackersPool, forwardSync, fetchTaskFactory, diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java index b94030b4214..5256e4a6e7a 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/SyncingNodeManager.java @@ -50,6 +50,7 @@ import tech.pegasys.teku.networking.p2p.peer.Peer; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel; import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannelStub; @@ -140,6 +141,8 @@ public static SyncingNodeManager create( final PoolFactory poolFactory = new PoolFactory(new NoOpMetricsSystem()); final PendingPool pendingBlocks = poolFactory.createPendingPoolForBlocks(spec); + final PendingPool pendingAttestations = + poolFactory.createPendingPoolForAttestations(spec); final FutureItems futureBlocks = FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks"); final Map invalidBlockRoots = LimitedMap.createSynchronizedLRU(500); @@ -206,6 +209,7 @@ public static SyncingNodeManager create( RecentBlocksFetchService.create( asyncRunner, pendingBlocks, + pendingAttestations, BlockBlobSidecarsTrackersPool.NOOP, syncService, fetchBlockTaskFactory); diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 5fc563a9f4b..90508b0303b 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -268,6 +268,7 @@ public class BeaconChainController extends Service implements BeaconChainControl protected volatile WeakSubjectivityValidator weakSubjectivityValidator; protected volatile PerformanceTracker performanceTracker; protected volatile PendingPool pendingBlocks; + protected volatile PendingPool pendingAttestations; protected volatile BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; protected volatile Map invalidBlockRoots; protected volatile CoalescingChainHeadChannel coalescingChainHeadChannel; @@ -1049,8 +1050,7 @@ protected void initSignatureVerificationService() { } protected void initAttestationManager() { - final PendingPool pendingAttestations = - poolFactory.createPendingPoolForAttestations(spec); + pendingAttestations = poolFactory.createPendingPoolForAttestations(spec); final FutureItems futureAttestations = FutureItems.create( ValidatableAttestation::getEarliestSlotForForkChoiceProcessing, @@ -1292,6 +1292,7 @@ protected SyncServiceFactory createSyncServiceFactory() { blockImporter, blobSidecarManager, pendingBlocks, + pendingAttestations, blockBlobSidecarsTrackersPool, beaconConfig.eth2NetworkConfig().getStartupTargetPeerCount(), signatureVerificationService,