Skip to content

Commit

Permalink
Done with http
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Oct 1, 2024
1 parent c26977a commit b5a9cf8
Showing 1 changed file with 22 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.scalecube.services.Address;
import io.scalecube.services.ServiceReference;
Expand All @@ -14,10 +15,10 @@
import io.scalecube.services.transport.api.DataCodec;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,7 +30,6 @@
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;

public final class HttpGatewayClientTransport implements ClientChannel, ClientTransport {

Expand All @@ -38,29 +38,18 @@ public final class HttpGatewayClientTransport implements ClientChannel, ClientTr
private static final String CONTENT_TYPE = "application/json";
private static final HttpGatewayClientCodec CLIENT_CODEC =
new HttpGatewayClientCodec(DataCodec.getInstance(CONTENT_TYPE));
private static final int CONNECT_TIMEOUT_MILLIS = (int) Duration.ofSeconds(5).toMillis();

private final GatewayClientCodec clientCodec;
private final LoopResources loopResources;
private final Address address;
private final Duration connectTimeout;
private final String contentType;
private final boolean followRedirect;
private final SslProvider sslProvider;
private final boolean shouldWiretap;
private final Map<String, String> headers;
private final Function<HttpClient, HttpClient> operator;
private final boolean ownsLoopResources;

private final AtomicReference<HttpClient> httpClientReference = new AtomicReference<>();

private HttpGatewayClientTransport(Builder builder) {
this.clientCodec = builder.clientCodec;
this.address = builder.address;
this.connectTimeout = builder.connectTimeout;
this.contentType = builder.contentType;
this.followRedirect = builder.followRedirect;
this.sslProvider = builder.sslProvider;
this.shouldWiretap = builder.shouldWiretap;
this.headers = builder.headers;
this.operator = builder.operator;
this.loopResources =
builder.loopResources == null
? LoopResources.create("http-gateway-client")
Expand All @@ -76,23 +65,12 @@ public ClientChannel create(ServiceReference serviceReference) {
return oldValue;
}

HttpClient httpClient =
return operator.apply(
HttpClient.create(ConnectionProvider.create("http-gateway-client"))
.headers(entries -> headers.forEach(entries::add))
.headers(entries -> entries.set("Content-Type", contentType))
.followRedirect(followRedirect)
.wiretap(shouldWiretap)
.runOn(loopResources)
.host(address.host())
.port(address.port())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout.toMillis())
.option(ChannelOption.TCP_NODELAY, true);

if (sslProvider != null) {
httpClient = httpClient.secure(sslProvider);
}

return httpClient;
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
.option(ChannelOption.TCP_NODELAY, true)
.headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, CONTENT_TYPE)));
});
return this;
}
Expand Down Expand Up @@ -168,95 +146,43 @@ public static class Builder {

private GatewayClientCodec clientCodec = CLIENT_CODEC;
private LoopResources loopResources;
private Address address;
private Duration connectTimeout = Duration.ofSeconds(5);
private String contentType = CONTENT_TYPE;
private boolean followRedirect;
private SslProvider sslProvider;
private boolean shouldWiretap;
private Map<String, String> headers = new HashMap<>();
private Function<HttpClient, HttpClient> operator = client -> client;

public Builder() {}

public GatewayClientCodec clientCodec() {
return clientCodec;
}

public Builder clientCodec(GatewayClientCodec clientCodec) {
this.clientCodec = clientCodec;
return this;
}

public LoopResources loopResources() {
return loopResources;
}

public Builder loopResources(LoopResources loopResources) {
this.loopResources = loopResources;
return this;
}

public Address address() {
return address;
}

public Builder address(Address address) {
this.address = address;
public Builder httpClient(UnaryOperator<HttpClient> operator) {
this.operator = this.operator.andThen(operator);
return this;
}

public Duration connectTimeout() {
return connectTimeout;
public Builder address(Address address) {
return httpClient(client -> client.host(address.host()).port(address.port()));
}

public Builder connectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
return this;
}

public String contentType() {
return contentType;
return httpClient(
client ->
client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout.toMillis()));
}

public Builder contentType(String contentType) {
this.contentType = contentType;
return this;
}

public boolean followRedirect() {
return followRedirect;
}

public Builder followRedirect(boolean followRedirect) {
this.followRedirect = followRedirect;
return this;
}

public SslProvider sslProvider() {
return sslProvider;
}

public Builder sslProvider(SslProvider sslProvider) {
this.sslProvider = sslProvider;
return this;
}

public boolean shouldWiretap() {
return shouldWiretap;
}

public Builder shouldWiretap(boolean wiretap) {
this.shouldWiretap = wiretap;
return this;
}

public Map<String, String> headers() {
return headers;
return httpClient(
client ->
client.headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, contentType)));
}

public Builder headers(Map<String, String> headers) {
this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
return this;
return httpClient(client -> client.headers(entries -> headers.forEach(entries::set)));
}

public HttpGatewayClientTransport build() {
Expand Down

0 comments on commit b5a9cf8

Please sign in to comment.