Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Oct 1, 2024
1 parent 2a99fc4 commit b74fc16
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public final class HttpGatewayClientTransport implements ClientChannel, ClientTr
private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayClientTransport.class);

private static final String CONTENT_TYPE = "application/json";
private static final LoopResources LOOP_RESOURCES = LoopResources.create("http-gateway-client");
private static final HttpGatewayClientCodec CLIENT_CODEC =
new HttpGatewayClientCodec(DataCodec.getInstance(CONTENT_TYPE));

Expand All @@ -49,20 +48,24 @@ public final class HttpGatewayClientTransport implements ClientChannel, ClientTr
private final SslProvider sslProvider;
private final boolean shouldWiretap;
private final Map<String, String> headers;
private final boolean ownsLoopResources;

private ConnectionProvider connectionProvider;
private final AtomicReference<HttpClient> httpClientReference = new AtomicReference<>();

private HttpGatewayClientTransport(Builder builder) {
this.clientCodec = builder.clientCodec;
this.loopResources = builder.loopResources;
this.address = builder.address;
this.connectTimeout = builder.connectTimeout;
this.contentType = builder.contentType;
this.followRedirect = builder.followRedirect;
this.sslProvider = builder.sslProvider;
this.shouldWiretap = builder.shouldWiretap;
this.headers = builder.headers;
this.loopResources =
builder.loopResources == null
? LoopResources.create("http-gateway-client")
: builder.loopResources;
this.ownsLoopResources = builder.loopResources == null;
}

@Override
Expand All @@ -73,10 +76,8 @@ public ClientChannel create(ServiceReference serviceReference) {
return oldValue;
}

connectionProvider = ConnectionProvider.create("http-gateway-client");

