Skip to content

Commit

Permalink
Refactor filtering to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov committed Nov 28, 2023
1 parent db2e530 commit 631160b
Show file tree
Hide file tree
Showing 19 changed files with 660 additions and 455 deletions.
32 changes: 32 additions & 0 deletions core/src/main/java/io/micronaut/core/execution/ExecutionFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,38 @@ static <T> ExecutionFlow<T> async(@NonNull Executor executor, @NonNull Supplier<
@Nullable
ImperativeExecutionFlow<T> tryComplete();

/**
* Alternative to {@link #tryComplete()} which will unwrap the flow's value.
*
* @return The imperative flow then returns its value, or {@code null} if this flow is not complete or does not
* support this operation
* @since 4.3
*/
@Nullable
default T tryCompleteValue() {
ImperativeExecutionFlow<T> imperativeFlow = tryComplete();
if (imperativeFlow != null) {
return imperativeFlow.getValue();
}
return null;
}

/**
* Alternative to {@link #tryComplete()} which will unwrap the flow's error.
*
* @return The imperative flow then returns its error, or {@code null} if this flow is not complete or does not
* support this operation
* @since 4.3
*/
@Nullable
default Throwable tryCompleteError() {
ImperativeExecutionFlow<T> imperativeFlow = tryComplete();
if (imperativeFlow != null) {
return imperativeFlow.getError();
}
return null;
}

/**
* Converts the existing flow into the completable future.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpResponse;
Expand Down Expand Up @@ -371,10 +372,18 @@ protected <I, R extends io.micronaut.http.HttpResponse<?>> Publisher<R> applyFil
filterResolver.resolveFilters(request, clientFilterEntries);

FilterRunner.sortReverse(filters);
filters.add(GenericHttpFilter.terminalReactiveFilter(responsePublisher));

FilterRunner runner = new FilterRunner(filters);
return Mono.from(ReactiveExecutionFlow.fromFlow((ExecutionFlow<R>) runner.run(request)).toPublisher());
FilterRunner runner = new FilterRunner(filters, (filteredRequest, propagatedContext) -> {
try {
try (PropagatedContext.Scope ignore = propagatedContext.propagate()) {
return ReactiveExecutionFlow.fromPublisher((Publisher<HttpResponse<?>>) responsePublisher);
}
} catch (Throwable e) {
return ExecutionFlow.error(e);
}
});
return (Publisher<R>) Mono.from(ReactiveExecutionFlow.fromFlow(runner.run(request)).toPublisher())
.map(x -> x);
}

protected <O> Publisher<io.micronaut.http.HttpResponse<O>> responsePublisher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.micronaut.core.io.buffer.ByteBufferFactory;
import io.micronaut.core.io.buffer.ReferenceCounted;
import io.micronaut.core.order.Ordered;
import io.micronaut.core.propagation.PropagatedContext;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArgumentUtils;
import io.micronaut.core.util.ArrayUtils;
Expand Down Expand Up @@ -830,7 +831,7 @@ private <I, O, E> Flux<HttpResponse<O>> exchange(io.micronaut.http.HttpRequest<I
final io.micronaut.http.HttpRequest<Object> parentRequest = ServerRequestContext.currentRequest().orElse(null);
Publisher<URI> uriPublisher = resolveRequestURI(request);
return Flux.from(uriPublisher)
.switchMap(uri -> exchangeImpl(uri, parentRequest, toMutableRequest(request), bodyType, errorType, blockHint));
.switchMap(uri -> (Publisher) exchangeImpl(uri, parentRequest, toMutableRequest(request), bodyType, errorType, blockHint));
}

@Override
Expand Down Expand Up @@ -995,14 +996,14 @@ private <I> Flux<ByteBuffer<?>> dataStreamImpl(MutableHttpRequest<I> request, Ar
* Implementation of {@link #jsonStream}, {@link #dataStream}, {@link #exchangeStream}.
*/
@SuppressWarnings("MagicNumber")
private <I> Publisher<MutableHttpResponse<?>> buildStreamExchange(
private <I> Publisher<HttpResponse<?>> buildStreamExchange(
@Nullable io.micronaut.http.HttpRequest<?> parentRequest,
@NonNull MutableHttpRequest<I> request,
@NonNull URI requestURI,
@Nullable Argument<?> errorType) {

AtomicReference<MutableHttpRequest<?>> requestWrapper = new AtomicReference<>(request);
Flux<MutableHttpResponse<?>> streamResponsePublisher = connectAndStream(parentRequest, request, requestURI, requestWrapper, false, true);
Flux<HttpResponse<?>> streamResponsePublisher = connectAndStream(parentRequest, request, requestURI, requestWrapper, false, true);

streamResponsePublisher = readBodyOnError(errorType, streamResponsePublisher);

Expand Down Expand Up @@ -1031,18 +1032,18 @@ public Publisher<MutableHttpResponse<?>> proxy(@NonNull io.micronaut.http.HttpRe
}

AtomicReference<MutableHttpRequest<?>> requestWrapper = new AtomicReference<>(httpRequest);
Flux<MutableHttpResponse<?>> proxyResponsePublisher = connectAndStream(request, request, requestURI, requestWrapper, true, false);
Flux<HttpResponse<?>> proxyResponsePublisher = connectAndStream(request, request, requestURI, requestWrapper, true, false);
// apply filters
//noinspection unchecked
proxyResponsePublisher = Flux.from(
applyFilterToResponsePublisher(
request,
requestWrapper.get(),
requestURI,
(Publisher) proxyResponsePublisher
proxyResponsePublisher
)
);
return proxyResponsePublisher;
return proxyResponsePublisher.map(HttpResponse::toMutableResponse);
});
}

