Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Dec 19, 2024
1 parent 4dbdd08 commit 7551460
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final BlockImporter blockImporter;
private final BlobSidecarManager blobSidecarManager;
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
Expand All @@ -87,6 +89,7 @@ public DefaultSyncServiceFactory(
final BlockImporter blockImporter,
final BlobSidecarManager blobSidecarManager,
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
Expand All @@ -105,6 +108,7 @@ public DefaultSyncServiceFactory(
this.blockImporter = blockImporter;
this.blobSidecarManager = blobSidecarManager;
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
Expand All @@ -126,6 +130,7 @@ public SyncService create(final EventChannels eventChannels) {
RecentBlocksFetchService.create(
asyncRunner,
pendingBlocks,
pendingAttestations,
blockBlobSidecarsTrackersPool,
forwardSyncService,
fetchTaskFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.util.PendingPool;
Expand All @@ -37,33 +38,38 @@ public class RecentBlocksFetchService

private final ForwardSync forwardSync;
private final PendingPool<SignedBeaconBlock> pendingBlockPool;
private final PendingPool<ValidatableAttestation> pendingAttestationsPool;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final FetchTaskFactory fetchTaskFactory;
private final Subscribers<BlockSubscriber> blockSubscribers = Subscribers.create(true);

RecentBlocksFetchService(
final AsyncRunner asyncRunner,
final PendingPool<SignedBeaconBlock> pendingBlockPool,
final PendingPool<ValidatableAttestation> pendingAttestationsPool,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory,
final int maxConcurrentRequests) {
super(asyncRunner, maxConcurrentRequests);
this.forwardSync = forwardSync;
this.pendingBlockPool = pendingBlockPool;
this.pendingAttestationsPool = pendingAttestationsPool;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.fetchTaskFactory = fetchTaskFactory;
}

public static RecentBlocksFetchService create(
final AsyncRunner asyncRunner,
final PendingPool<SignedBeaconBlock> pendingBlocksPool,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final ForwardSync forwardSync,
final FetchTaskFactory fetchTaskFactory) {
return new RecentBlocksFetchService(
asyncRunner,
pendingBlocksPool,
pendingAttestations,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
Expand Down Expand Up @@ -137,6 +143,8 @@ public void onBlockImported(final SignedBeaconBlock block, final boolean executi
}

private void setupSubscribers() {
pendingAttestationsPool.subscribeRequiredBlockRoot(this::requestRecentBlock);
pendingAttestationsPool.subscribeRequiredBlockRootDropped(this::cancelRecentBlockRequest);
pendingBlockPool.subscribeRequiredBlockRoot(this::requestRecentBlock);
pendingBlockPool.subscribeRequiredBlockRootDropped(this::cancelRecentBlockRequest);
blockBlobSidecarsTrackersPool.subscribeRequiredBlockRoot(this::requestRecentBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
Expand All @@ -52,6 +53,10 @@ public class RecentBlocksFetchServiceTest {
@SuppressWarnings("unchecked")
private final PendingPool<SignedBeaconBlock> pendingBlockPool = mock(PendingPool.class);

@SuppressWarnings("unchecked")
private final PendingPool<ValidatableAttestation> pendingAttestationsPool =
mock(PendingPool.class);

private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool =
mock(BlockBlobSidecarsTrackersPool.class);

Expand All @@ -74,6 +79,7 @@ public void setup() {
new RecentBlocksFetchService(
asyncRunner,
pendingBlockPool,
pendingAttestationsPool,
blockBlobSidecarsTrackersPool,
forwardSync,
fetchTaskFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannelStub;
Expand Down Expand Up @@ -140,6 +141,8 @@ public static SyncingNodeManager create(
final PoolFactory poolFactory = new PoolFactory(new NoOpMetricsSystem());
final PendingPool<SignedBeaconBlock> pendingBlocks =
poolFactory.createPendingPoolForBlocks(spec);
final PendingPool<ValidatableAttestation> pendingAttestations =
poolFactory.createPendingPoolForAttestations(spec);
final FutureItems<SignedBeaconBlock> futureBlocks =
FutureItems.create(SignedBeaconBlock::getSlot, mock(SettableLabelledGauge.class), "blocks");
final Map<Bytes32, BlockImportResult> invalidBlockRoots = LimitedMap.createSynchronizedLRU(500);
Expand Down Expand Up @@ -206,6 +209,7 @@ public static SyncingNodeManager create(
RecentBlocksFetchService.create(
asyncRunner,
pendingBlocks,
pendingAttestations,
BlockBlobSidecarsTrackersPool.NOOP,
syncService,
fetchBlockTaskFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public class BeaconChainController extends Service implements BeaconChainControl
protected volatile WeakSubjectivityValidator weakSubjectivityValidator;
protected volatile PerformanceTracker performanceTracker;
protected volatile PendingPool<SignedBeaconBlock> pendingBlocks;
protected volatile PendingPool<ValidatableAttestation> pendingAttestations;
protected volatile BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
protected volatile Map<Bytes32, BlockImportResult> invalidBlockRoots;
protected volatile CoalescingChainHeadChannel coalescingChainHeadChannel;
Expand Down Expand Up @@ -1049,8 +1050,7 @@ protected void initSignatureVerificationService() {
}

protected void initAttestationManager() {
final PendingPool<ValidatableAttestation> pendingAttestations =
poolFactory.createPendingPoolForAttestations(spec);
pendingAttestations = poolFactory.createPendingPoolForAttestations(spec);
final FutureItems<ValidatableAttestation> futureAttestations =
FutureItems.create(
ValidatableAttestation::getEarliestSlotForForkChoiceProcessing,
Expand Down Expand Up @@ -1292,6 +1292,7 @@ protected SyncServiceFactory createSyncServiceFactory() {
blockImporter,
blobSidecarManager,
pendingBlocks,
pendingAttestations,
blockBlobSidecarsTrackersPool,
beaconConfig.eth2NetworkConfig().getStartupTargetPeerCount(),
signatureVerificationService,
Expand Down

0 comments on commit 7551460

Please sign in to comment.