Skip to content

Commit

Permalink
Revert "Reduce blobs lookup min wait time to 0 (#8864)" and "Deprecat…
Browse files Browse the repository at this point in the history
…e TTFB, RESP_TIMEOUT, ... (#8839)" (#8911)
  • Loading branch information
tbenr authored Dec 11, 2024
1 parent d633566 commit 6115f18
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,4 @@ public class NetworkConstants {
public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128;

public static final int NODE_ID_BITS = 256;

// https://github.com/ethereum/consensus-specs/pull/3767
public static final int MAX_CONCURRENT_REQUESTS = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
static final String GAUGE_BLOB_SIDECARS_TRACKERS_LABEL = "blob_sidecars_trackers";

static final UInt64 MAX_WAIT_RELATIVE_TO_ATT_DUE_MILLIS = UInt64.valueOf(1500);
static final UInt64 MIN_WAIT_MILLIS = UInt64.ZERO;
static final UInt64 MIN_WAIT_MILLIS = UInt64.valueOf(500);
static final UInt64 TARGET_WAIT_MILLIS = UInt64.valueOf(1000);

private final SettableLabelledGauge sizeGauge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ public class ThrottlingTaskQueue {

private int inflightTaskCount = 0;

public static ThrottlingTaskQueue create(final int maximumConcurrentTasks) {
return new ThrottlingTaskQueue(maximumConcurrentTasks);
}

public static ThrottlingTaskQueue create(
final int maximumConcurrentTasks,
final MetricsSystem metricsSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
Expand All @@ -50,8 +51,6 @@
public class Eth2PeerManager implements PeerLookup, PeerHandler {
private static final Logger LOG = LogManager.getLogger();

private static final Duration STATUS_RECEIVED_TIMEOUT = Duration.ofSeconds(10);

private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
private final Eth2PeerFactory eth2PeerFactory;
Expand All @@ -67,6 +66,7 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
private final int eth2RpcOutstandingPingThreshold;

private final Duration eth2StatusUpdateInterval;
private final SpecConfig specConfig;

Eth2PeerManager(
final Spec spec,
Expand Down Expand Up @@ -99,6 +99,7 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
this.eth2RpcPingInterval = eth2RpcPingInterval;
this.eth2RpcOutstandingPingThreshold = eth2RpcOutstandingPingThreshold;
this.eth2StatusUpdateInterval = eth2StatusUpdateInterval;
this.specConfig = spec.getGenesisSpecConfig();
}

public static Eth2PeerManager create(
Expand Down Expand Up @@ -236,7 +237,7 @@ private void ensureStatusReceived(final Eth2Peer peer) {
.ifExceptionGetsHereRaiseABug();
}
},
STATUS_RECEIVED_TIMEOUT)
Duration.ofSeconds(specConfig.getRespTimeout()))
.finish(
() -> {},
error -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public static BeaconChainMethods create(
final MetadataMessagesFactory metadataMessagesFactory,
final RpcEncoding rpcEncoding) {
return new BeaconChainMethods(
createStatus(asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
createGoodBye(asyncRunner, metricsSystem, peerLookup, rpcEncoding),
createStatus(spec, asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
createGoodBye(spec, asyncRunner, metricsSystem, peerLookup, rpcEncoding),
createBeaconBlocksByRoot(
spec, metricsSystem, asyncRunner, recentChainData, peerLookup, rpcEncoding),
createBeaconBlocksByRange(
Expand Down Expand Up @@ -144,10 +144,11 @@ public static BeaconChainMethods create(
rpcEncoding,
recentChainData),
createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding),
createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
createPing(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
}

private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
final Spec spec,
final AsyncRunner asyncRunner,
final StatusMessageFactory statusMessageFactory,
final PeerLookup peerLookup,
Expand All @@ -164,10 +165,12 @@ private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
true,
contextCodec,
statusHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());
}

private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
final Spec spec,
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final PeerLookup peerLookup,
Expand All @@ -184,7 +187,8 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
false,
contextCodec,
goodbyeHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());
}

private static Eth2RpcMethod<BeaconBlocksByRootRequestMessage, SignedBeaconBlock>
Expand Down Expand Up @@ -217,7 +221,8 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
expectResponseToRequest,
forkDigestContextCodec,
beaconBlocksByRootHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());

return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
Expand Down Expand Up @@ -254,7 +259,8 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
expectResponseToRequest,
forkDigestContextCodec,
beaconBlocksByRangeHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());

return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
Expand Down Expand Up @@ -293,7 +299,8 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
true,
forkDigestContextCodec,
blobSidecarsByRootHandler,
peerLookup));
peerLookup,
spec.getNetworkingConfig()));
}

private static Optional<Eth2RpcMethod<BlobSidecarsByRangeRequestMessage, BlobSidecar>>
Expand Down Expand Up @@ -329,7 +336,8 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
true,
forkDigestContextCodec,
blobSidecarsByRangeHandler,
peerLookup));
peerLookup,
spec.getNetworkingConfig()));
}

private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
Expand Down Expand Up @@ -361,7 +369,8 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
expectResponse,
phase0ContextCodec,
messageHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());

if (spec.isMilestoneSupported(SpecMilestone.ALTAIR)) {
final SszSchema<MetadataMessage> altairMetadataSchema =
Expand All @@ -383,7 +392,8 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
expectResponse,
altairContextCodec,
messageHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());
return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponse, List.of(v2Method, v1Method));
} else {
Expand All @@ -392,6 +402,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
}

private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
final Spec spec,
final AsyncRunner asyncRunner,
final MetadataMessagesFactory metadataMessagesFactory,
final PeerLookup peerLookup,
Expand All @@ -408,7 +419,8 @@ private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
true,
contextCodec,
statusHandler,
peerLookup);
peerLookup,
spec.getNetworkingConfig());
}

public Collection<RpcMethod<?, ?, ?>> all() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStream;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest;

public class Eth2IncomingRequestHandler<
TRequest extends RpcRequest & SszData, TResponse extends SszData>
implements RpcRequestHandler {
private static final Logger LOG = LogManager.getLogger();
private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10);

private final PeerLookup peerLookup;
private final LocalMessageHandler<TRequest, TResponse> localMessageHandler;
Expand All @@ -45,20 +45,23 @@ public class Eth2IncomingRequestHandler<
private final String protocolId;
private final AsyncRunner asyncRunner;
private final AtomicBoolean requestHandled = new AtomicBoolean(false);
private final Duration respTimeout;

public Eth2IncomingRequestHandler(
final String protocolId,
final RpcResponseEncoder<TResponse, ?> responseEncoder,
final RpcRequestDecoder<TRequest> requestDecoder,
final AsyncRunner asyncRunner,
final PeerLookup peerLookup,
final LocalMessageHandler<TRequest, TResponse> localMessageHandler) {
final LocalMessageHandler<TRequest, TResponse> localMessageHandler,
final NetworkingSpecConfig networkingConfig) {
this.protocolId = protocolId;
this.asyncRunner = asyncRunner;
this.peerLookup = peerLookup;
this.localMessageHandler = localMessageHandler;
this.responseEncoder = responseEncoder;
this.requestDecoder = requestDecoder;
this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout());
}

@Override
Expand Down Expand Up @@ -118,14 +121,15 @@ private void handleRequest(
}

private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
final Duration timeout = respTimeout;
asyncRunner
.getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT)
.getDelayedFuture(timeout)
.thenAccept(
(__) -> {
if (!requestHandled.get()) {
LOG.debug(
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(),
timeout.toSeconds(),
protocolId);
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
}
Expand Down
Loading

0 comments on commit 6115f18

Please sign in to comment.