Skip to content

Commit

Permalink
Rebase with OTel Metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
brunobat committed Jul 11, 2024
1 parent 6adfad2 commit 97f329e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
Expand All @@ -35,9 +34,6 @@
@ApplicationScoped
public class InMemoryMetricExporter implements MetricExporter {

private static final List<String> LEGACY_KEY_COMPONENTS = List.of(SemanticAttributes.HTTP_METHOD.getKey(),
SemanticAttributes.HTTP_ROUTE.getKey(),
SemanticAttributes.HTTP_STATUS_CODE.getKey());
private static final List<String> KEY_COMPONENTS = List.of(SemanticAttributes.HTTP_REQUEST_METHOD.getKey(),
SemanticAttributes.HTTP_ROUTE.getKey(),
SemanticAttributes.HTTP_RESPONSE_STATUS_CODE.getKey());
Expand Down Expand Up @@ -77,13 +73,7 @@ public static Map<String, PointData> getMostRecentPointsMap(List<MetricData> fin
.collect(toMap(
pointData -> pointData.getAttributes().asMap().entrySet().stream()
//valid attributes for the resulting map key
.filter(entry -> {
if (SemconvStability.emitOldHttpSemconv()) {
return LEGACY_KEY_COMPONENTS.contains(entry.getKey().getKey());
} else {
return KEY_COMPONENTS.contains(entry.getKey().getKey());
}
})
.filter(entry -> KEY_COMPONENTS.contains(entry.getKey().getKey()))
// ensure order
.sorted(Comparator.comparing(o -> o.getKey().getKey()))
// build key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.zip.GZIPOutputStream;

import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
Expand Down Expand Up @@ -91,7 +92,7 @@ private static String determineBasePath(URI baseUri) {
}

@Override
public void send(Consumer<OutputStream> marshaler,
public void send(Marshaler marshaler,
int contentLength,
Consumer<Response> onHttpResponseRead,
Consumer<Throwable> onError) {
Expand Down Expand Up @@ -178,7 +179,7 @@ private static class ClientRequestSuccessHandler implements Handler<HttpClientRe
private final int contentLength;
private final Consumer<Response> onHttpResponseRead;
private final Consumer<Throwable> onError;
private final Consumer<OutputStream> marshaler;
private final Marshaler marshaler;

private final int attemptNumber;
private final Supplier<Boolean> isShutdown;
Expand All @@ -190,7 +191,7 @@ public ClientRequestSuccessHandler(HttpClient client,
int contentLength,
Consumer<Response> onHttpResponseRead,
Consumer<Throwable> onError,
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int attemptNumber,
Supplier<Boolean> isShutdown) {
this.client = client;
Expand Down Expand Up @@ -281,12 +282,16 @@ public byte[] responseBody() {
if (compressionEnabled) {
clientRequest.putHeader("Content-Encoding", "gzip");
try (var gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
marshaler.writeBinaryTo(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
try {
marshaler.writeBinaryTo(os);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}

if (!headers.isEmpty()) {
Expand Down

0 comments on commit 97f329e

Please sign in to comment.