Skip to content

Commit

Permalink
Introduce HTTP/2 keep alive (#1029)
Browse files Browse the repository at this point in the history
__Motivation__

HTTP/2 PING frames are useful to keep a connection alive relatively cheaply in presence of long running streams. ServiceTalk should support adding this functionality for both clients and servers.

__Modification__

- Add `KeepAlivePolicy` to `H2ProtocolConfig` that can be configured to enable keep-alive behavior on either clients or servers. Following features are provided:
    -- Specify an idleness threshold, after which a PING frame will be sent on the connection to detect liveness.
    -- Specify an ack timeout, within which we expect an ack for the sent PING. If no ack is received, the connection is closed.
    -- Specify whether PING frames should be sent even when there are no active streams. This defaults to `false`.

__Result__

Keep alive behavior can be enabled for either client or server.
  • Loading branch information
Nitesh Kant authored Apr 24, 2020
1 parent b22e5ea commit 29c4f7d
Show file tree
Hide file tree
Showing 12 changed files with 1,133 additions and 183 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.netty;

import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
import io.servicetalk.grpc.api.GrpcClientBuilder;
import io.servicetalk.grpc.api.GrpcServerBuilder;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.grpc.netty.TesterProto.TestRequest;
import io.servicetalk.grpc.netty.TesterProto.TestResponse;
import io.servicetalk.grpc.netty.TesterProto.Tester.ServiceFactory;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterClient;
import io.servicetalk.grpc.netty.TesterProto.Tester.TesterService;
import io.servicetalk.http.netty.H2KeepAlivePolicies;
import io.servicetalk.http.netty.H2ProtocolConfig;
import io.servicetalk.http.netty.HttpProtocolConfigs;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.Publisher.never;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;

@RunWith(Parameterized.class)
public class KeepAliveTest {
private final TesterClient client;
private final ServerContext ctx;

@Rule
public final Timeout timeout = new ServiceTalkTestTimeout(1, MINUTES);
private final long idleTimeoutMillis;

public KeepAliveTest(final boolean keepAlivesFromClient,
final Function<String, H2ProtocolConfig> protocolConfigSupplier,
final long idleTimeoutMillis) throws Exception {
this.idleTimeoutMillis = idleTimeoutMillis;
GrpcServerBuilder serverBuilder = GrpcServers.forAddress(localAddress(0));
if (!keepAlivesFromClient) {
serverBuilder.protocols(protocolConfigSupplier.apply("servicetalk-tests-server-wire-logger"));
} else {
serverBuilder.socketOption(ServiceTalkSocketOptions.IDLE_TIMEOUT, idleTimeoutMillis)
.protocols(HttpProtocolConfigs.h2()
.enableFrameLogging("servicetalk-tests-server-wire-logger").build());
}
ctx = serverBuilder.listenAndAwait(new ServiceFactory(new InfiniteStreamsService()));
GrpcClientBuilder<HostAndPort, InetSocketAddress> clientBuilder =
GrpcClients.forAddress(serverHostAndPort(ctx));
if (keepAlivesFromClient) {
clientBuilder.protocols(protocolConfigSupplier.apply("servicetalk-tests-client-wire-logger"));
} else {
clientBuilder.socketOption(ServiceTalkSocketOptions.IDLE_TIMEOUT, idleTimeoutMillis)
.protocols(HttpProtocolConfigs.h2()
.enableFrameLogging("servicetalk-tests-client-wire-logger").build());
}
client = clientBuilder.build(new TesterProto.Tester.ClientFactory());
}

@Parameterized.Parameters(name = "keepAlivesFromClient? {0}, idleTimeout: {2}")
public static Collection<Object[]> data() {
return asList(newParam(true, ofSeconds(10), ofSeconds(12)),
newParam(false, ofSeconds(10), ofSeconds(12)));
}

private static Object[] newParam(final boolean keepAlivesFromClient, final Duration keepAliveIdleDuration,
final Duration idleTimeoutDuration) {
return new Object[] {keepAlivesFromClient,
(Function<String, H2ProtocolConfig>) frameLogger ->
HttpProtocolConfigs.h2()
.keepAlivePolicy(H2KeepAlivePolicies.whenIdleFor(keepAliveIdleDuration))
.enableFrameLogging(frameLogger).build(),
idleTimeoutDuration.toMillis()};
}

@After
public void tearDown() throws Exception {
CompositeCloseable closeable = newCompositeCloseable().appendAll(client, ctx);
closeable.close();
}

@Test
public void bidiStream() throws Exception {
// Ignore test on CI due to high timeouts
assumeThat(ServiceTalkTestTimeout.CI, is(false));

try {
client.testBiDiStream(never()).toFuture().get(idleTimeoutMillis + 100, MILLISECONDS);
fail("Unexpected response available.");
} catch (TimeoutException e) {
// expected
}
}

@Test
public void requestStream() throws Exception {
// Ignore test on CI due to high timeouts
assumeThat(ServiceTalkTestTimeout.CI, is(false));

try {
client.testRequestStream(never()).toFuture().get(idleTimeoutMillis + 100, MILLISECONDS);
fail("Unexpected response available.");
} catch (TimeoutException e) {
// expected
}
}

@Test
public void responseStream() throws Exception {
// Ignore test on CI due to high timeouts
assumeThat(ServiceTalkTestTimeout.CI, is(false));

try {
client.testResponseStream(TestRequest.newBuilder().build())
.toFuture().get(idleTimeoutMillis + 100, MILLISECONDS);
fail("Unexpected response available.");
} catch (TimeoutException e) {
// expected
}
}

private static final class InfiniteStreamsService implements TesterService {

@Override
public Publisher<TestResponse> testBiDiStream(final GrpcServiceContext ctx,
final Publisher<TestRequest> request) {
return request.map(testRequest -> TestResponse.newBuilder().build());
}

@Override
public Single<TestResponse> testRequestStream(final GrpcServiceContext ctx,
final Publisher<TestRequest> request) {
return request.collect(() -> null, (testResponse, testRequest) -> null)
.map(__ -> TestResponse.newBuilder().build());
}

@Override
public Publisher<TestResponse> testResponseStream(final GrpcServiceContext ctx, final TestRequest request) {
return never();
}

@Override
public Single<TestResponse> test(final GrpcServiceContext ctx, final TestRequest request) {
return succeeded(TestResponse.newBuilder().build());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.http.netty.H2ProtocolConfig.KeepAlivePolicy;

import java.time.Duration;

import static java.util.Objects.requireNonNull;

final class DefaultKeepAlivePolicy implements KeepAlivePolicy {
private final Duration idleDuration;
private final Duration ackTimeout;
private final boolean withoutActiveStreams;

DefaultKeepAlivePolicy(final Duration idleDuration, final Duration ackTimeout, final boolean withoutActiveStreams) {
this.idleDuration = requireNonNull(idleDuration);
this.ackTimeout = requireNonNull(ackTimeout);
this.withoutActiveStreams = withoutActiveStreams;
}

@Override
public Duration idleDuration() {
return idleDuration;
}

@Override
public Duration ackTimeout() {
return ackTimeout;
}

@Override
public boolean withoutActiveStreams() {
return withoutActiveStreams;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@
final class H2ClientParentConnectionContext extends H2ParentConnectionContext {
private H2ClientParentConnectionContext(Channel channel, BufferAllocator allocator, Executor executor,
FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs,
HttpExecutionStrategy executionStrategy) {
super(channel, allocator, executor, flushStrategy, idleTimeoutMs, executionStrategy);
HttpExecutionStrategy executionStrategy,
final KeepAliveManager keepAliveManager) {
super(channel, allocator, executor, flushStrategy, idleTimeoutMs, executionStrategy, keepAliveManager);
}

interface H2ClientParentConnection extends FilterableStreamingHttpConnection, NettyConnectionContext {
Expand All @@ -103,8 +104,10 @@ protected void handleSubscribe(final Subscriber<? super H2ClientParentConnection
final ChannelPipeline pipeline;
try {
delayedCancellable = new DelayedCancellable();
KeepAliveManager keepAliveManager = new KeepAliveManager(channel, config.keepAlivePolicy());
H2ClientParentConnectionContext connection = new H2ClientParentConnectionContext(channel,
allocator, executor, parentFlushStrategy, idleTimeoutMs, executionStrategy);
allocator, executor, parentFlushStrategy, idleTimeoutMs, executionStrategy,
keepAliveManager);
channel.attr(CHANNEL_CLOSEABLE_KEY).set(connection);
// We need the NettyToStChannelInboundHandler to be last in the pipeline. We accomplish that by
// calling the ChannelInitializer before we do addLast for the NettyToStChannelInboundHandler.
Expand Down
Loading

0 comments on commit 29c4f7d

Please sign in to comment.