From c3153056f40183622cb066576f5cacafb46689dc Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Wed, 15 Jan 2025 21:00:53 +0000 Subject: [PATCH] Changes in forward sync and rate limit constants (#8982) --- CHANGELOG.md | 4 +- .../sync/DefaultSyncServiceFactory.java | 1 + .../pegasys/teku/beacon/sync/SyncConfig.java | 31 ++++++++++-- .../multipeer/MultipeerSyncService.java | 4 +- .../multipeer/chains/SyncSourceFactory.java | 23 +++++---- .../sync/forward/singlepeer/PeerSyncTest.java | 4 +- .../eth2/Eth2P2PNetworkBuilder.java | 3 +- .../teku/networking/eth2/P2PConfig.java | 50 +++++++++++++------ .../eth2/peers/Eth2PeerFactory.java | 16 +++--- .../eth2/peers/Eth2PeerManager.java | 6 ++- .../eth2/peers/RateTrackerImpl.java | 27 ++++++---- .../eth2/Eth2P2PNetworkFactory.java | 5 +- .../pegasys/teku/cli/options/P2POptions.java | 42 ++++++++++++---- .../teku/cli/BeaconNodeCommandTest.java | 15 ++++-- .../options/Eth2P2PNetworkOptionsTest.java | 14 ++++-- .../teku/cli/options/P2POptionsTest.java | 17 +++++-- .../src/test/resources/P2POptions_config.yaml | 6 ++- 17 files changed, 191 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f86e3390b3e..b7799779c3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ ## Unreleased Changes ### Breaking Changes -- `--Xvalidators-builder-registration-default-gas-limit` is removed in favour of `--validators-builder-registration-default-gas-limit` +- `--Xvalidators-builder-registration-default-gas-limit` CLI option is replaced by `--validators-builder-registration-default-gas-limit` +- `--Xp2p-sync-rate-limit` CLI option is removed in favour of `--Xp2p-sync-blocks-rate-limit` and `--Xp2p-sync-blob-sidecars-rate-limit` +- `--Xpeer-rate-limit` CLI options is removed in favour of `--Xpeer-blocks-rate-limit` and `--Xpeer-blob-sidecars-rate-limit` - With the upgrade of the Prometheus Java Metrics library, there are the following changes: - Gauge names are not allowed to end with `total`, therefore metrics as `beacon_proposers_data_total` and `beacon_eth1_current_period_votes_total` are dropping the `_total` suffix - The `_created` timestamps are not returned by default. diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 0c77cad2dab..564d97e2f6e 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -196,6 +196,7 @@ protected ForwardSyncService createForwardSyncService() { syncConfig.getForwardSyncBatchSize(), syncConfig.getForwardSyncMaxPendingBatches(), syncConfig.getForwardSyncMaxBlocksPerMinute(), + syncConfig.getForwardSyncMaxBlobSidecarsPerMinute(), spec); } else { LOG.info("Using single peer sync"); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java index bc06318aeee..051baf9d734 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/SyncConfig.java @@ -15,16 +15,24 @@ import static com.google.common.base.Preconditions.checkNotNull; +import tech.pegasys.teku.networking.eth2.P2PConfig; + public class SyncConfig { public static final boolean DEFAULT_MULTI_PEER_SYNC_ENABLED = true; public static final boolean DEFAULT_RECONSTRUCT_HISTORIC_STATES_ENABLED = false; public static final boolean DEFAULT_FETCH_ALL_HISTORIC_BLOCKS = true; - public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 50; + public static final int DEFAULT_HISTORICAL_SYNC_BATCH_SIZE = 50; + public static final int DEFAULT_FORWARD_SYNC_BATCH_SIZE = 25; public static final int DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES = 5; + + /** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOCKS_RATE_LIMIT} */ public static final int DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE = 500; + /** Aligned with {@link P2PConfig#DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT} */ + public static final int DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE = 2000; + private final boolean isEnabled; private final boolean isMultiPeerSyncEnabled; private final boolean reconstructHistoricStatesEnabled; @@ -33,6 +41,7 @@ public class SyncConfig { private final int forwardSyncBatchSize; private final int forwardSyncMaxPendingBatches; private final int forwardSyncMaxBlocksPerMinute; + private final int forwardSyncMaxBlobSidecarsPerMinute; private SyncConfig( final boolean isEnabled, @@ -42,7 +51,8 @@ private SyncConfig( final int historicalSyncBatchSize, final int forwardSyncBatchSize, final int forwardSyncMaxPendingBatches, - final int forwardSyncMaxBlocksPerMinute) { + final int forwardSyncMaxBlocksPerMinute, + final int forwardSyncMaxBlobSidecarsPerMinute) { this.isEnabled = isEnabled; this.isMultiPeerSyncEnabled = isMultiPeerSyncEnabled; this.reconstructHistoricStatesEnabled = reconstructHistoricStatesEnabled; @@ -51,6 +61,7 @@ private SyncConfig( this.forwardSyncBatchSize = forwardSyncBatchSize; this.forwardSyncMaxPendingBatches = forwardSyncMaxPendingBatches; this.forwardSyncMaxBlocksPerMinute = forwardSyncMaxBlocksPerMinute; + this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute; } public static Builder builder() { @@ -89,6 +100,10 @@ public int getForwardSyncMaxBlocksPerMinute() { return forwardSyncMaxBlocksPerMinute; } + public int getForwardSyncMaxBlobSidecarsPerMinute() { + return forwardSyncMaxBlobSidecarsPerMinute; + } + public static class Builder { private Boolean isEnabled; private Boolean isMultiPeerSyncEnabled = DEFAULT_MULTI_PEER_SYNC_ENABLED; @@ -98,6 +113,8 @@ public static class Builder { private Integer forwardSyncBatchSize = DEFAULT_FORWARD_SYNC_BATCH_SIZE; private Integer forwardSyncMaxPendingBatches = DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES; private Integer forwardSyncMaxBlocksPerMinute = DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE; + private Integer forwardSyncMaxBlobSidecarsPerMinute = + DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE; private Builder() {} @@ -111,7 +128,8 @@ public SyncConfig build() { historicalSyncBatchSize, forwardSyncBatchSize, forwardSyncMaxPendingBatches, - forwardSyncMaxBlocksPerMinute); + forwardSyncMaxBlocksPerMinute, + forwardSyncMaxBlobSidecarsPerMinute); } private void initMissingDefaults() { @@ -163,6 +181,13 @@ public Builder forwardSyncMaxBlocksPerMinute(final Integer forwardSyncMaxBlocksP return this; } + public Builder forwardSyncMaxBlobSidecarsPerMinute( + final Integer forwardSyncMaxBlobSidecarsPerMinute) { + checkNotNull(forwardSyncMaxBlobSidecarsPerMinute); + this.forwardSyncMaxBlobSidecarsPerMinute = forwardSyncMaxBlobSidecarsPerMinute; + return this; + } + public Builder reconstructHistoricStatesEnabled( final Boolean reconstructHistoricStatesEnabled) { checkNotNull(reconstructHistoricStatesEnabled); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java index 5d38e95c8c3..6082e13cfa2 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java @@ -76,6 +76,7 @@ public static MultipeerSyncService create( final int batchSize, final int maxPendingBatches, final int maxBlocksPerMinute, + final int maxBlobSidecarsPerMinute, final Spec spec) { final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory); final SettableLabelledGauge targetChainCountGauge = @@ -117,7 +118,8 @@ eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()), recentChainData.getSpec(), eventThread, p2pNetwork, - new SyncSourceFactory(asyncRunner, timeProvider, maxBlocksPerMinute, batchSize), + new SyncSourceFactory( + asyncRunner, timeProvider, batchSize, maxBlocksPerMinute, maxBlobSidecarsPerMinute), finalizedTargetChains, nonfinalizedTargetChains); peerChainTracker.subscribeToTargetChainUpdates(syncController::onTargetChainsUpdated); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java index a04d7e8c983..ddee2e030fb 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java @@ -26,28 +26,33 @@ public class SyncSourceFactory { private final AsyncRunner asyncRunner; private final TimeProvider timeProvider; - private final Map syncSourcesByPeer = new HashMap<>(); - private final int maxBlocksPerMinute; private final int batchSize; + private final int maxBlocksPerMinute; + private final int maxBlobSidecarsPerMinute; + + private final Map syncSourcesByPeer = new HashMap<>(); public SyncSourceFactory( final AsyncRunner asyncRunner, final TimeProvider timeProvider, + final int batchSize, final int maxBlocksPerMinute, - final int batchSize) { + final int maxBlobSidecarsPerMinute) { this.asyncRunner = asyncRunner; this.timeProvider = timeProvider; - this.maxBlocksPerMinute = maxBlocksPerMinute; this.batchSize = batchSize; + this.maxBlocksPerMinute = maxBlocksPerMinute; + this.maxBlobSidecarsPerMinute = maxBlobSidecarsPerMinute; } public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) { - // Limit request rate to just a little under what we'd accept + // Limit request rates for blocks/blobs to just a little under what we'd accept (see + // Eth2PeerFactory) final int maxBlocksPerMinute = this.maxBlocksPerMinute - batchSize - 1; final Optional maybeMaxBlobsPerBlock = spec.getMaxBlobsPerBlockForHighestMilestone(); - final Optional maxBlobSidecarsPerMinute = - maybeMaxBlobsPerBlock.map(maxBlobsPerBlock -> maxBlocksPerMinute * maxBlobsPerBlock); - + final Optional maybeMaxBlobSidecarsPerMinute = + maybeMaxBlobsPerBlock.map( + maxBlobsPerBlock -> this.maxBlobSidecarsPerMinute - (batchSize * maxBlobsPerBlock) - 1); return syncSourcesByPeer.computeIfAbsent( peer, source -> @@ -57,7 +62,7 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) { source, maxBlocksPerMinute, maybeMaxBlobsPerBlock, - maxBlobSidecarsPerMinute)); + maybeMaxBlobSidecarsPerMinute)); } public void onPeerDisconnected(final Eth2Peer peer) { diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java index 7a7f5eec5d8..505cd195228 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/singlepeer/PeerSyncTest.java @@ -57,7 +57,7 @@ public class PeerSyncTest extends AbstractSyncTest { UInt64.valueOf(SyncConfig.DEFAULT_FORWARD_SYNC_BATCH_SIZE); private static final Bytes32 PEER_HEAD_BLOCK_ROOT = Bytes32.fromHexString("0x1234"); - private static final UInt64 PEER_HEAD_SLOT = UInt64.valueOf(30); + private static final UInt64 PEER_HEAD_SLOT = UInt64.valueOf(20); private static final UInt64 PEER_FINALIZED_EPOCH = UInt64.valueOf(3); private final int slotsPerEpoch = spec.getGenesisSpecConfig().getSlotsPerEpoch(); @@ -73,7 +73,7 @@ public class PeerSyncTest extends AbstractSyncTest { PEER_HEAD_BLOCK_ROOT, PEER_HEAD_SLOT)); - private final UInt64 denebPeerSlotsAhead = UInt64.valueOf(30); + private final UInt64 denebPeerSlotsAhead = UInt64.valueOf(20); private final UInt64 denebPeerHeadSlot = denebFirstSlot.plus(denebPeerSlotsAhead); private final UInt64 denebPeerFinalizedEpoch = spec.computeEpochAtSlot(denebPeerHeadSlot); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java index bd470eb3ffb..9467a22d4a3 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java @@ -162,7 +162,8 @@ public Eth2P2PNetwork build() { eth2RpcOutstandingPingThreshold, eth2StatusUpdateInterval, timeProvider, - config.getPeerRateLimit(), + config.getPeerBlocksRateLimit(), + config.getPeerBlobSidecarsRateLimit(), config.getPeerRequestLimit(), spec, kzg, diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java index 74d43c7c50a..792ad6b8f49 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java @@ -31,10 +31,13 @@ public class P2PConfig { - public static final int DEFAULT_PEER_RATE_LIMIT = 500; + public static final int DEFAULT_PEER_BLOCKS_RATE_LIMIT = 500; + // 250 MB per peer per minute (~ 4.16 MB/s) + public static final int DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT = 2000; + + public static final int DEFAULT_PEER_REQUEST_LIMIT = 100; public static final boolean DEFAULT_PEER_ALL_TOPIC_FILTER_ENABLED = true; - public static final int DEFAULT_PEER_REQUEST_LIMIT = 50; public static final int DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT = 2; public static final boolean DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED = false; public static final boolean DEFAULT_GOSSIP_SCORING_ENABLED = true; @@ -54,7 +57,8 @@ public class P2PConfig { private final GossipEncoding gossipEncoding; private final int targetSubnetSubscriberCount; private final boolean subscribeAllSubnetsEnabled; - private final int peerRateLimit; + private final int peerBlocksRateLimit; + private final int peerBlobSidecarsRateLimit; private final int peerRequestLimit; private final int batchVerifyMaxThreads; private final int batchVerifyQueueCapacity; @@ -71,7 +75,8 @@ private P2PConfig( final GossipEncoding gossipEncoding, final int targetSubnetSubscriberCount, final boolean subscribeAllSubnetsEnabled, - final int peerRateLimit, + final int peerBlocksRateLimit, + final int peerBlobSidecarsRateLimit, final int peerRequestLimit, final int batchVerifyMaxThreads, final int batchVerifyQueueCapacity, @@ -86,7 +91,8 @@ private P2PConfig( this.gossipEncoding = gossipEncoding; this.targetSubnetSubscriberCount = targetSubnetSubscriberCount; this.subscribeAllSubnetsEnabled = subscribeAllSubnetsEnabled; - this.peerRateLimit = peerRateLimit; + this.peerBlocksRateLimit = peerBlocksRateLimit; + this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit; this.peerRequestLimit = peerRequestLimit; this.batchVerifyMaxThreads = batchVerifyMaxThreads; this.batchVerifyQueueCapacity = batchVerifyQueueCapacity; @@ -129,8 +135,12 @@ public boolean isSubscribeAllSubnetsEnabled() { return subscribeAllSubnetsEnabled; } - public int getPeerRateLimit() { - return peerRateLimit; + public int getPeerBlocksRateLimit() { + return peerBlocksRateLimit; + } + + public int getPeerBlobSidecarsRateLimit() { + return peerBlobSidecarsRateLimit; } public int getPeerRequestLimit() { @@ -174,7 +184,8 @@ public static class Builder { private final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY; private Integer targetSubnetSubscriberCount = DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT; private Boolean subscribeAllSubnetsEnabled = DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED; - private Integer peerRateLimit = DEFAULT_PEER_RATE_LIMIT; + private Integer peerBlocksRateLimit = DEFAULT_PEER_BLOCKS_RATE_LIMIT; + private Integer peerBlobSidecarsRateLimit = DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT; private Integer peerRequestLimit = DEFAULT_PEER_REQUEST_LIMIT; private int batchVerifyMaxThreads = DEFAULT_BATCH_VERIFY_MAX_THREADS; private OptionalInt batchVerifyQueueCapacity = OptionalInt.empty(); @@ -225,7 +236,8 @@ public P2PConfig build() { gossipEncoding, targetSubnetSubscriberCount, subscribeAllSubnetsEnabled, - peerRateLimit, + peerBlocksRateLimit, + peerBlobSidecarsRateLimit, peerRequestLimit, batchVerifyMaxThreads, batchVerifyQueueCapacity.orElse(DEFAULT_BATCH_VERIFY_QUEUE_CAPACITY), @@ -277,13 +289,23 @@ public Builder subscribeAllSubnetsEnabled(final Boolean subscribeAllSubnetsEnabl return this; } - public Builder peerRateLimit(final Integer peerRateLimit) { - checkNotNull(peerRateLimit); - if (peerRateLimit < 0) { + public Builder peerBlocksRateLimit(final Integer peerBlocksRateLimit) { + checkNotNull(peerBlocksRateLimit); + if (peerBlocksRateLimit < 0) { + throw new InvalidConfigurationException( + String.format("Invalid peerBlocksRateLimit: %d", peerBlocksRateLimit)); + } + this.peerBlocksRateLimit = peerBlocksRateLimit; + return this; + } + + public Builder peerBlobSidecarsRateLimit(final Integer peerBlobSidecarsRateLimit) { + checkNotNull(peerBlobSidecarsRateLimit); + if (peerBlobSidecarsRateLimit < 0) { throw new InvalidConfigurationException( - String.format("Invalid peerRateLimit: %d", peerRateLimit)); + String.format("Invalid peerBlobSidecarsRateLimit: %d", peerBlobSidecarsRateLimit)); } - this.peerRateLimit = peerRateLimit; + this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit; return this; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java index 2122c93ff19..95dc0962180 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerFactory.java @@ -35,7 +35,8 @@ public class Eth2PeerFactory { private final CombinedChainDataClient chainDataClient; private final TimeProvider timeProvider; private final Optional requiredCheckpoint; - private final int peerRateLimit; + private final int peerBlocksRateLimit; + private final int peerBlobSidecarsRateLimit; private final int peerRequestLimit; private final KZG kzg; private final DiscoveryNodeIdExtractor discoveryNodeIdExtractor; @@ -48,7 +49,8 @@ public Eth2PeerFactory( final MetadataMessagesFactory metadataMessagesFactory, final TimeProvider timeProvider, final Optional requiredCheckpoint, - final int peerRateLimit, + final int peerBlocksRateLimit, + final int peerBlobSidecarsRateLimit, final int peerRequestLimit, final KZG kzg, final DiscoveryNodeIdExtractor discoveryNodeIdExtractor) { @@ -59,7 +61,8 @@ public Eth2PeerFactory( this.statusMessageFactory = statusMessageFactory; this.metadataMessagesFactory = metadataMessagesFactory; this.requiredCheckpoint = requiredCheckpoint; - this.peerRateLimit = peerRateLimit; + this.peerBlocksRateLimit = peerBlocksRateLimit; + this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit; this.peerRequestLimit = peerRequestLimit; this.kzg = kzg; this.discoveryNodeIdExtractor = discoveryNodeIdExtractor; @@ -74,11 +77,8 @@ public Eth2Peer create(final Peer peer, final BeaconChainMethods rpcMethods) { statusMessageFactory, metadataMessagesFactory, PeerChainValidator.create(spec, metricsSystem, chainDataClient, requiredCheckpoint), - RateTracker.create(peerRateLimit, TIME_OUT, timeProvider), - RateTracker.create( - peerRateLimit * spec.getMaxBlobsPerBlockForHighestMilestone().orElse(1), - TIME_OUT, - timeProvider), + RateTracker.create(peerBlocksRateLimit, TIME_OUT, timeProvider), + RateTracker.create(peerBlobSidecarsRateLimit, TIME_OUT, timeProvider), RateTracker.create(peerRequestLimit, TIME_OUT, timeProvider), kzg); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java index 88e57f491a7..4ea63f4c17d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java @@ -114,7 +114,8 @@ public static Eth2PeerManager create( final int eth2RpcOutstandingPingThreshold, final Duration eth2StatusUpdateInterval, final TimeProvider timeProvider, - final int peerRateLimit, + final int peerBlocksRateLimit, + final int peerBlobSidecarsRateLimit, final int peerRequestLimit, final Spec spec, final KZG kzg, @@ -140,7 +141,8 @@ public static Eth2PeerManager create( metadataMessagesFactory, timeProvider, requiredCheckpoint, - peerRateLimit, + peerBlocksRateLimit, + peerBlobSidecarsRateLimit, peerRequestLimit, kzg, discoveryNodeIdExtractor), diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/RateTrackerImpl.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/RateTrackerImpl.java index ac0dc87b244..cb7bbb94e76 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/RateTrackerImpl.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/RateTrackerImpl.java @@ -13,27 +13,32 @@ package tech.pegasys.teku.networking.eth2.peers; +import com.google.common.base.Preconditions; import java.util.NavigableMap; import java.util.Optional; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicInteger; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; public class RateTrackerImpl implements RateTracker { - private final NavigableMap requests; + + private final NavigableMap requests = new TreeMap<>(); + private final int peerRateLimit; - private final UInt64 timeoutSeconds; - private long objectsWithinWindow = 0L; + private final long timeoutSeconds; private final TimeProvider timeProvider; - private final AtomicInteger newRequestId = new AtomicInteger(0); + private long objectsWithinWindow = 0L; + private int newRequestId = 0; public RateTrackerImpl( final int peerRateLimit, final long timeoutSeconds, final TimeProvider timeProvider) { - this.timeoutSeconds = UInt64.valueOf(timeoutSeconds); - requests = new TreeMap<>(); + Preconditions.checkArgument( + peerRateLimit > 0, + "peerRateLimit should be a positive number but it was %s", + peerRateLimit); this.peerRateLimit = peerRateLimit; + this.timeoutSeconds = timeoutSeconds; this.timeProvider = timeProvider; } @@ -43,13 +48,13 @@ public RateTrackerImpl( public synchronized Optional approveObjectsRequest(final long objectsCount) { pruneRequests(); final UInt64 currentTime = timeProvider.getTimeInSeconds(); - if ((peerRateLimit - objectsWithinWindow) <= 0) { + if (peerRateLimit - objectsWithinWindow <= 0) { return Optional.empty(); } objectsWithinWindow += objectsCount; final RequestApproval requestApproval = new RequestApproval.RequestApprovalBuilder() - .requestId(newRequestId.getAndIncrement()) + .requestId(newRequestId++) .timeSeconds(currentTime) .objectsCount(objectsCount) .build(); @@ -61,8 +66,8 @@ public synchronized Optional approveObjectsRequest(final long o public synchronized void adjustObjectsRequest( final RequestApproval requestApproval, final long returnedObjectsCount) { pruneRequests(); - if (requests.containsKey(requestApproval.getRequestKey())) { - final long initialObjectsCount = requests.get(requestApproval.getRequestKey()); + final Long initialObjectsCount = requests.get(requestApproval.getRequestKey()); + if (initialObjectsCount != null) { requests.put(requestApproval.getRequestKey(), returnedObjectsCount); objectsWithinWindow = objectsWithinWindow - initialObjectsCount + returnedObjectsCount; } diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java index 104c074f441..e2d9ea4f4bd 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java @@ -225,8 +225,9 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { eth2RpcOutstandingPingThreshold, eth2StatusUpdateInterval, timeProvider, - 500, - 50, + P2PConfig.DEFAULT_PEER_BLOCKS_RATE_LIMIT, + P2PConfig.DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT, + P2PConfig.DEFAULT_PEER_REQUEST_LIMIT, spec, KZG.NOOP, (__) -> Optional.empty()); diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java index 5065a63c5da..40fd299d68a 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java @@ -228,7 +228,7 @@ The network interface(s) on which the node listens for P2P communication. showDefaultValue = Visibility.ALWAYS, description = "Number of blocks/blobs being requested in a single batch to a single peer, while syncing historical data.\n" - + "NOTE: the actual size for blobs batches will be `maxBlobsPerBlock` times the value of this parameter.", + + "NOTE: the actual size for blobs being requested in a single batch will be up to `maxBlobsPerBlock` times the value of this parameter.", hidden = true, arity = "1") private Integer historicalSyncBatchSize = SyncConfig.DEFAULT_HISTORICAL_SYNC_BATCH_SIZE; @@ -239,7 +239,7 @@ The network interface(s) on which the node listens for P2P communication. showDefaultValue = Visibility.ALWAYS, description = "Number of blocks/blobs being requested in a single batch to a single peer, while syncing.\n" - + "NOTE: the actual size for blobs batches will be `maxBlobsPerBlock` times the value of this parameter.", + + "NOTE: the actual size for blobs being requested in a single batch will be up to `maxBlobsPerBlock` times the value of this parameter.", hidden = true, arity = "1") private Integer forwardSyncBatchSize = SyncConfig.DEFAULT_FORWARD_SYNC_BATCH_SIZE; @@ -255,13 +255,24 @@ The network interface(s) on which the node listens for P2P communication. SyncConfig.DEFAULT_FORWARD_SYNC_MAX_PENDING_BATCHES; @Option( - names = {"--Xp2p-sync-rate-limit"}, + names = {"--Xp2p-sync-blocks-rate-limit"}, paramLabel = "", showDefaultValue = Visibility.ALWAYS, - description = "Number of objects being requested per minute to a single peer, while syncing.", + description = "Number of blocks being requested per minute to a single peer, while syncing.", hidden = true, arity = "1") - private Integer forwardSyncRateLimit = SyncConfig.DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE; + private Integer forwardSyncBlocksRateLimit = + SyncConfig.DEFAULT_FORWARD_SYNC_MAX_BLOCKS_PER_MINUTE; + + @Option( + names = {"--Xp2p-sync-blob-sidecars-rate-limit"}, + paramLabel = "", + showDefaultValue = Visibility.ALWAYS, + description = "Number of blobs being requested per minute to a single peer, while syncing.", + hidden = true, + arity = "1") + private Integer forwardSyncBlobSidecarsRateLimit = + SyncConfig.DEFAULT_FORWARD_SYNC_MAX_BLOB_SIDECARS_PER_MINUTE; @Option( names = {"--p2p-subscribe-all-subnets-enabled"}, @@ -283,13 +294,22 @@ The network interface(s) on which the node listens for P2P communication. private boolean gossipScoringEnabled = P2PConfig.DEFAULT_GOSSIP_SCORING_ENABLED; @Option( - names = {"--Xpeer-rate-limit"}, + names = {"--Xpeer-blocks-rate-limit"}, + paramLabel = "", + description = + "The number of requested blocks per peer to allow per minute before disconnecting the peer.", + arity = "1", + hidden = true) + private Integer peerBlocksRateLimit = P2PConfig.DEFAULT_PEER_BLOCKS_RATE_LIMIT; + + @Option( + names = {"--Xpeer-blob-sidecars-rate-limit"}, paramLabel = "", description = - "The number of requested objects per peer to allow per minute before disconnecting the peer.", + "The number of requested blobs per peer to allow per minute before disconnecting the peer.", arity = "1", hidden = true) - private Integer peerRateLimit = P2PConfig.DEFAULT_PEER_RATE_LIMIT; + private Integer peerBlobSidecarsRateLimit = P2PConfig.DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT; @Option( names = {"--Xp2p-gossip-blobs-after-block-enabled"}, @@ -418,7 +438,8 @@ public void configure(final TekuConfiguration.Builder builder) { .batchVerifyStrictThreadLimitEnabled(batchVerifyStrictThreadLimitEnabled) .targetSubnetSubscriberCount(p2pTargetSubnetSubscriberCount) .isGossipScoringEnabled(gossipScoringEnabled) - .peerRateLimit(peerRateLimit) + .peerBlocksRateLimit(peerBlocksRateLimit) + .peerBlobSidecarsRateLimit(peerBlobSidecarsRateLimit) .allTopicsFilterEnabled(allTopicsFilterEnabled) .peerRequestLimit(peerRequestLimit) .floodPublishMaxMessageSizeThreshold(floodPublishMaxMessageSizeThreshold) @@ -490,7 +511,8 @@ public void configure(final TekuConfiguration.Builder builder) { s -> s.isMultiPeerSyncEnabled(multiPeerSyncEnabled) .historicalSyncBatchSize(historicalSyncBatchSize) - .forwardSyncMaxBlocksPerMinute(forwardSyncRateLimit) + .forwardSyncMaxBlocksPerMinute(forwardSyncBlocksRateLimit) + .forwardSyncMaxBlobSidecarsPerMinute(forwardSyncBlobSidecarsRateLimit) .forwardSyncBatchSize(forwardSyncBatchSize) .forwardSyncMaxPendingBatches(forwardSyncMaxPendingBatches)); diff --git a/teku/src/test/java/tech/pegasys/teku/cli/BeaconNodeCommandTest.java b/teku/src/test/java/tech/pegasys/teku/cli/BeaconNodeCommandTest.java index f347901ab8e..c720b5ba468 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/BeaconNodeCommandTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/BeaconNodeCommandTest.java @@ -556,10 +556,12 @@ private String[] createCliArgs() { "127.0.0.1", "--Xrest-api-max-url-length", "65535", - "--Xpeer-rate-limit", + "--Xpeer-blocks-rate-limit", "500", + "--Xpeer-blob-sidecars-rate-limit", + "2000", "--Xpeer-request-limit", - "50" + "100" }; } @@ -582,7 +584,7 @@ private TekuConfiguration.Builder expectedDefaultConfigurationBuilder() { .dataStorageMode(MINIMAL)) .metrics(b -> b.metricsCategories(DEFAULT_METRICS_CATEGORIES)) .restApi(b -> b.eth1DepositContractAddress(networkConfig.getEth1DepositContractAddress())) - .p2p(p -> p.peerRateLimit(500).peerRequestLimit(50)) + .p2p(p -> p.peerBlocksRateLimit(500).peerBlobSidecarsRateLimit(2000).peerRequestLimit(100)) .discovery( d -> d.isDiscoveryEnabled(true) @@ -623,7 +625,12 @@ private TekuConfiguration.Builder expectedConfigurationBuilder() { .dataStorageCreateDbVersion(DatabaseVersion.DEFAULT_VERSION) .maxKnownNodeCacheSize(100_000)) .data(b -> b.dataBasePath(dataPath)) - .p2p(b -> b.targetSubnetSubscriberCount(2).peerRateLimit(500).peerRequestLimit(50)) + .p2p( + b -> + b.targetSubnetSubscriberCount(2) + .peerBlocksRateLimit(500) + .peerBlobSidecarsRateLimit(2000) + .peerRequestLimit(100)) .discovery( d -> d.isDiscoveryEnabled(false) diff --git a/teku/src/test/java/tech/pegasys/teku/cli/options/Eth2P2PNetworkOptionsTest.java b/teku/src/test/java/tech/pegasys/teku/cli/options/Eth2P2PNetworkOptionsTest.java index 2bec62ab205..1ab771eaa56 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/options/Eth2P2PNetworkOptionsTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/options/Eth2P2PNetworkOptionsTest.java @@ -128,11 +128,19 @@ public void usingNetworkFromUrl() { } @Test - public void setPeerRateLimit() { + public void setPeerBlocksRateLimit() { TekuConfiguration tekuConfiguration = - getTekuConfigurationFromArguments("--Xpeer-rate-limit", "10"); + getTekuConfigurationFromArguments("--Xpeer-blocks-rate-limit", "10"); final P2PConfig config = tekuConfiguration.beaconChain().p2pConfig(); - assertThat(config.getPeerRateLimit()).isEqualTo(10); + assertThat(config.getPeerBlocksRateLimit()).isEqualTo(10); + } + + @Test + public void setPeerBlobSidecarsRateLimit() { + TekuConfiguration tekuConfiguration = + getTekuConfigurationFromArguments("--Xpeer-blob-sidecars-rate-limit", "10"); + final P2PConfig config = tekuConfiguration.beaconChain().p2pConfig(); + assertThat(config.getPeerBlobSidecarsRateLimit()).isEqualTo(10); } @Test diff --git a/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java b/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java index 766dec5453c..288068c7674 100644 --- a/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java +++ b/teku/src/test/java/tech/pegasys/teku/cli/options/P2POptionsTest.java @@ -43,7 +43,8 @@ public void shouldReadFromConfigurationFile() { final P2PConfig p2pConfig = tekuConfig.p2p(); assertThat(p2pConfig.getTargetSubnetSubscriberCount()).isEqualTo(5); - assertThat(p2pConfig.getPeerRateLimit()).isEqualTo(100); + assertThat(p2pConfig.getPeerBlocksRateLimit()).isEqualTo(100); + assertThat(p2pConfig.getPeerBlobSidecarsRateLimit()).isEqualTo(400); assertThat(p2pConfig.getPeerRequestLimit()).isEqualTo(101); final DiscoveryConfig discoConfig = tekuConfig.discovery(); @@ -66,7 +67,8 @@ public void shouldReadFromConfigurationFile() { assertThat(syncConfig.getHistoricalSyncBatchSize()).isEqualTo(102); assertThat(syncConfig.getForwardSyncBatchSize()).isEqualTo(103); assertThat(syncConfig.getForwardSyncMaxPendingBatches()).isEqualTo(8); - assertThat(syncConfig.getForwardSyncMaxBlocksPerMinute()).isEqualTo(80); + assertThat(syncConfig.getForwardSyncMaxBlocksPerMinute()).isEqualTo(100); + assertThat(syncConfig.getForwardSyncMaxBlobSidecarsPerMinute()).isEqualTo(400); } @Test @@ -246,12 +248,19 @@ public void forwardSyncMaxPendingBatches_shouldBeSettable() { } @Test - public void forwardSyncRateLimit_shouldBeSettable() { + public void forwardSyncBlocksRateLimit_shouldBeSettable() { TekuConfiguration tekuConfiguration = - getTekuConfigurationFromArguments("--Xp2p-sync-rate-limit", "10"); + getTekuConfigurationFromArguments("--Xp2p-sync-blocks-rate-limit", "10"); assertThat(tekuConfiguration.sync().getForwardSyncMaxBlocksPerMinute()).isEqualTo(10); } + @Test + public void forwardSyncBlobSidecarsRateLimit_shouldBeSettable() { + TekuConfiguration tekuConfiguration = + getTekuConfigurationFromArguments("--Xp2p-sync-blob-sidecars-rate-limit", "10"); + assertThat(tekuConfiguration.sync().getForwardSyncMaxBlobSidecarsPerMinute()).isEqualTo(10); + } + @Test public void forwardSyncBatchSize_greaterThanMessageSizeShouldThrowException() { assertThatThrownBy(() -> createConfigBuilder().sync(s -> s.forwardSyncBatchSize(3000)).build()) diff --git a/teku/src/test/resources/P2POptions_config.yaml b/teku/src/test/resources/P2POptions_config.yaml index 84524a21e8e..44d027d77a4 100644 --- a/teku/src/test/resources/P2POptions_config.yaml +++ b/teku/src/test/resources/P2POptions_config.yaml @@ -11,9 +11,11 @@ p2p-peer-lower-bound: 70 p2p-peer-upper-bound: 85 Xp2p-target-subnet-subscriber-count: 5 Xp2p-minimum-randomly-selected-peer-count: 1 -Xpeer-rate-limit: 100 +Xpeer-blocks-rate-limit: 100 +Xpeer-blob-sidecars-rate-limit: 400 Xpeer-request-limit: 101 Xp2p-historical-sync-batch-size: 102 Xp2p-sync-batch-size: 103 Xp2p-sync-max-pending-batches: 8 -Xp2p-sync-rate-limit: 80 \ No newline at end of file +Xp2p-sync-blocks-rate-limit: 100 +Xp2p-sync-blob-sidecars-rate-limit: 400 \ No newline at end of file