Skip to content

Commit

Permalink
feat: support new OpenTelemetry feature
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Tracing plugin has been removed and is now embedded inside node framework
  • Loading branch information
guillaumelamirand committed Nov 6, 2024
1 parent cc9f527 commit 54672dd
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 100 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version: 2.1
setup: true

orbs:
gravitee: gravitee-io/gravitee@4.1.1
gravitee: gravitee-io/gravitee@4.8.0

# our single workflow, that triggers the setup job defined above, filters on tag and branches are needed otherwise
# some workflow and job will not be triggered for tags (default CircleCI behavior)
Expand Down
32 changes: 11 additions & 21 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,12 @@

<groupId>io.gravitee.connector</groupId>
<artifactId>gravitee-connector-http</artifactId>
<version>4.0.4-alpha.1</version>
<version>5.0.0-archi-401-opentelemetry-SNAPSHOT</version>

<name>Gravitee.io - Connector - HTTP</name>

<properties>
<gravitee-bom.version>7.0.14</gravitee-bom.version>
<gravitee-common.version>4.0.0</gravitee-common.version>
<gravitee-connector-api.version>1.1.5</gravitee-connector-api.version>
<gravitee-gateway-api.version>3.5.0</gravitee-gateway-api.version>
<gravitee-expression-language.version>3.1.0</gravitee-expression-language.version>
<gravitee-node.version>5.11.0</gravitee-node.version>
<gravitee-apim.version>4.6.0-SNAPSHOT</gravitee-apim.version>