HttpClient httpClient =
HttpClient.create(connectionProvider)
HttpClient.create(ConnectionProvider.create("http-gateway-client"))
.headers(entries -> headers.forEach(entries::add))
.headers(entries -> entries.set("Content-Type", contentType))
.followRedirect(followRedirect)
Expand Down Expand Up @@ -158,15 +159,15 @@ private static boolean isError(HttpResponseStatus status) {

@Override
public void close() {
if (connectionProvider != null) {
connectionProvider.dispose();
if (ownsLoopResources) {
loopResources.dispose();
}
}

public static class Builder {

private GatewayClientCodec clientCodec = CLIENT_CODEC;
private LoopResources loopResources = LOOP_RESOURCES;
private LoopResources loopResources;
private Address address;
private Duration connectTimeout = Duration.ofSeconds(5);
private String contentType = CONTENT_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
private static final String CONTENT_TYPE = "application/json";
private static final String STREAM_ID = "sid";

private static final LoopResources LOOP_RESOURCES =
LoopResources.create("websocket-gateway-client");
private static final WebsocketGatewayClientCodec CLIENT_CODEC = new WebsocketGatewayClientCodec();

private final GatewayClientCodec clientCodec;
Expand All @@ -50,15 +48,14 @@ public final class WebsocketGatewayClientTransport implements ClientChannel, Cli
private final boolean shouldWiretap;
private final Duration keepAliveInterval;
private final Map<String, String> headers;
private final boolean ownsLoopResources;

private final AtomicLong sidCounter = new AtomicLong();
private ConnectionProvider connectionProvider;
private final AtomicReference<WebsocketGatewayClientSession> clientSessionReference =
new AtomicReference<>();

private WebsocketGatewayClientTransport(Builder builder) {
this.clientCodec = builder.clientCodec;
this.loopResources = builder.loopResources;
this.address = builder.address;
this.connectTimeout = builder.connectTimeout;
this.contentType = builder.contentType;
Expand All @@ -67,6 +64,11 @@ private WebsocketGatewayClientTransport(Builder builder) {
this.shouldWiretap = builder.shouldWiretap;
this.keepAliveInterval = builder.keepAliveInterval;
this.headers = builder.headers;
this.loopResources =
builder.loopResources == null
? LoopResources.create("websocket-gateway-client")
: builder.loopResources;
this.ownsLoopResources = builder.loopResources == null;
}

@Override
Expand All @@ -77,10 +79,8 @@ public ClientChannel create(ServiceReference serviceReference) {
return oldValue;
}

connectionProvider = ConnectionProvider.newConnection();

HttpClient httpClient =
HttpClient.create(connectionProvider)
HttpClient.create(ConnectionProvider.newConnection())
.headers(entries -> headers.forEach(entries::add))
.headers(entries -> entries.set("Content-Type", contentType))
.followRedirect(followRedirect)
Expand Down Expand Up @@ -208,15 +208,15 @@ private ByteBuf encodeRequest(ServiceMessage message, long sid) {

@Override
public void close() {
if (connectionProvider != null) {
connectionProvider.dispose();
if (ownsLoopResources) {
loopResources.dispose();
}
}

public static class Builder {

private GatewayClientCodec clientCodec = CLIENT_CODEC;
private LoopResources loopResources = LOOP_RESOURCES;
private LoopResources loopResources;
private Address address;
private Duration connectTimeout = Duration.ofSeconds(5);
private String contentType = CONTENT_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@

class HttpClientConnectionTest extends BaseTest {

private static final Duration TIMEOUT = Duration.ofSeconds(10);

private Microservices gateway;
private Address gatewayAddress;
private Microservices microservices;

private final AtomicInteger onCloseCounter = new AtomicInteger();
private final AtomicInteger onCancelCounter = new AtomicInteger();

@BeforeEach
void beforEach() {
Expand All @@ -56,7 +58,7 @@ void beforEach() {
.membership(
opts -> opts.seedMembers(gateway.discoveryAddress().toString())))
.transport(RSocketServiceTransport::new)
.services(new TestServiceImpl(onCloseCounter))
.services(new TestServiceImpl(onCancelCounter))
.startAwait();
}

Expand All @@ -73,17 +75,17 @@ void afterEach() {
void testCloseServiceStreamAfterLostConnection() {
try (ServiceCall serviceCall = serviceCall(gatewayAddress)) {
StepVerifier.create(serviceCall.api(TestService.class).oneNever("body").log("<<<"))
.thenAwait(Duration.ofSeconds(5))
.thenAwait(Duration.ofSeconds(3))
.then(serviceCall::close)
.expectError(IOException.class)
.verify(Duration.ofSeconds(10));
.verify(TIMEOUT);

Mono.delay(Duration.ofMillis(100))
.repeat(() -> onCloseCounter.get() != 1)
.repeat(() -> onCancelCounter.get() != 1)
.then()
.block(Duration.ofSeconds(10));
.block(TIMEOUT);

assertEquals(1, onCloseCounter.get());
assertEquals(1, onCancelCounter.get());
}
}

Expand All @@ -95,7 +97,7 @@ public void testCallRepeatedlyByInvalidAddress() {
StepVerifier.create(serviceCall.api(TestService.class).oneNever("body").log("<<<"))
.thenAwait(Duration.ofSeconds(1))
.expectError(IOException.class)
.verify(Duration.ofSeconds(10));
.verify(TIMEOUT);
}
}
}
Expand All @@ -110,20 +112,20 @@ private static ServiceCall serviceCall(Address address) {
public interface TestService {

@ServiceMethod
Mono<Long> oneNever(String name);
Mono<Void> oneNever(String name);
}

private static class TestServiceImpl implements TestService {

private final AtomicInteger onCloseCounter;
private final AtomicInteger onCancelCounter;

public TestServiceImpl(AtomicInteger onCloseCounter) {
this.onCloseCounter = onCloseCounter;
public TestServiceImpl(AtomicInteger onCancelCounter) {
this.onCancelCounter = onCancelCounter;
}

@Override
public Mono<Long> oneNever(String name) {
return Mono.<Long>never().log(">>>").doOnCancel(onCloseCounter::incrementAndGet);
public Mono<Void> oneNever(String name) {
return Mono.never().log(">>>").doOnCancel(onCancelCounter::incrementAndGet).then();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void testCloseServiceStreamAfterLostConnection() {
.thenAwait(Duration.ofSeconds(5))
.then(serviceCall::close)
.expectError(IOException.class)
.verify(Duration.ofSeconds(10));
.verify(TIMEOUT);

Mono.delay(Duration.ofMillis(100))
.repeat(() -> onCloseCounter.get() != 1)
Expand All @@ -109,7 +109,7 @@ public void testCallRepeatedlyByInvalidAddress() {
StepVerifier.create(serviceCall.api(TestService.class).manyNever().log("<<<"))
.thenAwait(Duration.ofSeconds(1))
.expectError(IOException.class)
.verify(Duration.ofSeconds(10));
.verify(TIMEOUT);
}
}
}
Expand Down

0 comments on commit b74fc16

Please sign in to comment.