Expand All @@ -1052,7 +1053,7 @@ private void setupConversionService(io.micronaut.http.HttpRequest<?> httpRequest
}
}

private <I> Flux<MutableHttpResponse<?>> connectAndStream(
private <I> Flux<HttpResponse<?>> connectAndStream(
io.micronaut.http.HttpRequest<?> parentRequest,
io.micronaut.http.HttpRequest<I> request,
URI requestURI,
Expand Down Expand Up @@ -1087,11 +1088,11 @@ private <I> Flux<MutableHttpResponse<?>> connectAndStream(
/**
* Implementation of {@link #exchange(io.micronaut.http.HttpRequest, Argument, Argument)} (after URI resolution).
*/
private <I, O, E> Publisher<? extends io.micronaut.http.HttpResponse<O>> exchangeImpl(
private <I, E> Publisher<io.micronaut.http.HttpResponse<?>> exchangeImpl(
URI requestURI,
io.micronaut.http.HttpRequest<?> parentRequest,
MutableHttpRequest<I> request,
@NonNull Argument<O> bodyType,
@NonNull Argument<?> bodyType,
@NonNull Argument<E> errorType,
@Nullable BlockHint blockHint) {
AtomicReference<MutableHttpRequest<?>> requestWrapper = new AtomicReference<>(request);
Expand All @@ -1105,7 +1106,7 @@ private <I, O, E> Publisher<? extends io.micronaut.http.HttpResponse<O>> exchang

Mono<ConnectionManager.PoolHandle> handlePublisher = connectionManager.connect(requestKey, blockHint);

Flux<io.micronaut.http.HttpResponse<O>> responsePublisher = handlePublisher.flatMapMany(poolHandle -> {
Flux<io.micronaut.http.HttpResponse<?>> responsePublisher = handlePublisher.flatMapMany(poolHandle -> {
poolHandle.channel.pipeline()
.addLast(ChannelPipelineCustomizer.HANDLER_HTTP_AGGREGATOR, new HttpObjectAggregator(configuration.getMaxContentLength()) {
@Override
Expand Down Expand Up @@ -1134,13 +1135,13 @@ protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
});
});

Publisher<io.micronaut.http.HttpResponse<O>> finalPublisher = applyFilterToResponsePublisher(
Publisher<io.micronaut.http.HttpResponse<?>> finalPublisher = applyFilterToResponsePublisher(
parentRequest,
request,
requestURI,
responsePublisher
);
Flux<io.micronaut.http.HttpResponse<O>> finalReactiveSequence = Flux.from(finalPublisher);
Flux<io.micronaut.http.HttpResponse<?>> finalReactiveSequence = Flux.from(finalPublisher);
// apply timeout to flowable too in case a filter applied another policy
Optional<Duration> readTimeout = configuration.getReadTimeout();
if (readTimeout.isPresent()) {
Expand Down Expand Up @@ -1219,11 +1220,11 @@ protected Object getLoadBalancerDiscriminator() {
return null;
}

private <I, R extends io.micronaut.http.HttpResponse<?>> Publisher<R> applyFilterToResponsePublisher(
private <I> Publisher<io.micronaut.http.HttpResponse<?>> applyFilterToResponsePublisher(
io.micronaut.http.HttpRequest<?> parentRequest,
io.micronaut.http.HttpRequest<I> request,
URI requestURI,
Publisher<R> responsePublisher) {
Publisher<io.micronaut.http.HttpResponse<?>> responsePublisher) {

if (!(request instanceof MutableHttpRequest<?> mutRequest)) {
return responsePublisher;
Expand All @@ -1245,10 +1246,17 @@ private <I, R extends io.micronaut.http.HttpResponse<?>> Publisher<R> applyFilte
}

FilterRunner.sortReverse(filters);
filters.add(GenericHttpFilter.terminalReactiveFilter(responsePublisher));

FilterRunner runner = new FilterRunner(filters);
Mono<R> responseMono = Mono.from(ReactiveExecutionFlow.fromFlow((ExecutionFlow<R>) runner.run(request)).toPublisher());
FilterRunner runner = new FilterRunner(filters, (filteredRequest, propagatedContext) -> {
try {
try (PropagatedContext.Scope ignore = propagatedContext.propagate()) {
return ReactiveExecutionFlow.fromPublisher(responsePublisher);
}
} catch (Throwable e) {
return ExecutionFlow.error(e);
}
});
Mono<io.micronaut.http.HttpResponse<?>> responseMono = Mono.from(ReactiveExecutionFlow.fromFlow(runner.run(request)).toPublisher());
if (parentRequest != null) {
responseMono = responseMono.contextWrite(c -> {
// existing entry takes precedence. The parentRequest is derived from a thread
Expand Down Expand Up @@ -1387,15 +1395,15 @@ private static FullHttpRequest withBytes(HttpRequest request, ByteBuf bytes) {
);
}

private Flux<MutableHttpResponse<?>> readBodyOnError(@Nullable Argument<?> errorType, @NonNull Flux<MutableHttpResponse<?>> publisher) {
private Flux<HttpResponse<?>> readBodyOnError(@Nullable Argument<?> errorType, @NonNull Flux<HttpResponse<?>> publisher) {
if (errorType != null && errorType != HttpClient.DEFAULT_ERROR_TYPE) {
return publisher.onErrorResume(clientException -> {
if (clientException instanceof HttpClientResponseException exception) {
final HttpResponse<?> response = exception.getResponse();
if (response instanceof NettyStreamedHttpResponse streamedResponse) {
if (response instanceof NettyStreamedHttpResponse<?> streamedResponse) {
return Mono.create(emitter -> {
final StreamedHttpResponse nettyResponse = streamedResponse.getNettyResponse();
nettyResponse.subscribe(new Subscriber<HttpContent>() {
nettyResponse.subscribe(new Subscriber<>() {
final CompositeByteBuf buffer = byteBufferFactory.getNativeAllocator().compositeBuffer();
Subscription s;
@Override
Expand Down Expand Up @@ -2142,7 +2150,7 @@ public FullHttpResponseHandler(

@Override
protected Function<URI, Publisher<? extends HttpResponse<O>>> makeRedirectHandler(io.micronaut.http.HttpRequest<?> parentRequest, MutableHttpRequest<Object> redirectRequest) {
return uri -> exchangeImpl(uri, parentRequest, redirectRequest, bodyType, errorType, null);
return uri -> (Publisher) exchangeImpl(uri, parentRequest, redirectRequest, bodyType, errorType, null);
}

@Override
Expand Down Expand Up @@ -2423,7 +2431,7 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

@Override
protected Function<URI, Publisher<? extends MutableHttpResponse<?>>> makeRedirectHandler(io.micronaut.http.HttpRequest<?> parentRequest, MutableHttpRequest<Object> redirectRequest) {
return uri -> buildStreamExchange(parentRequest, redirectRequest, uri, null);
return uri -> Mono.from(buildStreamExchange(parentRequest, redirectRequest, uri, null)).map(HttpResponse::toMutableResponse);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.execution.ExecutionFlow;
import io.micronaut.core.execution.ImperativeExecutionFlow;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.server.RequestLifecycle;
import io.micronaut.http.server.netty.body.ByteBody;
Expand Down Expand Up @@ -50,50 +51,57 @@ final class NettyRequestLifecycle extends RequestLifecycle {

/**
* Should only be used where netty-specific stuff is needed, such as reading the body or
* writing the response. Otherwise, use {@link #request()} which can be updated by filters
* writing the response.
*/
private final NettyHttpRequest<?> nettyRequest;
private NettyHttpRequest<?> nettyRequest;

NettyRequestLifecycle(RoutingInBoundHandler rib, PipeliningServerHandler.OutboundAccess outboundAccess, NettyHttpRequest<?> request) {
super(rib.routeExecutor, request);
NettyRequestLifecycle(RoutingInBoundHandler rib, PipeliningServerHandler.OutboundAccess outboundAccess) {
super(rib.routeExecutor);
this.rib = rib;
this.outboundAccess = outboundAccess;
this.nettyRequest = request;

multipartEnabled(rib.multipartEnabled);
}

void handleNormal() {
void handleNormal(NettyHttpRequest<?> request) {
this.nettyRequest = request;

if (LOG.isDebugEnabled()) {
HttpMethod httpMethod = request().getMethod();
LOG.debug("Request {} {}", httpMethod, request().getUri());
HttpMethod httpMethod = request.getMethod();
LOG.debug("Request {} {}", httpMethod, request.getUri());
}

ExecutionFlow<MutableHttpResponse<?>> result;
ExecutionFlow<HttpResponse<?>> result;

try {
// handle decoding failure
DecoderResult decoderResult = nettyRequest.getNativeRequest().decoderResult();
DecoderResult decoderResult = request.getNativeRequest().decoderResult();
if (decoderResult.isFailure()) {
Throwable cause = decoderResult.cause();
HttpStatus status = cause instanceof TooLongFrameException ? HttpStatus.REQUEST_ENTITY_TOO_LARGE : HttpStatus.BAD_REQUEST;
result = onStatusError(
request,
HttpResponse.status(status),
status.getReason()
);
} else {
result = normalFlow();
result = normalFlow(request);
}
ImperativeExecutionFlow<HttpResponse<?>> imperativeFlow = result.tryComplete();
if (imperativeFlow != null) {
rib.writeResponse(outboundAccess, request, imperativeFlow.getValue(), imperativeFlow.getError());
} else {
result.onComplete((response, throwable) -> rib.writeResponse(outboundAccess, request, response, throwable));
}
result.onComplete((response, throwable) -> rib.writeResponse(outboundAccess, nettyRequest, response, throwable));
} catch (Exception e) {
handleException(e);
handleException(request, e);
}
}

@Nullable
@Override
protected FileCustomizableResponseType findFile() {
Optional<URL> optionalUrl = rib.staticResourceResolver.resolve(request().getUri().getPath());
protected FileCustomizableResponseType findFile(HttpRequest<?> request) {
Optional<URL> optionalUrl = rib.staticResourceResolver.resolve(request.getUri().getPath());
if (optionalUrl.isPresent()) {
try {
URL url = optionalUrl.get();
Expand All @@ -112,13 +120,13 @@ protected FileCustomizableResponseType findFile() {
}

@Override
protected ExecutionFlow<RouteMatch<?>> fulfillArguments(RouteMatch<?> routeMatch) {
protected ExecutionFlow<RouteMatch<?>> fulfillArguments(RouteMatch<?> routeMatch, HttpRequest<?> request) {
// handle decoding failure
DecoderResult decoderResult = nettyRequest.getNativeRequest().decoderResult();
if (decoderResult.isFailure()) {
return ExecutionFlow.error(decoderResult.cause());
}
return super.fulfillArguments(routeMatch).flatMap(this::waitForBody);
return super.fulfillArguments(routeMatch, request).flatMap(this::waitForBody);
}

/**
Expand All @@ -143,8 +151,8 @@ private ExecutionFlow<RouteMatch<?>> waitForBody(RouteMatch<?> routeMatch) {
return nettyRequest.getRouteWaitsFor().map(v -> routeMatch);
}

void handleException(Throwable cause) {
onError(cause).onComplete((response, throwable) -> rib.writeResponse(outboundAccess, nettyRequest, response, throwable));
void handleException(NettyHttpRequest<?> nettyRequest, Throwable cause) {
onError(nettyRequest, cause).onComplete((response, throwable) -> rib.writeResponse(outboundAccess, nettyRequest, response, throwable));
}

}
Loading

0 comments on commit 631160b

Please sign in to comment.