<maven-assembly-plugin.version>3.7.1</maven-assembly-plugin.version>
<!-- Property used by the publication job in CI-->
Expand All @@ -50,53 +45,43 @@
<dependencies>
<!-- Import bom to properly inherit all dependencies -->
<dependency>
<groupId>io.gravitee</groupId>
<artifactId>gravitee-bom</artifactId>
<version>${gravitee-bom.version}</version>
<groupId>io.gravitee.apim</groupId>
<artifactId>gravitee-apim-bom</artifactId>
<version>${gravitee-apim.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>io.gravitee.gateway</groupId>
<artifactId>gravitee-gateway-api</artifactId>
<version>${gravitee-gateway-api.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.gravitee.common</groupId>
<artifactId>gravitee-common</artifactId>
<version>${gravitee-common.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.gravitee.connector</groupId>
<artifactId>gravitee-connector-api</artifactId>
<version>${gravitee-connector-api.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.gravitee.el</groupId>
<artifactId>gravitee-expression-language</artifactId>
<version>${gravitee-expression-language.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.gravitee.node</groupId>
<artifactId>gravitee-node-api</artifactId>
<version>${gravitee-node.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.gravitee.node</groupId>
<artifactId>gravitee-node-vertx</artifactId>
<version>${gravitee-node.version}</version>
<scope>provided</scope>
</dependency>

Expand All @@ -123,7 +108,12 @@
<dependency>
<groupId>io.gravitee.apim.gateway</groupId>
<artifactId>gravitee-apim-gateway-buffer</artifactId>
<version>4.0.25</version>
<version>${gravitee-apim.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.gravitee.node</groupId>
<artifactId>gravitee-node-opentelemetry</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.gravitee.connector.api.Response;
import io.gravitee.connector.http.endpoint.HttpEndpoint;
import io.gravitee.gateway.api.ExecutionContext;
import io.gravitee.gateway.api.handler.Handler;
import io.vertx.core.http.HttpClient;

Expand All @@ -33,6 +34,7 @@ public AbstractHttpConnection(E endpoint) {
}

public abstract void connect(
final ExecutionContext context,
HttpClient httpClient,
int port,
String host,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.*;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
Expand Down Expand Up @@ -133,6 +139,7 @@ public void request(ExecutionContext context, ProxyRequest request, Handler<Conn

// Connect to the upstream
connection.connect(
context,
client,
port,
url.getHost(),
Expand Down
130 changes: 78 additions & 52 deletions src/main/java/io/gravitee/connector/http/HttpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,26 @@
import io.gravitee.connector.api.response.ClientConnectionErrorResponse;
import io.gravitee.connector.api.response.ClientConnectionTimeoutResponse;
import io.gravitee.connector.http.endpoint.HttpEndpoint;
import io.gravitee.gateway.api.ExecutionContext;
import io.gravitee.gateway.api.buffer.Buffer;
import io.gravitee.gateway.api.handler.Handler;
import io.gravitee.gateway.api.http.HttpHeaders;
import io.gravitee.gateway.api.http2.HttpFrame;
import io.gravitee.gateway.api.proxy.ProxyRequest;
import io.gravitee.gateway.api.stream.WriteStream;
import io.gravitee.node.api.opentelemetry.Span;
import io.gravitee.node.api.opentelemetry.http.ObservableHttpClientRequest;
import io.gravitee.node.api.opentelemetry.http.ObservableHttpClientResponse;
import io.netty.channel.ConnectTimeoutException;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.http.*;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -84,7 +92,15 @@ public HttpConnection(HttpEndpoint endpoint, ProxyRequest request) {
}

@Override
public void connect(HttpClient httpClient, int port, String host, String uri, Handler<Void> connectionHandler, Handler<Void> tracker) {
public void connect(
final ExecutionContext ctx,
HttpClient httpClient,
int port,
String host,
String uri,
Handler<Void> connectionHandler,
Handler<Void> tracker
) {
// Remove HOP-by-HOP headers
for (CharSequence header : HOP_HEADERS) {
request.headers().remove(header.toString());
Expand All @@ -95,48 +111,50 @@ public void connect(HttpClient httpClient, int port, String host, String uri, Ha
request.headers().remove(io.gravitee.common.http.HttpHeaders.ACCEPT_ENCODING);
}

Future<HttpClientRequest> requestFuture = prepareUpstreamRequest(httpClient, port, host, uri);
requestFuture.onComplete(
new io.vertx.core.Handler<>() {
@Override
public void handle(AsyncResult<HttpClientRequest> event) {
cancelHandler(tracker);

if (event.succeeded()) {
httpClientRequest = event.result();

httpClientRequest.response(response -> {
// Prepare upstream response
handleUpstreamResponse(response, tracker);
});

httpClientRequest
.connection()
.exceptionHandler(t -> {
LOGGER.debug(
"Exception occurs during HTTP connection for api [{}] & request id [{}]: {}",
request.metrics().getApi(),
request.metrics().getRequestId(),
t.getMessage()
);
request.metrics().setMessage(t.getMessage());
});

httpClientRequest.exceptionHandler(exEvent -> {
if (!isCanceled() && !isTransmitted()) {
handleException(event.cause());
tracker.handle(null);
}
});
connectionHandler.handle(null);
} else {
connectionHandler.handle(null);
RequestOptions requestOptions = prepareRequestOptions(port, host, uri);
ObservableHttpClientRequest observableHttpClientRequest = new ObservableHttpClientRequest(requestOptions);
Span requestSpan = ctx.getTracer().startSpanFrom(observableHttpClientRequest);
Future<HttpClientRequest> requestFuture = prepareUpstreamRequest(httpClient, requestOptions);
requestFuture.onComplete(event -> {
cancelHandler(tracker);

if (event.succeeded()) {
httpClientRequest = event.result();
observableHttpClientRequest.httpClientRequest(httpClientRequest);

httpClientRequest.response(response -> {
// Prepare upstream response
handleUpstreamResponse(ctx, response, tracker, requestSpan);
});

httpClientRequest
.connection()
.exceptionHandler(t -> {
ctx.getTracer().endOnError(requestSpan, t);
LOGGER.debug(
"Exception occurs during HTTP connection for api [{}] & request id [{}]: {}",
request.metrics().getApi(),
request.metrics().getRequestId(),
t.getMessage()
);
request.metrics().setMessage(t.getMessage());
});

httpClientRequest.exceptionHandler(exEvent -> {
ctx.getTracer().endOnError(requestSpan, event.cause());
if (!isCanceled() && !isTransmitted()) {
handleException(event.cause());
tracker.handle(null);
}
}
});
connectionHandler.handle(null);
} else {
ctx.getTracer().endOnError(requestSpan, event.cause());
connectionHandler.handle(null);
handleException(event.cause());
tracker.handle(null);
}
);
});
}

private void handleException(Throwable cause) {
Expand All @@ -163,27 +181,34 @@ private void handleException(Throwable cause) {
}
}

protected Future<HttpClientRequest> prepareUpstreamRequest(HttpClient httpClient, int port, String host, String uri) {
protected RequestOptions prepareRequestOptions(int port, String host, String uri) {
return new RequestOptions()
.setHost(host)
.setMethod(HttpMethod.valueOf(request.method().name()))
.setPort(port)
.setURI(uri)
.setTimeout(endpoint.getHttpClientOptions().getReadTimeout())
.setFollowRedirects(endpoint.getHttpClientOptions().isFollowRedirects());
}

protected Future<HttpClientRequest> prepareUpstreamRequest(HttpClient httpClient, RequestOptions requestOptions) {
// Prepare HTTP request
return httpClient.request(
new RequestOptions()
.setHost(host)
.setMethod(HttpMethod.valueOf(request.method().name()))
.setPort(port)
.setURI(uri)
.setTimeout(endpoint.getHttpClientOptions().getReadTimeout())
.setFollowRedirects(endpoint.getHttpClientOptions().isFollowRedirects())
);
return httpClient.request(requestOptions);
}

protected T createProxyResponse(HttpClientResponse clientResponse) {
return (T) new HttpResponse(clientResponse);
}

protected T handleUpstreamResponse(final AsyncResult<HttpClientResponse> clientResponseFuture, Handler<Void> tracker) {
protected T handleUpstreamResponse(
final ExecutionContext ctx,
final AsyncResult<HttpClientResponse> clientResponseFuture,
Handler<Void> tracker,
final Span requestSpan
) {
if (clientResponseFuture.succeeded()) {
HttpClientResponse clientResponse = clientResponseFuture.result();

ctx.getTracer().endWithResponse(requestSpan, new ObservableHttpClientResponse(clientResponse));
response = createProxyResponse(clientResponse);

if (isSse(request)) {
Expand Down Expand Up @@ -225,6 +250,7 @@ protected T handleUpstreamResponse(final AsyncResult<HttpClientResponse> clientR
// And send it to the client
sendToClient(response);
} else {
ctx.getTracer().endWithResponseAndError(requestSpan, clientResponseFuture.result(), clientResponseFuture.cause());
handleException(clientResponseFuture.cause());
tracker.handle(null);
}
Expand Down
33 changes: 20 additions & 13 deletions src/main/java/io/gravitee/connector/http/grpc/GrpcConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,29 @@ public GrpcConnection(HttpEndpoint endpoint, ProxyRequest request) {
super(endpoint, request);
}

protected RequestOptions prepareRequestOptions(
int port,
String host,
String uri,
final io.gravitee.gateway.api.http.HttpHeaders headers
) {
return new RequestOptions()
.setHost(host)
.setMethod(HttpMethod.POST)
.setPort(port)
.setURI(uri)
// Ensure required gRPC headers
.putHeader(HttpHeaderNames.CONTENT_TYPE, MediaType.APPLICATION_GRPC)
.putHeader(HttpHeaderNames.TE, GRPC_TRAILERS_TE)
.setTimeout(endpoint.getHttpClientOptions().getReadTimeout())
.setFollowRedirects(endpoint.getHttpClientOptions().isFollowRedirects());
}

@Override
protected Future<HttpClientRequest> prepareUpstreamRequest(HttpClient httpClient, int port, String host, String uri) {
protected Future<HttpClientRequest> prepareUpstreamRequest(HttpClient httpClient, RequestOptions requestOptions) {
// Prepare HTTP request
return httpClient
.request(
new RequestOptions()
.setHost(host)
.setMethod(HttpMethod.POST)
.setPort(port)
.setURI(uri)
// Ensure required gRPC headers
.putHeader(HttpHeaderNames.CONTENT_TYPE, MediaType.APPLICATION_GRPC)
.putHeader(HttpHeaderNames.TE, GRPC_TRAILERS_TE)
.setTimeout(endpoint.getHttpClientOptions().getReadTimeout())
.setFollowRedirects(endpoint.getHttpClientOptions().isFollowRedirects())
)
.request(requestOptions)
.map(httpClientRequest -> {
// Always set chunked mode for gRPC transport
return httpClientRequest.setChunked(true);
Expand Down
Loading

0 comments on commit 54672dd

Please sign in to comment.