diff --git a/core-reactive/src/main/java/io/micronaut/core/async/subscriber/LazySendingSubscriber.java b/core-reactive/src/main/java/io/micronaut/core/async/subscriber/LazySendingSubscriber.java new file mode 100644 index 0000000000..7f56e6fac2 --- /dev/null +++ b/core-reactive/src/main/java/io/micronaut/core/async/subscriber/LazySendingSubscriber.java @@ -0,0 +1,175 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.core.async.subscriber; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.execution.DelayedExecutionFlow; +import io.micronaut.core.execution.ExecutionFlow; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.CorePublisher; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Operators; +import reactor.core.publisher.Signal; +import reactor.util.context.Context; + +/** + * This class waits for the first item of a publisher before completing an ExecutionFlow with a + * publisher containing the same items. + * + * @param The publisher item type + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +public final class LazySendingSubscriber implements CoreSubscriber, CorePublisher, Subscription { + private final DelayedExecutionFlow> result = DelayedExecutionFlow.create(); + private boolean receivedFirst = false; + private volatile boolean sentFirst = false; + private boolean sendingFirst = false; + private T first; + private Subscription upstream; + private volatile CoreSubscriber downstream; + private Signal heldBackSignal; + + private LazySendingSubscriber() { + } + + /** + * Create an {@link ExecutionFlow} that waits for the first item of the given publisher. If + * there is an error before the first item, the flow will fail. If there is no error, the flow + * will complete with a publisher containing all items, including the first one. + * + * @param input The input stream + * @return A flow that will complete with the same stream + * @param The item type + */ + @NonNull + public static ExecutionFlow> create(@NonNull Publisher input) { + LazySendingSubscriber subscriber = new LazySendingSubscriber<>(); + input.subscribe(subscriber); + return subscriber.result; + } + + @Override + public Context currentContext() { + return downstream == null ? Context.empty() : downstream.currentContext(); + } + + @Override + public void onSubscribe(Subscription s) { + upstream = s; + s.request(1); + } + + @Override + public void onNext(T t) { + if (!receivedFirst) { + receivedFirst = true; + first = t; + result.complete(this); + } else { + downstream.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + if (receivedFirst) { + Subscriber d; + synchronized (this) { + d = downstream; + if (d == null || !sentFirst) { + heldBackSignal = Signal.error(t); + return; + } + } + d.onError(t); + } else { + receivedFirst = true; + result.completeExceptionally(t); + } + } + + @Override + public void onComplete() { + if (!receivedFirst) { + onNext(null); + } + + Subscriber d; + synchronized (this) { + d = downstream; + if (d == null || !sentFirst) { + heldBackSignal = Signal.complete(); + return; + } + } + d.onComplete(); + } + + @Override + public void subscribe(CoreSubscriber subscriber) { + synchronized (this) { + downstream = subscriber; + } + subscriber.onSubscribe(this); + } + + @Override + public void subscribe(Subscriber s) { + subscribe(Operators.toCoreSubscriber(s)); + } + + @Override + public void request(long n) { + if (!sentFirst && !sendingFirst) { + sendingFirst = true; + if (first != null) { + downstream.onNext(first); // note: this can trigger reentrancy! + first = null; + } + Signal heldBackSignal; + synchronized (this) { + sentFirst = true; + heldBackSignal = this.heldBackSignal; + } + if (heldBackSignal != null) { + heldBackSignal.accept(downstream); + return; + } + n--; + if (n <= 0) { + return; + } + } + + upstream.request(n); + } + + @Override + public void cancel() { + if (!sentFirst) { + sentFirst = true; + T t = first; + first = null; + Operators.onNextDropped(t, currentContext()); + } + upstream.cancel(); + } +} diff --git a/core/src/main/java/io/micronaut/core/util/functional/ThrowingConsumer.java b/core/src/main/java/io/micronaut/core/util/functional/ThrowingConsumer.java new file mode 100644 index 0000000000..8dceb46b55 --- /dev/null +++ b/core/src/main/java/io/micronaut/core/util/functional/ThrowingConsumer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.core.util.functional; + +/** + * Consumer with a generic exception. + * + * @param the type accepted by this consumer + * @param the type of exception thrown from the supplier + * @author Jonas Konrad + * @since 4.8.0 + */ +@FunctionalInterface +public interface ThrowingConsumer { + /** + * Consume the value. + * + * @param t The value + * @throws E The generic exception + */ + void accept(T t) throws E; // parameter nullability is inherited from TYPE_USE on T +} diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ByteBodySubscriber.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ByteBodySubscriber.java new file mode 100644 index 0000000000..8047aabfa6 --- /dev/null +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ByteBodySubscriber.java @@ -0,0 +1,135 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.jdk; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.ReactiveByteBufferByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; + +import java.net.http.HttpResponse; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link HttpResponse.BodySubscriber} implementation that pushes data into a + * {@link ReactiveByteBufferByteBody.SharedBuffer}. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +final class ByteBodySubscriber implements HttpResponse.BodySubscriber, BufferConsumer.Upstream { + private final ReactiveByteBufferByteBody.SharedBuffer sharedBuffer; + private final CloseableByteBody root; + private final AtomicLong demand = new AtomicLong(0); + private Flow.Subscription subscription; + private boolean cancelled; + private volatile boolean disregardBackpressure; + + public ByteBodySubscriber(BodySizeLimits limits) { + sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(limits, this); + root = new ReactiveByteBufferByteBody(sharedBuffer); + } + + @Override + public CompletionStage getBody() { + return CompletableFuture.completedFuture(root); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + boolean initialDemand; + boolean cancelled; + synchronized (this) { + this.subscription = subscription; + cancelled = this.cancelled; + initialDemand = demand.get() > 0; + } + if (cancelled) { + subscription.cancel(); + } else if (initialDemand) { + subscription.request(disregardBackpressure ? Long.MAX_VALUE : 1); + } + } + + @Override + public void onNext(List item) { + for (ByteBuffer buffer : item) { + int n = buffer.remaining(); + demand.addAndGet(-n); + sharedBuffer.add(buffer); + } + if (demand.get() > 0) { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + sharedBuffer.error(throwable); + } + + @Override + public void onComplete() { + sharedBuffer.complete(); + } + + @Override + public void start() { + Flow.Subscription initialDemand; + synchronized (this) { + initialDemand = subscription; + demand.set(1); + } + if (initialDemand != null) { + initialDemand.request(1); + } + } + + @Override + public void onBytesConsumed(long bytesConsumed) { + long prev = demand.getAndAdd(bytesConsumed); + if (prev <= 0 && prev + bytesConsumed > 0) { + subscription.request(1); + } + } + + @Override + public void allowDiscard() { + Flow.Subscription subscription; + synchronized (this) { + cancelled = true; + subscription = this.subscription; + } + if (subscription != null) { + subscription.cancel(); + } + } + + @Override + public void disregardBackpressure() { + disregardBackpressure = true; + if (subscription != null) { + subscription.request(Long.MAX_VALUE); + } + } +} diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java index 05706b4749..9e3d47c909 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java @@ -68,7 +68,7 @@ protected Publisher> responsePublisher(@NonNull HttpRequest< headerName -> httpRequest.headers().allValues(headerName)); } BodySizeLimits bodySizeLimits = new BodySizeLimits(Long.MAX_VALUE, configuration.getMaxContentLength()); - return client.sendAsync(httpRequest, responseInfo -> new ReactiveByteBufferByteBody.ByteBodySubscriber(bodySizeLimits)); + return client.sendAsync(httpRequest, responseInfo -> new ByteBodySubscriber(bodySizeLimits)); }) .flatMap(Mono::fromCompletionStage) .onErrorMap(IOException.class, e -> new HttpClientException("Error sending request: " + e.getMessage(), e)) diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java index f2a77ed029..840e920aa9 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java @@ -50,6 +50,7 @@ import io.micronaut.http.bind.DefaultRequestBinderRegistry; import io.micronaut.http.bind.RequestBinderRegistry; import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.CharSequenceBodyWriter; import io.micronaut.http.body.ChunkedMessageBodyReader; import io.micronaut.http.body.CloseableAvailableByteBody; import io.micronaut.http.body.CloseableByteBody; @@ -97,7 +98,6 @@ import io.micronaut.http.netty.body.NettyBodyAdapter; import io.micronaut.http.netty.body.NettyByteBody; import io.micronaut.http.netty.body.NettyByteBufMessageBodyHandler; -import io.micronaut.http.netty.body.NettyCharSequenceBodyWriter; import io.micronaut.http.netty.body.NettyJsonHandler; import io.micronaut.http.netty.body.NettyJsonStreamHandler; import io.micronaut.http.netty.body.NettyWritableBodyWriter; @@ -1989,7 +1989,7 @@ private static MessageBodyHandlerRegistry createDefaultMessageBodyHandlerRegistr ); JsonMapper mapper = JsonMapper.createDefault(); registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyJsonHandler<>(mapper)); - registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyCharSequenceBodyWriter()); + registry.add(MediaType.APPLICATION_JSON_TYPE, new CharSequenceBodyWriter(StandardCharsets.UTF_8)); registry.add(MediaType.APPLICATION_JSON_STREAM_TYPE, new NettyJsonStreamHandler<>(mapper)); return registry; } diff --git a/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpHeaders.java b/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpHeaders.java index 55019ca132..ab12475323 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpHeaders.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpHeaders.java @@ -302,7 +302,14 @@ public MutableHttpHeaders contentType(MediaType mediaType) { } contentType = Optional.ofNullable(mediaType); return this; + } + @Override + public MutableHttpHeaders contentTypeIfMissing(MediaType mediaType) { + if (nettyHeaders.contains(HttpHeaderNames.CONTENT_TYPE)) { + return this; + } + return contentType(mediaType); } @Override diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java index f8c05103ff..c855242896 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java @@ -18,21 +18,16 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.body.AbstractBodyAdapter; import io.micronaut.http.body.AvailableByteBody; import io.micronaut.http.body.ByteBody; import io.micronaut.http.body.stream.BodySizeLimits; -import io.micronaut.http.body.stream.BufferConsumer; import io.micronaut.http.netty.EventLoopFlow; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.EventLoop; import io.netty.handler.codec.http.HttpHeaders; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongUnaryOperator; /** * Adapter from generic streaming {@link ByteBody} to {@link StreamingNettyByteBody}. @@ -41,21 +36,12 @@ * @since 4.6.0 */ @Internal -public final class NettyBodyAdapter implements BufferConsumer.Upstream, Subscriber { +public final class NettyBodyAdapter extends AbstractBodyAdapter { private final EventLoopFlow eventLoopFlow; - private final Publisher source; - @Nullable - private final Runnable onDiscard; - - private volatile boolean cancelled; - private volatile Subscription subscription; - private StreamingNettyByteBody.SharedBuffer sharedBuffer; - private final AtomicLong demand = new AtomicLong(1); private NettyBodyAdapter(EventLoop eventLoop, Publisher source, @Nullable Runnable onDiscard) { + super(source, onDiscard); this.eventLoopFlow = new EventLoopFlow(eventLoop); - this.source = source; - this.onDiscard = onDiscard; } /** @@ -92,55 +78,6 @@ public static StreamingNettyByteBody adapt(Publisher publisher, EventLo return new StreamingNettyByteBody(adapter.sharedBuffer); } - @Override - public void start() { - source.subscribe(this); - } - - @Override - public void onBytesConsumed(long bytesConsumed) { - if (bytesConsumed < 0) { - throw new IllegalArgumentException("Negative bytes consumed"); - } - - // clamping add - LongUnaryOperator add = l -> l + bytesConsumed < l ? Long.MAX_VALUE : l + bytesConsumed; - long oldDemand = this.demand.getAndUpdate(add); - long newDemand = add.applyAsLong(oldDemand); - if (oldDemand <= 0 && newDemand > 0) { - subscription.request(1); - } - } - - @Override - public void allowDiscard() { - cancelled = true; - if (subscription != null) { - subscription.cancel(); - } - if (onDiscard != null) { - onDiscard.run(); - } - } - - @Override - public void disregardBackpressure() { - this.demand.set(Long.MAX_VALUE); - if (subscription != null) { - subscription.request(Long.MAX_VALUE); - } - } - - @Override - public void onSubscribe(Subscription s) { - this.subscription = s; - if (cancelled) { - s.cancel(); - } else { - s.request(1); - } - } - @Override public void onNext(ByteBuf bytes) { if (eventLoopFlow.executeNow(() -> onNext0(bytes))) { @@ -158,15 +95,15 @@ private void onNext0(ByteBuf bytes) { @Override public void onError(Throwable t) { - if (eventLoopFlow.executeNow(() -> sharedBuffer.error(t))) { - sharedBuffer.error(t); + if (eventLoopFlow.executeNow(() -> super.onError(t))) { + super.onError(t); } } @Override public void onComplete() { - if (eventLoopFlow.executeNow(() -> sharedBuffer.complete())) { - sharedBuffer.complete(); + if (eventLoopFlow.executeNow(super::onComplete)) { + super.onComplete(); } } diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBodyFactory.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBodyFactory.java new file mode 100644 index 0000000000..d095cca15e --- /dev/null +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBodyFactory.java @@ -0,0 +1,123 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.netty.body; + +import io.micronaut.buffer.netty.NettyByteBufferFactory; +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.io.buffer.ByteBuffer; +import io.micronaut.core.util.functional.ThrowingConsumer; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableAvailableByteBody; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +/** + * {@link ByteBodyFactory} implementation with netty-optimized bodies. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +public final class NettyByteBodyFactory extends ByteBodyFactory { + public NettyByteBodyFactory(@NonNull Channel channel) { + // note: atm we only use the alloc from the channel, but in the future we might also use + // the event loop for streaming bodies. Please design use sites to have a channel + // available, and don't create a constructor that just takes the alloc :) + super(new NettyByteBufferFactory(channel.alloc())); + } + + private ByteBufAllocator alloc() { + return (ByteBufAllocator) byteBufferFactory().getNativeAllocator(); + } + + @Override + public @NonNull CloseableAvailableByteBody adapt(@NonNull ByteBuffer buffer) { + if (buffer.asNativeBuffer() instanceof ByteBuf bb) { + return new AvailableNettyByteBody(bb); + } + return super.adapt(buffer); + } + + @Override + public @NonNull CloseableAvailableByteBody adapt(byte @NonNull [] array) { + return new AvailableNettyByteBody(Unpooled.wrappedBuffer(array)); + } + + @Override + public @NonNull CloseableAvailableByteBody buffer(@NonNull ThrowingConsumer writer) throws T { + ByteBuf buf = alloc().buffer(); + boolean release = true; + try { + ByteBufOutputStream s = new ByteBufOutputStream(buf); + writer.accept(s); + try { + s.close(); + } catch (IOException e) { + throw new IllegalStateException("Failed to close buffer stream", e); + } + release = false; + return new AvailableNettyByteBody(buf); + } finally { + if (release) { + buf.release(); + } + } + } + + @Override + public @NonNull CloseableAvailableByteBody createEmpty() { + return AvailableNettyByteBody.empty(); + } + + @Override + public @NonNull CloseableAvailableByteBody copyOf(@NonNull CharSequence cs, @NonNull Charset charset) { + ByteBuf byteBuf = charset == StandardCharsets.UTF_8 ? + ByteBufUtil.writeUtf8(alloc(), cs) : + ByteBufUtil.encodeString(alloc(), CharBuffer.wrap(cs), charset); + return new AvailableNettyByteBody(byteBuf); + } + + @Override + public @NonNull CloseableAvailableByteBody copyOf(@NonNull InputStream stream) throws IOException { + ByteBuf buffer = alloc().buffer(); + boolean free = true; + try { + while (true) { + if (buffer.writeBytes(stream, 4096) == -1) { + break; + } + } + free = false; + return new AvailableNettyByteBody(buffer); + } finally { + if (free) { + buffer.release(); + } + } + } +} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyCharSequenceBodyWriter.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyCharSequenceBodyWriter.java deleted file mode 100644 index 667612991c..0000000000 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyCharSequenceBodyWriter.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2017-2024 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.http.netty.body; - -import io.micronaut.context.annotation.Replaces; -import io.micronaut.core.annotation.Internal; -import io.micronaut.core.io.buffer.ByteBufferFactory; -import io.micronaut.core.type.Argument; -import io.micronaut.core.type.MutableHeaders; -import io.micronaut.http.ByteBodyHttpResponse; -import io.micronaut.http.ByteBodyHttpResponseWrapper; -import io.micronaut.http.HttpRequest; -import io.micronaut.http.MediaType; -import io.micronaut.http.MutableHttpHeaders; -import io.micronaut.http.MutableHttpResponse; -import io.micronaut.http.body.CharSequenceBodyWriter; -import io.micronaut.http.body.MessageBodyWriter; -import io.micronaut.http.body.ResponseBodyWriter; -import io.micronaut.http.codec.CodecException; -import io.micronaut.http.netty.NettyHttpHeaders; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; -import io.netty.handler.codec.http.HttpHeaderNames; -import jakarta.inject.Singleton; - -import java.io.OutputStream; -import java.nio.CharBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -/** - * A JSON body should not be escaped or parsed as a JSON value. - * - * @author Denis Stepanov - * @since 4.6 - */ -@Singleton -@Replaces(CharSequenceBodyWriter.class) -@Internal -public final class NettyCharSequenceBodyWriter implements ResponseBodyWriter { - private final CharSequenceBodyWriter defaultHandler = new CharSequenceBodyWriter(StandardCharsets.UTF_8); - - @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, HttpRequest request, MutableHttpResponse outgoingResponse, Argument type, MediaType mediaType, CharSequence object) throws CodecException { - MutableHttpHeaders headers = outgoingResponse.getHeaders(); - Charset charset = MessageBodyWriter.getCharset(mediaType, headers); - ByteBuf byteBuf = charset == StandardCharsets.UTF_8 ? - ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, object) : - ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(object), charset); - NettyHttpHeaders nettyHttpHeaders = (NettyHttpHeaders) headers; - if (!nettyHttpHeaders.contains(HttpHeaderNames.CONTENT_TYPE)) { - nettyHttpHeaders.set(HttpHeaderNames.CONTENT_TYPE, mediaType); - } - return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, new AvailableNettyByteBody(byteBuf)); - } - - @Override - public void writeTo(Argument type, MediaType mediaType, CharSequence object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException { - defaultHandler.writeTo(type, mediaType, object, outgoingHeaders, outputStream); - } - -} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyJsonHandler.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyJsonHandler.java index b71c720fe3..eeac995408 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyJsonHandler.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyJsonHandler.java @@ -27,29 +27,26 @@ import io.micronaut.core.type.Headers; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; -import io.micronaut.http.ByteBodyHttpResponseWrapper; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBodyFactory; import io.micronaut.http.body.ChunkedMessageBodyReader; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.MessageBodyHandler; import io.micronaut.http.body.MessageBodyWriter; import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.codec.CodecException; -import io.micronaut.http.netty.NettyHttpHeaders; import io.micronaut.json.JsonFeatures; import io.micronaut.json.JsonMapper; import io.micronaut.json.body.CustomizableJsonHandler; import io.micronaut.json.body.JsonMessageHandler; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.handler.codec.http.HttpHeaderNames; import jakarta.inject.Singleton; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -66,7 +63,6 @@ @JsonMessageHandler.ConsumesJson @BootstrapContextCompatible @Requires(beans = JsonMapper.class) - public final class NettyJsonHandler implements MessageBodyHandler, ChunkedMessageBodyReader, CustomizableJsonHandler, ResponseBodyWriter { private final JsonMessageHandler jsonMessageHandler; @@ -132,18 +128,13 @@ public ByteBuffer writeTo(Argument type, MediaType mediaType, T object, Mu } @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse outgoingResponse, @NonNull Argument type, @NonNull MediaType mediaType, @NonNull T object) throws CodecException { - NettyHttpHeaders nettyHttpHeaders = (NettyHttpHeaders) outgoingResponse.getHeaders(); - nettyHttpHeaders.setIfMissing(HttpHeaderNames.CONTENT_TYPE, mediaType); - ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); - JsonMapper jsonMapper = jsonMessageHandler.getJsonMapper(); - try { - jsonMapper.writeValue(new ByteBufOutputStream(buffer), object); - } catch (IOException e) { - buffer.release(); - throw new CodecException("Error encoding object [" + object + "] to JSON: " + e.getMessage(), e); - } - return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, new AvailableNettyByteBody(buffer)); + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse outgoingResponse, @NonNull Argument type, @NonNull MediaType mediaType, @NonNull T object) throws CodecException { + return jsonMessageHandler.write(bodyFactory, request, outgoingResponse, type, mediaType, object); + } + + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull HttpResponse response, @NonNull Argument type, @NonNull MediaType mediaType, T object) { + return jsonMessageHandler.writePiece(bodyFactory, request, response, type, mediaType, object); } @Override diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWritableBodyWriter.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWritableBodyWriter.java index 0a31a039fc..e06fcf49b3 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWritableBodyWriter.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyWritableBodyWriter.java @@ -18,19 +18,21 @@ import io.micronaut.context.annotation.BootstrapContextCompatible; import io.micronaut.context.annotation.Replaces; import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.io.Writable; import io.micronaut.core.io.buffer.ByteBuffer; -import io.micronaut.core.io.buffer.ByteBufferFactory; import io.micronaut.core.type.Argument; import io.micronaut.core.type.Headers; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.ByteBodyHttpResponseWrapper; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; -import io.micronaut.http.MutableHttpHeaders; import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBodyFactory; import io.micronaut.http.body.ChunkedMessageBodyReader; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.MessageBodyWriter; import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.body.TypedMessageBodyHandler; @@ -40,7 +42,6 @@ import io.micronaut.runtime.ApplicationConfiguration; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; -import io.netty.handler.codec.http.HttpHeaderNames; import jakarta.inject.Singleton; import org.reactivestreams.Publisher; @@ -74,19 +75,31 @@ public boolean isBlocking() { } @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, HttpRequest request, MutableHttpResponse outgoingResponse, Argument type, MediaType mediaType, Writable object) throws CodecException { - MutableHttpHeaders outgoingHeaders = outgoingResponse.getHeaders(); - if (mediaType != null && !outgoingHeaders.contains(HttpHeaderNames.CONTENT_TYPE)) { - outgoingHeaders.contentType(mediaType); - } + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, + HttpRequest request, + MutableHttpResponse outgoingResponse, + Argument type, + MediaType mediaType, + Writable object) throws CodecException { + outgoingResponse.getHeaders().contentTypeIfMissing(mediaType); + return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, writePiece(bodyFactory, request, outgoingResponse, type, mediaType, object)); + } + + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + Writable object) { ByteBufOutputStream outputStream = new ByteBufOutputStream(ByteBufAllocator.DEFAULT.buffer()); try { - object.writeTo(outputStream, MessageBodyWriter.getCharset(mediaType, outgoingHeaders)); + object.writeTo(outputStream, MessageBodyWriter.getCharset(mediaType, response.getHeaders())); outputStream.close(); } catch (IOException e) { throw new MessageBodyException("Error writing body from writable", e); } - return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, new AvailableNettyByteBody(outputStream.buffer())); + return new AvailableNettyByteBody(outputStream.buffer()); } @Override diff --git a/http-netty/src/test/groovy/io/micronaut/http/netty/body/DefaultHandlerSpec.groovy b/http-netty/src/test/groovy/io/micronaut/http/netty/body/DefaultHandlerSpec.groovy index 7c9536a6c6..5e4b5b28c5 100644 --- a/http-netty/src/test/groovy/io/micronaut/http/netty/body/DefaultHandlerSpec.groovy +++ b/http-netty/src/test/groovy/io/micronaut/http/netty/body/DefaultHandlerSpec.groovy @@ -9,6 +9,7 @@ import io.micronaut.core.type.MutableHeaders import io.micronaut.http.MediaType import io.micronaut.http.annotation.Consumes import io.micronaut.http.annotation.Produces +import io.micronaut.http.body.CharSequenceBodyWriter import io.micronaut.http.body.DefaultMessageBodyHandlerRegistry import io.micronaut.http.body.MessageBodyHandler import io.micronaut.http.body.StringBodyReader @@ -64,7 +65,7 @@ class DefaultHandlerSpec extends Specification { then: writer.isPresent() - writer.get() instanceof NettyCharSequenceBodyWriter + writer.get() instanceof CharSequenceBodyWriter when: def reader = bodyHandlerRegistry.findReader(Argument.STRING, List.of(mediaType)) diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyResponseLifecycle.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyResponseLifecycle.java new file mode 100644 index 0000000000..387c6f2a11 --- /dev/null +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyResponseLifecycle.java @@ -0,0 +1,189 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.server.netty; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.async.subscriber.LazySendingSubscriber; +import io.micronaut.core.execution.ExecutionFlow; +import io.micronaut.http.ByteBodyHttpResponse; +import io.micronaut.http.ByteBodyHttpResponseWrapper; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.ConcatenatingSubscriber; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.netty.EventLoopFlow; +import io.micronaut.http.netty.NettyHttpResponseBuilder; +import io.micronaut.http.netty.body.AvailableNettyByteBody; +import io.micronaut.http.netty.body.ByteBufConsumer; +import io.micronaut.http.netty.body.NettyBodyAdapter; +import io.micronaut.http.netty.body.NettyByteBody; +import io.micronaut.http.netty.body.NettyByteBodyFactory; +import io.micronaut.http.netty.body.StreamingNettyByteBody; +import io.micronaut.http.netty.stream.StreamedHttpResponse; +import io.micronaut.http.server.ResponseLifecycle; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executor; + +/** + * Netty-specific version of {@link ResponseLifecycle}. + * + * @since 4.8.0 + * @author Jonas Konrad + * @author Jonas Konrad + */ +@Internal +final class NettyResponseLifecycle extends ResponseLifecycle { + private final RoutingInBoundHandler routingInBoundHandler; + private final NettyHttpRequest request; + + public NettyResponseLifecycle(RoutingInBoundHandler routingInBoundHandler, NettyHttpRequest request) { + super(routingInBoundHandler.routeExecutor, + routingInBoundHandler.messageBodyHandlerRegistry, + routingInBoundHandler.conversionService, + new NettyByteBodyFactory(request.getChannelHandlerContext().channel())); + this.routingInBoundHandler = routingInBoundHandler; + this.request = request; + } + + @Override + protected Executor ioExecutor() { + return routingInBoundHandler.getIoExecutor(); + } + + @Override + protected ExecutionFlow> encodeNoBody(HttpResponse response) { + if (response instanceof NettyHttpResponseBuilder builder) { + io.netty.handler.codec.http.HttpResponse nettyResponse = builder.toHttpResponse(); + if (nettyResponse instanceof StreamedHttpResponse streamed) { + return LazySendingSubscriber.create(streamed).map(contents -> { + CloseableByteBody body = NettyBodyAdapter.adapt(Flux.from(contents).map(HttpContent::content), eventLoop()); + return ByteBodyHttpResponseWrapper.wrap(response, body); + }).onErrorResume(e -> (ExecutionFlow) handleStreamingError(request, e)); + } + } + + return super.encodeNoBody(response); + } + + private EventLoop eventLoop() { + return request.getChannelHandlerContext().channel().eventLoop(); + } + + @Override + protected @NonNull CloseableByteBody concatenate(Publisher items) { + return NettyConcatenatingSubscriber.concatenate(eventLoop(), items); + } + + @Override + protected @NonNull CloseableByteBody concatenateJson(Publisher items) { + return JsonNettyConcatenatingSubscriber.concatenateJson(eventLoop(), items); + } + + private static class NettyConcatenatingSubscriber extends ConcatenatingSubscriber implements ByteBufConsumer { + final StreamingNettyByteBody.SharedBuffer sharedBuffer; + private final EventLoop eventLoop; + private final EventLoopFlow flow; + + NettyConcatenatingSubscriber(EventLoop eventLoop) { + this.eventLoop = eventLoop; + this.flow = new EventLoopFlow(eventLoop); + sharedBuffer = new StreamingNettyByteBody.SharedBuffer(eventLoop, BodySizeLimits.UNLIMITED, this); + } + + static CloseableByteBody concatenate(EventLoop eventLoop, Publisher publisher) { + NettyConcatenatingSubscriber subscriber = new NettyConcatenatingSubscriber(eventLoop); + publisher.subscribe(subscriber); + return new StreamingNettyByteBody(subscriber.sharedBuffer); + } + + @Override + protected Upstream forward(ByteBody body) { + NettyByteBody adapted = NettyBodyAdapter.adapt(body, eventLoop); + if (adapted instanceof StreamingNettyByteBody streaming) { + return streaming.primary(this); + } else { + add(AvailableNettyByteBody.toByteBuf((AvailableNettyByteBody) adapted)); + complete(); + return null; + } + } + + @Override + public void add(@NonNull ByteBuf buffer) { + int n = buffer.readableBytes(); + onForward(n); + add0(buffer); + } + + void add0(@NonNull ByteBuf buffer) { + if (flow.executeNow(() -> sharedBuffer.add(buffer))) { + sharedBuffer.add(buffer); + } + } + + @Override + protected void forwardComplete() { + if (flow.executeNow(sharedBuffer::complete)) { + sharedBuffer.complete(); + } + } + + @Override + protected void forwardError(Throwable t) { + if (flow.executeNow(() -> sharedBuffer.error(t))) { + sharedBuffer.error(t); + } + } + } + + private static final class JsonNettyConcatenatingSubscriber extends NettyConcatenatingSubscriber { + private static final ByteBuf START_ARRAY = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("[", StandardCharsets.UTF_8)).asReadOnly(); + private static final ByteBuf END_ARRAY = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("]", StandardCharsets.UTF_8)).asReadOnly(); + private static final ByteBuf SEPARATOR = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(",", StandardCharsets.UTF_8)).asReadOnly(); + private static final ByteBuf EMPTY_ARRAY = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("[]", StandardCharsets.UTF_8)).asReadOnly(); + + JsonNettyConcatenatingSubscriber(EventLoop eventLoop) { + super(eventLoop); + } + + static CloseableByteBody concatenateJson(EventLoop eventLoop, Publisher publisher) { + JsonNettyConcatenatingSubscriber subscriber = new JsonNettyConcatenatingSubscriber(eventLoop); + publisher.subscribe(subscriber); + return new StreamingNettyByteBody(subscriber.sharedBuffer); + } + + @Override + protected long emitLeadingSeparator(boolean first) { + add0((first ? START_ARRAY : SEPARATOR).duplicate()); + return 1; + } + + @Override + protected long emitFinalSeparator(boolean first) { + add0((first ? EMPTY_ARRAY : END_ARRAY).duplicate()); + return first ? 2 : 1; + } + } +} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/RoutingInBoundHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/RoutingInBoundHandler.java index b1cff56328..c3631ab639 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/RoutingInBoundHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/RoutingInBoundHandler.java @@ -15,83 +15,41 @@ */ package io.micronaut.http.server.netty; -import io.micronaut.buffer.netty.NettyByteBufferFactory; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.async.publisher.DelayedSubscriber; -import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.convert.ConversionService; -import io.micronaut.core.execution.DelayedExecutionFlow; import io.micronaut.core.execution.ExecutionFlow; -import io.micronaut.core.io.buffer.ByteBuffer; -import io.micronaut.core.io.buffer.ByteBufferFactory; import io.micronaut.core.propagation.PropagatedContext; -import io.micronaut.core.type.Argument; -import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.ByteBodyHttpResponseWrapper; -import io.micronaut.http.HttpAttributes; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; -import io.micronaut.http.MediaType; -import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.body.CloseableByteBody; -import io.micronaut.http.body.MediaTypeProvider; import io.micronaut.http.body.MessageBodyHandlerRegistry; -import io.micronaut.http.body.MessageBodyWriter; -import io.micronaut.http.body.ResponseBodyWriter; -import io.micronaut.http.body.ResponseBodyWriterWrapper; -import io.micronaut.http.codec.CodecException; import io.micronaut.http.context.ServerHttpRequestContext; -import io.micronaut.http.context.ServerRequestContext; import io.micronaut.http.context.event.HttpRequestTerminatedEvent; -import io.micronaut.http.exceptions.HttpStatusException; -import io.micronaut.http.netty.EventLoopFlow; -import io.micronaut.http.netty.NettyHttpResponseBuilder; import io.micronaut.http.netty.NettyMutableHttpResponse; import io.micronaut.http.netty.body.AvailableNettyByteBody; -import io.micronaut.http.netty.body.NettyBodyAdapter; import io.micronaut.http.netty.channel.ChannelPipelineCustomizer; -import io.micronaut.http.netty.stream.JsonSubscriber; -import io.micronaut.http.netty.stream.StreamedHttpResponse; import io.micronaut.http.server.RouteExecutor; import io.micronaut.http.server.binding.RequestArgumentSatisfier; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; import io.micronaut.http.server.netty.handler.OutboundAccess; import io.micronaut.http.server.netty.handler.RequestHandler; -import io.micronaut.web.router.DefaultUrlRouteInfo; -import io.micronaut.web.router.RouteInfo; import io.micronaut.web.router.resource.StaticResourceResolver; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.compression.DecompressionException; -import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultHttpResponse; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; import io.netty.util.AttributeKey; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import javax.net.ssl.SSLException; import java.io.IOException; import java.nio.channels.ClosedChannelException; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -244,38 +202,19 @@ public void writeResponse(OutboundAccess outboundAccess, response = routeExecutor.createDefaultErrorResponse(nettyHttpRequest, throwable); } if (response != null) { - ExecutionFlow> finalResponse; - try { - finalResponse = encodeHttpResponse( - nettyHttpRequest, - response, - response.body() - ); - } catch (Throwable e) { - try { - response = routeExecutor.createDefaultErrorResponse(nettyHttpRequest, e); - finalResponse = encodeHttpResponse( - nettyHttpRequest, - response, - response.body() - ); - } catch (Throwable f) { - f.addSuppressed(e); - finalResponse = ExecutionFlow.error(f); - try { - outboundAccess.closeAfterWrite(); - outboundAccess.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR), AvailableNettyByteBody.empty()); - } catch (Throwable g) { - f.addSuppressed(g); - } - LOG.warn("Failed to encode error response", f); - } - } + ExecutionFlow> finalResponse = + new NettyResponseLifecycle(this, nettyHttpRequest).encodeHttpResponseSafe(nettyHttpRequest, response); finalResponse.onComplete((r, t) -> { ByteBodyHttpResponse encodedResponse; if (t != null) { // fallback of the fallback... encodedResponse = ByteBodyHttpResponseWrapper.wrap(HttpResponse.serverError(), AvailableNettyByteBody.empty()); + try { + outboundAccess.closeAfterWrite(); + } catch (Throwable g) { + t.addSuppressed(g); + } + LOG.warn("Failed to encode error response", t); } else { encodedResponse = r; } @@ -324,184 +263,6 @@ ExecutorService getIoExecutor() { return executor; } - @SuppressWarnings("unchecked") - private ExecutionFlow> encodeHttpResponse( - NettyHttpRequest nettyRequest, - HttpResponse httpResponse, - Object body) { - MutableHttpResponse response = httpResponse.toMutableResponse(); - if (nettyRequest.getMethod() != HttpMethod.HEAD && body != null) { - Object routeInfoO = response.getAttribute(HttpAttributes.ROUTE_INFO).orElse(null); - // usually this is a UriRouteInfo, avoid scalability issues here - @SuppressWarnings("unchecked") final RouteInfo routeInfo = (RouteInfo) (routeInfoO instanceof DefaultUrlRouteInfo uri ? uri : (RouteInfo) routeInfoO); - - if (Publishers.isConvertibleToPublisher(body)) { - response.body(null); - return writeStreamedWithErrorHandling(nettyRequest, response, mapToHttpContent(nettyRequest, response, body, routeInfo, nettyRequest.getChannelHandlerContext())); - } - - // avoid checkcast for MessageBodyWriter interface here - Object o = response.getBodyWriter().orElse(null); - MessageBodyWriter messageBodyWriter = o instanceof ResponseBodyWriter rbw ? rbw : (MessageBodyWriter) o; - MediaType responseMediaType = response.getContentType().orElse(null); - Argument responseBodyType; - if (routeInfo != null) { - responseBodyType = (Argument) routeInfo.getResponseBodyType(); - } else { - responseBodyType = Argument.of((Class) body.getClass()); - } - if (responseMediaType == null) { - // perf: check for common body types - //noinspection ConditionCoveredByFurtherCondition - if (!(body instanceof String) && !(body instanceof byte[]) && body instanceof MediaTypeProvider mediaTypeProvider) { - responseMediaType = mediaTypeProvider.getMediaType(); - } else if (routeInfo != null) { - responseMediaType = routeExecutor.resolveDefaultResponseContentType(nettyRequest, routeInfo); - } else { - responseMediaType = MediaType.APPLICATION_JSON_TYPE; - } - } - - if (messageBodyWriter == null) { - // lookup write to use, any logic that hits this path should consider setting - // a body writer on the response before writing - messageBodyWriter = this.messageBodyHandlerRegistry - .findWriter(responseBodyType, Collections.singletonList(responseMediaType)) - .orElse(null); - } - if (messageBodyWriter == null || !responseBodyType.isInstance(body) || !messageBodyWriter.isWriteable(responseBodyType, responseMediaType)) { - responseBodyType = Argument.ofInstance(body); - messageBodyWriter = this.messageBodyHandlerRegistry.getWriter(responseBodyType, List.of(responseMediaType)); - } - return buildFinalResponse(nettyRequest, (MutableHttpResponse) response, responseBodyType, responseMediaType, body, messageBodyWriter, false); - } else { - response.body(null); - return writeFinalNettyResponse( - response, - nettyRequest - ); - } - } - - private ExecutionFlow> buildFinalResponse(NettyHttpRequest nettyRequest, - MutableHttpResponse response, - Argument responseBodyType, - MediaType mediaType, - T body, - MessageBodyWriter messageBodyWriter, - boolean onIoExecutor) { - if (!onIoExecutor && messageBodyWriter.isBlocking()) { - return ExecutionFlow.async(getIoExecutor(), () -> buildFinalResponse(nettyRequest, response, responseBodyType, mediaType, body, messageBodyWriter, true)); - } - - NettyByteBufferFactory bufferFactory = new NettyByteBufferFactory(nettyRequest.getChannelHandlerContext().alloc()); - try { - return ExecutionFlow.just(NettyResponseBodyWriterWrapper.wrap(messageBodyWriter) - .write(bufferFactory, nettyRequest, response, responseBodyType, mediaType, body)); - } catch (CodecException e) { - final MutableHttpResponse errorResponse = (MutableHttpResponse) routeExecutor.createDefaultErrorResponse(nettyRequest, e); - Object errorBody = errorResponse.body(); - Argument type = Argument.ofInstance(errorBody); - MediaType errorContentType = errorResponse.getContentType().orElse(MediaType.APPLICATION_JSON_TYPE); - MessageBodyWriter errorBodyWriter = messageBodyHandlerRegistry.getWriter(type, List.of(errorContentType)); - if (!onIoExecutor && errorBodyWriter.isBlocking()) { - return ExecutionFlow.async(getIoExecutor(), () -> ExecutionFlow.just(NettyResponseBodyWriterWrapper.wrap(errorBodyWriter) - .write(bufferFactory, nettyRequest, errorResponse, type, errorContentType, errorBody))); - } else { - return ExecutionFlow.just(NettyResponseBodyWriterWrapper.wrap(errorBodyWriter) - .write(bufferFactory, nettyRequest, errorResponse, type, errorContentType, errorBody)); - } - } - } - - private Flux mapToHttpContent(NettyHttpRequest request, - MutableHttpResponse response, - Object body, - RouteInfo routeInfo, - ChannelHandlerContext context) { - MediaType mediaType = response.getContentType().orElse(null); - NettyByteBufferFactory byteBufferFactory = new NettyByteBufferFactory(context.alloc()); - Flux bodyPublisher = Flux.from(Publishers.convertToPublisher(conversionService, body)); - Flux httpContentPublisher; - boolean isJson = false; - if (routeInfo != null) { - if (mediaType == null) { - mediaType = routeExecutor.resolveDefaultResponseContentType(request, routeInfo); - } - isJson = mediaType != null && - mediaType.getExtension().equals(MediaType.EXTENSION_JSON) && routeInfo.isResponseBodyJsonFormattable(); - MediaType finalMediaType = mediaType; - httpContentPublisher = bodyPublisher.concatMap(message -> { - MessageBodyWriter messageBodyWriter = routeInfo.getMessageBodyWriter(); - @SuppressWarnings("unchecked") - Argument responseBodyType = (Argument) routeInfo.getResponseBodyType(); - - if (messageBodyWriter == null || !responseBodyType.isInstance(message) || !messageBodyWriter.isWriteable(responseBodyType, finalMediaType)) { - responseBodyType = Argument.ofInstance(message); - messageBodyWriter = ResponseBodyWriter.wrap(messageBodyHandlerRegistry.getWriter(responseBodyType, List.of(finalMediaType))); - } - return writeAsync( - messageBodyWriter, - responseBodyType, - finalMediaType, - message, - response.getHeaders(), byteBufferFactory); - }).map(byteBuffer -> new DefaultHttpContent((ByteBuf) byteBuffer.asNativeBuffer())); - } else { - MediaType finalMediaType = mediaType; - httpContentPublisher = bodyPublisher - .concatMap(message -> { - Argument type = Argument.ofInstance(message); - MessageBodyWriter messageBodyWriter = messageBodyHandlerRegistry.getWriter(type, finalMediaType == null ? List.of() : List.of(finalMediaType)); - return writeAsync(messageBodyWriter, type, finalMediaType, message, response.getHeaders(), byteBufferFactory); - }) - .map(byteBuffer -> new DefaultHttpContent((ByteBuf) byteBuffer.asNativeBuffer())); - } - - if (isJson) { - // if the Publisher is returning JSON then in order for it to be valid JSON for each emitted element - // we must wrap the JSON in array and delimit the emitted items - - httpContentPublisher = JsonSubscriber.lift(httpContentPublisher); - } - - httpContentPublisher = httpContentPublisher - .contextWrite(reactorContext -> reactorContext.put(ServerRequestContext.KEY, request)); - - return httpContentPublisher; - } - - private Publisher> writeAsync( - @NonNull MessageBodyWriter messageBodyWriter, - @NonNull Argument type, - @NonNull MediaType mediaType, - T object, - @NonNull MutableHeaders outgoingHeaders, - @NonNull ByteBufferFactory bufferFactory - ) { - if (messageBodyWriter.isBlocking()) { - return Mono.>defer(() -> Mono.just(messageBodyWriter.writeTo(type, mediaType, object, outgoingHeaders, bufferFactory))) - .subscribeOn(Schedulers.fromExecutor(ioExecutor)); - } else { - return Mono.just(messageBodyWriter.writeTo(type, mediaType, object, outgoingHeaders, bufferFactory)); - } - } - - private ExecutionFlow> writeFinalNettyResponse(MutableHttpResponse message, NettyHttpRequest request) { - io.netty.handler.codec.http.HttpResponse nettyResponse = NettyHttpResponseBuilder.toHttpResponse(message); - if (nettyResponse instanceof StreamedHttpResponse streamed) { - return writeStreamedWithErrorHandling(request, message, streamed); - } else { - return ExecutionFlow.just(ByteBodyHttpResponseWrapper.wrap(message, new AvailableNettyByteBody(((FullHttpResponse) nettyResponse).content()))); - } - } - - private ExecutionFlow> writeStreamedWithErrorHandling(NettyHttpRequest request, HttpResponse response, Publisher streamed) { - LazySendingSubscriber sub = new LazySendingSubscriber(request, response); - streamed.subscribe(sub); - return sub.output; - } - private void closeConnectionIfError(HttpResponse message, HttpRequest request, OutboundAccess outboundAccess) { boolean decodeError = request instanceof NettyHttpRequest nettyRequest && nettyRequest.getNativeRequest().decoderResult().isFailure(); @@ -524,185 +285,4 @@ boolean isIgnorable(Throwable cause) { String message = cause.getMessage(); return cause instanceof IOException && message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches(); } - - /** - * This processor waits for the first item before sending the response, and handles errors if they - * appear as the first item. - */ - private final class LazySendingSubscriber implements Subscriber, Publisher { - private static final Object COMPLETE = new Object(); - - boolean headersSent = false; - Subscription upstream; - final DelayedSubscriber downstream = new DelayedSubscriber<>(); - @Nullable - HttpContent first; - Object completion = null; // in case first hasn't been consumed we need to delay completion - - private final EventLoopFlow flow; - private final NettyHttpRequest request; - private final HttpResponse headers; - private final DelayedExecutionFlow> output = DelayedExecutionFlow.create(); - - private LazySendingSubscriber(NettyHttpRequest request, HttpResponse headers) { - this.request = request; - this.headers = headers; - this.flow = new EventLoopFlow(request.getChannelHandlerContext().channel().eventLoop()); - } - - @Override - public void subscribe(Subscriber s) { - downstream.onSubscribe(new Subscription() { - @Override - public void request(long n) { - HttpContent first = LazySendingSubscriber.this.first; - if (first != null) { - LazySendingSubscriber.this.first = null; - // onNext may trigger further request calls - s.onNext(first.content()); - if (completion != null) { - if (completion == COMPLETE) { - s.onComplete(); - } else { - s.onError((Throwable) completion); - } - return; - } - if (n != Long.MAX_VALUE) { - n--; - if (n == 0) { - return; - } - } - } - upstream.request(n); - } - - @Override - public void cancel() { - if (first != null) { - first.release(); - first = null; - } - upstream.cancel(); - } - }); - downstream.subscribe(s); - } - - @Override - public void onSubscribe(Subscription s) { - upstream = s; - s.request(1); - } - - @Override - public void onNext(HttpContent httpContent) { - if (flow.executeNow(() -> onNext0(httpContent))) { - onNext0(httpContent); - } - } - - private void onNext0(HttpContent httpContent) { - if (headersSent) { - downstream.onNext(httpContent.content()); - } else { - first = httpContent; - headersSent = true; - output.complete(ByteBodyHttpResponseWrapper.wrap(headers, NettyBodyAdapter.adapt(this, request.getChannelHandlerContext().channel().eventLoop()))); - } - } - - @Override - public void onError(Throwable t) { - if (flow.executeNow(() -> onError0(t))) { - onError0(t); - } - } - - private void onError0(Throwable t) { - if (headersSent) { - // nothing we can do - if (first != null) { - completion = t; - } else { - downstream.onError(t); - } - } else { - // limited error handling - MutableHttpResponse response; - if (t instanceof HttpStatusException hse) { - response = HttpResponse.status(hse.getStatus()); - if (hse.getBody().isPresent()) { - response.body(hse.getBody().get()); - } else if (hse.getMessage() != null) { - response.body(hse.getMessage()); - } - } else { - response = routeExecutor.createDefaultErrorResponse(request, t); - } - output.completeFrom(encodeHttpResponse( - request, - response, - response.body() - )); - } - } - - @Override - public void onComplete() { - if (flow.executeNow(this::onComplete0)) { - onComplete0(); - } - } - - private void onComplete0() { - if (headersSent) { - if (first != null) { - completion = COMPLETE; - } else { - downstream.onComplete(); - } - } else { - headersSent = true; - output.complete(ByteBodyHttpResponseWrapper.wrap(headers, AvailableNettyByteBody.empty())); - } - } - } - - /** - * Replacement for {@link ResponseBodyWriterWrapper} that uses a netty {@link ByteBuf} instead - * of a byte array as the backing store. - * - * @param Body type - */ - private static final class NettyResponseBodyWriterWrapper extends ResponseBodyWriterWrapper { - private NettyResponseBodyWriterWrapper(MessageBodyWriter wrapped) { - super(wrapped); - } - - static ResponseBodyWriter wrap(MessageBodyWriter mbw) { - if (mbw instanceof ResponseBodyWriter rbw) { - return rbw; - } else { - return new NettyResponseBodyWriterWrapper<>(mbw); - } - } - - @Override - public @NonNull ByteBodyHttpResponse write(@NonNull ByteBufferFactory bufferFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse httpResponse, @NonNull Argument type, @NonNull MediaType mediaType, T object) throws CodecException { - ByteBuf buf = ((NettyByteBufferFactory) bufferFactory).buffer().asNativeBuffer(); - ByteBufOutputStream bbos = new ByteBufOutputStream(buf); - boolean release = true; - try { - writeTo(type, mediaType, object, httpResponse.getHeaders(), bbos); - release = false; - return ByteBodyHttpResponseWrapper.wrap(httpResponse, new AvailableNettyByteBody(buf)); - } finally { - if (release) { - buf.release(); - } - } - } - } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/configuration/NettyHttpServerConfiguration.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/configuration/NettyHttpServerConfiguration.java index 0d5c9b7f60..33c6aeaaee 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/configuration/NettyHttpServerConfiguration.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/configuration/NettyHttpServerConfiguration.java @@ -1196,7 +1196,9 @@ public static class Parent extends EventLoopConfig { * @author James Kleeh * @author graemerocher * @since 3.1.0 + * @deprecated Replaced by {@link HttpServerConfiguration.FileTypeHandlerConfiguration} */ + @Deprecated(since = "4.8.0", forRemoval = true) @ConfigurationProperties("responses.file") public static class FileTypeHandlerConfiguration { @@ -1225,14 +1227,6 @@ public FileTypeHandlerConfiguration() { @Inject public FileTypeHandlerConfiguration(@Nullable @Property(name = "netty.responses.file.cache-seconds") Integer cacheSeconds, @Nullable @Property(name = "netty.responses.file.cache-control.public") Boolean isPublic) { - if (cacheSeconds != null) { - this.cacheSeconds = cacheSeconds; - LOG.warn("The configuration `netty.responses.file.cache-seconds` is deprecated and will be removed in a future release. Use `micronaut.server.netty.responses.file.cache-seconds` instead."); - } - if (isPublic != null) { - this.cacheControl.setPublic(isPublic); - LOG.warn("The configuration `netty.responses.file.cache-control.public` is deprecated and will be removed in a future release. Use `micronaut.server.netty.responses.file.cache-control.public` instead."); - } } /** @@ -1268,7 +1262,10 @@ public void setCacheControl(CacheControlConfiguration cacheControl) { /** * Configuration for the Cache-Control header. + * + * @deprecated Replaced by {@link HttpServerConfiguration.FileTypeHandlerConfiguration.CacheControlConfiguration} */ + @Deprecated(since = "4.8.0", forRemoval = true) @ConfigurationProperties("cache-control") public static class CacheControlConfiguration { diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/Http2ServerHandlerSpec.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/Http2ServerHandlerSpec.groovy index 9e8a1da8d6..4f2e86c4fc 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/Http2ServerHandlerSpec.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/handler/Http2ServerHandlerSpec.groovy @@ -1,6 +1,6 @@ package io.micronaut.http.server.netty.handler -import io.micronaut.buffer.netty.NettyByteBufferFactory + import io.micronaut.core.annotation.NonNull import io.micronaut.http.body.CloseableByteBody import io.micronaut.http.body.InternalByteBody @@ -8,6 +8,7 @@ import io.micronaut.http.body.stream.InputStreamByteBody import io.micronaut.http.netty.body.AvailableNettyByteBody import io.micronaut.http.netty.body.NettyBodyAdapter import io.micronaut.http.netty.body.NettyByteBody +import io.micronaut.http.netty.body.NettyByteBodyFactory import io.micronaut.http.server.netty.EmbeddedTestUtil import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator @@ -351,7 +352,7 @@ class Http2ServerHandlerSpec extends Specification { read++ return 1 } - }, OptionalLong.empty(), service, NettyByteBufferFactory.DEFAULT)) + }, OptionalLong.empty(), service, new NettyByteBodyFactory(ctx.channel()))) } @Override diff --git a/http-server/src/main/java/io/micronaut/http/server/HttpServerConfiguration.java b/http-server/src/main/java/io/micronaut/http/server/HttpServerConfiguration.java index f8458facc0..eeb3b15ac9 100644 --- a/http-server/src/main/java/io/micronaut/http/server/HttpServerConfiguration.java +++ b/http-server/src/main/java/io/micronaut/http/server/HttpServerConfiguration.java @@ -16,6 +16,7 @@ package io.micronaut.http.server; import io.micronaut.context.annotation.ConfigurationProperties; +import io.micronaut.context.annotation.Property; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; @@ -29,6 +30,8 @@ import io.micronaut.runtime.ApplicationConfiguration; import io.micronaut.scheduling.executor.ThreadSelection; import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.nio.charset.Charset; @@ -133,6 +136,9 @@ public class HttpServerConfiguration implements ServerContextPathProvider { @SuppressWarnings("WeakerAccess") public static final int DEFAULT_MAX_PARAMS = 1024; + + private static final Logger LOG = LoggerFactory.getLogger(HttpServerConfiguration.class); + private Integer port; private String host; private Integer readTimeout; @@ -1092,4 +1098,116 @@ public void setHeader(boolean header) { this.header = header; } } + + /** + * Allows configuration of properties for the {@link io.micronaut.http.server.netty.body.AbstractFileBodyWriter}. + * + * @author James Kleeh + * @author graemerocher + * @since 3.1.0 + */ + @ConfigurationProperties("responses.file") + public static class FileTypeHandlerConfiguration { + + /** + * The default cache seconds. + */ + @SuppressWarnings("WeakerAccess") + public static final int DEFAULT_CACHESECONDS = 60; + + private int cacheSeconds = DEFAULT_CACHESECONDS; + private CacheControlConfiguration cacheControl = new CacheControlConfiguration(); + + /** + * Default constructor. + */ + public FileTypeHandlerConfiguration() { + } + + @Inject + FileTypeHandlerConfiguration( + @Nullable @Property(name = "netty.responses.file.cache-seconds") Integer cacheSecondsOld, + @Nullable @Property(name = "netty.responses.file.cache-control.public") Boolean isPublicOld, + @Nullable @Property(name = "micronaut.server.netty.responses.file.cache-seconds") Integer cacheSeconds, + @Nullable @Property(name = "micronaut.server.netty.responses.file.cache-control.public") Boolean isPublic + + ) { + + if (cacheSecondsOld != null) { + this.cacheSeconds = cacheSecondsOld; + LOG.warn("The configuration `netty.responses.file.cache-seconds` is deprecated and will be removed in a future release. Use `micronaut.server.responses.file.cache-seconds` instead."); + } + if (isPublicOld != null) { + this.cacheControl.setPublic(isPublicOld); + LOG.warn("The configuration `netty.responses.file.cache-control.public` is deprecated and will be removed in a future release. Use `micronaut.server.responses.file.cache-control.public` instead."); + } + if (cacheSeconds != null) { + this.cacheSeconds = cacheSeconds; + LOG.warn("The configuration `micronaut.server.netty.responses.file.cache-seconds` is deprecated and will be removed in a future release. Use `micronaut.server.responses.file.cache-seconds` instead."); + } + if (isPublic != null) { + this.cacheControl.setPublic(isPublic); + LOG.warn("The configuration `micronaut.server.netty.responses.file.cache-control.public` is deprecated and will be removed in a future release. Use `micronaut.server.responses.file.cache-control.public` instead."); + } + } + + /** + * @return the cache seconds + */ + public int getCacheSeconds() { + return cacheSeconds; + } + + /** + * Cache Seconds. Default value ({@value #DEFAULT_CACHESECONDS}). + * @param cacheSeconds cache seconds + */ + public void setCacheSeconds(int cacheSeconds) { + this.cacheSeconds = cacheSeconds; + } + + /** + * @return The cache control configuration + */ + public CacheControlConfiguration getCacheControl() { + return cacheControl; + } + + /** + * Sets the cache control configuration. + * + * @param cacheControl The cache control configuration + */ + public void setCacheControl(CacheControlConfiguration cacheControl) { + this.cacheControl = cacheControl; + } + + /** + * Configuration for the Cache-Control header. + */ + @ConfigurationProperties("cache-control") + public static class CacheControlConfiguration { + + private static final boolean DEFAULT_PUBLIC_CACHE = false; + + private boolean publicCache = DEFAULT_PUBLIC_CACHE; + + /** + * Sets whether the cache control is public. Default value ({@value #DEFAULT_PUBLIC_CACHE}) + * + * @param publicCache Public cache value + */ + public void setPublic(boolean publicCache) { + this.publicCache = publicCache; + } + + /** + * @return True if the cache control should be public + */ + @NonNull + public boolean getPublic() { + return publicCache; + } + } + } } diff --git a/http-server/src/main/java/io/micronaut/http/server/ResponseLifecycle.java b/http-server/src/main/java/io/micronaut/http/server/ResponseLifecycle.java new file mode 100644 index 0000000000..38c52407ff --- /dev/null +++ b/http-server/src/main/java/io/micronaut/http/server/ResponseLifecycle.java @@ -0,0 +1,348 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.server; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.async.publisher.Publishers; +import io.micronaut.core.async.subscriber.LazySendingSubscriber; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.execution.ExecutionFlow; +import io.micronaut.core.type.Argument; +import io.micronaut.http.ByteBodyHttpResponse; +import io.micronaut.http.ByteBodyHttpResponseWrapper; +import io.micronaut.http.HttpAttributes; +import io.micronaut.http.HttpMethod; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpResponseWrapper; +import io.micronaut.http.MediaType; +import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.ConcatenatingSubscriber; +import io.micronaut.http.body.MediaTypeProvider; +import io.micronaut.http.body.MessageBodyHandlerRegistry; +import io.micronaut.http.body.MessageBodyWriter; +import io.micronaut.http.body.ResponseBodyWriter; +import io.micronaut.http.codec.CodecException; +import io.micronaut.http.exceptions.HttpStatusException; +import io.micronaut.http.reactive.execution.ReactiveExecutionFlow; +import io.micronaut.web.router.DefaultUrlRouteInfo; +import io.micronaut.web.router.RouteInfo; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; + +/** + * This class handles encoding of the HTTP response in a server-agnostic way. Note that while this + * class is internal, it is used from servlet and must not be broken. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +public abstract class ResponseLifecycle { + private final RouteExecutor routeExecutor; + private final MessageBodyHandlerRegistry messageBodyHandlerRegistry; + private final ConversionService conversionService; + private final ByteBodyFactory byteBodyFactory; + + public ResponseLifecycle(RouteExecutor routeExecutor, + MessageBodyHandlerRegistry messageBodyHandlerRegistry, + ConversionService conversionService, + ByteBodyFactory byteBodyFactory) { + this.routeExecutor = routeExecutor; + this.messageBodyHandlerRegistry = messageBodyHandlerRegistry; + this.conversionService = conversionService; + this.byteBodyFactory = byteBodyFactory; + } + + /** + * The IO executor for blocking writers. + * + * @return The blocking executor + */ + @NonNull + protected abstract Executor ioExecutor(); + + /** + * Transform the given writer into a {@link ResponseBodyWriter}. + * + * @param messageBodyWriter The writer + * @return The response writer + * @param The writer type + */ + @NonNull + protected ResponseBodyWriter wrap(@NonNull MessageBodyWriter messageBodyWriter) { + return ResponseBodyWriter.wrap(messageBodyWriter); + } + + /** + * Encode the response. + * + * @param httpRequest The request that triggered this response + * @param response The unencoded response + * @return The encoded response + */ + @NonNull + public final ExecutionFlow> encodeHttpResponseSafe(@NonNull HttpRequest httpRequest, @NonNull HttpResponse response) { + try { + return encodeHttpResponse( + httpRequest, + response, + response.body() + ); + } catch (Throwable e) { + try { + response = routeExecutor.createDefaultErrorResponse(httpRequest, e); + return encodeHttpResponse( + httpRequest, + response, + response.body() + ); + } catch (Throwable f) { + f.addSuppressed(e); + return ExecutionFlow.error(f); + } + } + } + + @SuppressWarnings("unchecked") + private ExecutionFlow> encodeHttpResponse( + HttpRequest nettyRequest, + HttpResponse httpResponse, + Object body) { + MutableHttpResponse response = httpResponse.toMutableResponse(); + if (nettyRequest.getMethod() != HttpMethod.HEAD && body != null) { + Object routeInfoO = response.getAttribute(HttpAttributes.ROUTE_INFO).orElse(null); + // usually this is a UriRouteInfo, avoid scalability issues here + @SuppressWarnings("unchecked") final RouteInfo routeInfo = (RouteInfo) (routeInfoO instanceof DefaultUrlRouteInfo uri ? uri : (RouteInfo) routeInfoO); + + if (Publishers.isConvertibleToPublisher(body)) { + response.body(null); + return mapToHttpContent(nettyRequest, response, body, routeInfo); + } + + // avoid checkcast for MessageBodyWriter interface here + Object o = response.getBodyWriter().orElse(null); + MessageBodyWriter messageBodyWriter = o instanceof ResponseBodyWriter rbw ? rbw : (MessageBodyWriter) o; + MediaType responseMediaType = response.getContentType().orElse(null); + Argument responseBodyType; + if (routeInfo != null) { + responseBodyType = (Argument) routeInfo.getResponseBodyType(); + } else { + responseBodyType = Argument.of((Class) body.getClass()); + } + if (responseMediaType == null) { + // perf: check for common body types + //noinspection ConditionCoveredByFurtherCondition + if (!(body instanceof String) && !(body instanceof byte[]) && body instanceof MediaTypeProvider mediaTypeProvider) { + responseMediaType = mediaTypeProvider.getMediaType(); + } else if (routeInfo != null) { + responseMediaType = routeExecutor.resolveDefaultResponseContentType(nettyRequest, routeInfo); + } else { + responseMediaType = MediaType.APPLICATION_JSON_TYPE; + } + } + + if (messageBodyWriter == null) { + // lookup write to use, any logic that hits this path should consider setting + // a body writer on the response before writing + messageBodyWriter = messageBodyHandlerRegistry + .findWriter(responseBodyType, Collections.singletonList(responseMediaType)) + .orElse(null); + } + if (messageBodyWriter == null || !responseBodyType.isInstance(body) || !messageBodyWriter.isWriteable(responseBodyType, responseMediaType)) { + responseBodyType = Argument.ofInstance(body); + messageBodyWriter = messageBodyHandlerRegistry.getWriter(responseBodyType, List.of(responseMediaType)); + } + return buildFinalResponse(nettyRequest, (MutableHttpResponse) response, responseBodyType, responseMediaType, body, messageBodyWriter, false); + } else { + response.body(null); + + return encodeNoBody(response); + } + } + + /** + * Encode the given response without body, either because it has none or because this is a HEAD + * response. + * + * @param response The response + * @return The encoded response + */ + protected ExecutionFlow> encodeNoBody(HttpResponse response) { + if (response instanceof HttpResponseWrapper wrapper) { + return encodeNoBody(wrapper.getDelegate()); + } + + return ExecutionFlow.just(ByteBodyHttpResponseWrapper.wrap(response, byteBodyFactory.createEmpty())); + } + + private ExecutionFlow> mapToHttpContent(HttpRequest request, + MutableHttpResponse response, + Object body, + RouteInfo routeInfo) { + MediaType mediaType = response.getContentType().orElse(null); + Flux bodyPublisher = Flux.from(Publishers.convertToPublisher(conversionService, body)); + Flux httpContentPublisher; + boolean isJson; + if (routeInfo != null) { + if (mediaType == null) { + mediaType = routeExecutor.resolveDefaultResponseContentType(request, routeInfo); + } + isJson = mediaType != null && + mediaType.getExtension().equals(MediaType.EXTENSION_JSON) && routeInfo.isResponseBodyJsonFormattable(); + MediaType finalMediaType = mediaType; + httpContentPublisher = bodyPublisher.concatMap(message -> { + MessageBodyWriter messageBodyWriter = routeInfo.getMessageBodyWriter(); + @SuppressWarnings("unchecked") + Argument responseBodyType = (Argument) routeInfo.getResponseBodyType(); + + if (messageBodyWriter == null || !responseBodyType.isInstance(message) || !messageBodyWriter.isWriteable(responseBodyType, finalMediaType)) { + responseBodyType = Argument.ofInstance(message); + messageBodyWriter = wrap(messageBodyHandlerRegistry.getWriter(responseBodyType, List.of(finalMediaType))); + } + ExecutionFlow flow = writePieceAsync( + messageBodyWriter, + request, + response, + responseBodyType, + finalMediaType, + message); + return ReactiveExecutionFlow.toPublisher(() -> flow); + }); + } else { + isJson = false; + MediaType finalMediaType = mediaType; + httpContentPublisher = bodyPublisher + .concatMap(message -> { + Argument type = Argument.ofInstance(message); + MessageBodyWriter messageBodyWriter = messageBodyHandlerRegistry.getWriter(type, finalMediaType == null ? List.of() : List.of(finalMediaType)); + ExecutionFlow flow = writePieceAsync(messageBodyWriter, request, response, type, finalMediaType, message); + return ReactiveExecutionFlow.toPublisher(() -> flow); + }); + } + + httpContentPublisher = httpContentPublisher.doOnDiscard(CloseableByteBody.class, CloseableByteBody::close); + + return LazySendingSubscriber.create(httpContentPublisher).map(items -> { + CloseableByteBody byteBody = isJson ? concatenateJson(items) : concatenate(items); + return ByteBodyHttpResponseWrapper.wrap(response, byteBody); + }).onErrorResume(t -> (ExecutionFlow) handleStreamingError(request, t)); + } + + /** + * @see ConcatenatingSubscriber.ByteBufferConcatenatingSubscriber#concatenate + * @param items The items + * @return The concatenated body + */ + protected @NonNull CloseableByteBody concatenate(@NonNull Publisher items) { + return ConcatenatingSubscriber.ByteBufferConcatenatingSubscriber.concatenate(items); + } + + /** + * @see ConcatenatingSubscriber.JsonByteBufferConcatenatingSubscriber#concatenateJson + * @param items The items + * @return The concatenated body + */ + protected @NonNull CloseableByteBody concatenateJson(@NonNull Publisher items) { + return ConcatenatingSubscriber.JsonByteBufferConcatenatingSubscriber.concatenateJson(items); + } + + /** + * Handle an error that happened before the first item of a streaming response. + * + * @param request The request + * @param t The error + * @return The encoded error response + */ + @NonNull + protected final ExecutionFlow> handleStreamingError(@NonNull HttpRequest request, @NonNull Throwable t) { + // limited error handling + MutableHttpResponse errorResponse; + if (t instanceof HttpStatusException hse) { + errorResponse = HttpResponse.status(hse.getStatus()); + if (hse.getBody().isPresent()) { + errorResponse.body(hse.getBody().get()); + } else if (hse.getMessage() != null) { + errorResponse.body(hse.getMessage()); + } + } else { + errorResponse = routeExecutor.createDefaultErrorResponse(request, t); + } + return encodeHttpResponse( + request, + errorResponse, + errorResponse.body() + ); + } + + private ExecutionFlow writePieceAsync( + @NonNull MessageBodyWriter messageBodyWriter, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + T object + ) { + if (messageBodyWriter.isBlocking()) { + return ExecutionFlow.async(ioExecutor(), () -> ExecutionFlow.just(writePieceSync(messageBodyWriter, request, response, type, mediaType, object))); + } else { + return ExecutionFlow.just(writePieceSync(messageBodyWriter, request, response, type, mediaType, object)); + } + } + + private CloseableByteBody writePieceSync(@NonNull MessageBodyWriter messageBodyWriter, @NonNull HttpRequest request, @NonNull HttpResponse response, @NonNull Argument type, @NonNull MediaType mediaType, T object) { + return wrap(messageBodyWriter).writePiece(byteBodyFactory, request, response, type, mediaType, object); + } + + private ExecutionFlow> buildFinalResponse(HttpRequest nettyRequest, + MutableHttpResponse response, + Argument responseBodyType, + MediaType mediaType, + T body, + MessageBodyWriter messageBodyWriter, + boolean onIoExecutor) { + if (!onIoExecutor && messageBodyWriter.isBlocking()) { + return ExecutionFlow.async(ioExecutor(), () -> buildFinalResponse(nettyRequest, response, responseBodyType, mediaType, body, messageBodyWriter, true)); + } + + try { + return ExecutionFlow.just(wrap(messageBodyWriter) + .write(byteBodyFactory, nettyRequest, response, responseBodyType, mediaType, body)); + } catch (CodecException e) { + final MutableHttpResponse errorResponse = (MutableHttpResponse) routeExecutor.createDefaultErrorResponse(nettyRequest, e); + Object errorBody = errorResponse.body(); + Argument type = Argument.ofInstance(errorBody); + MediaType errorContentType = errorResponse.getContentType().orElse(MediaType.APPLICATION_JSON_TYPE); + MessageBodyWriter errorBodyWriter = messageBodyHandlerRegistry.getWriter(type, List.of(errorContentType)); + if (!onIoExecutor && errorBodyWriter.isBlocking()) { + return ExecutionFlow.async(ioExecutor(), () -> ExecutionFlow.just(wrap(errorBodyWriter) + .write(byteBodyFactory, nettyRequest, errorResponse, type, errorContentType, errorBody))); + } else { + return ExecutionFlow.just(wrap(errorBodyWriter) + .write(byteBodyFactory, nettyRequest, errorResponse, type, errorContentType, errorBody)); + } + } + } + +} diff --git a/http-server/src/main/java/io/micronaut/http/server/RouteExecutor.java b/http-server/src/main/java/io/micronaut/http/server/RouteExecutor.java index 1b46c3cba6..eee284e50a 100644 --- a/http-server/src/main/java/io/micronaut/http/server/RouteExecutor.java +++ b/http-server/src/main/java/io/micronaut/http/server/RouteExecutor.java @@ -768,7 +768,6 @@ private Mono> processPublisherBody(PropagatedContext prop ).contextWrite(cv -> ReactorPropagation.addPropagatedContext(cv, propagatedContext).put(ServerRequestContext.KEY, request)); return Mono.>just(response - .header(HttpHeaders.TRANSFER_ENCODING, "chunked") .header(HttpHeaders.CONTENT_TYPE, mediaType) .body(ReactivePropagation.propagate(propagatedContext, bodyPublisher))) .contextWrite(context -> ReactorPropagation.addPropagatedContext(context, propagatedContext).put(ServerRequestContext.KEY, request)); diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/AbstractFileBodyWriter.java b/http-server/src/main/java/io/micronaut/http/server/body/AbstractFileBodyWriter.java similarity index 74% rename from http-server-netty/src/main/java/io/micronaut/http/server/netty/body/AbstractFileBodyWriter.java rename to http-server/src/main/java/io/micronaut/http/server/body/AbstractFileBodyWriter.java index 23de9f08de..25d4c78a7d 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/AbstractFileBodyWriter.java +++ b/http-server/src/main/java/io/micronaut/http/server/body/AbstractFileBodyWriter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.server.netty.body; +package io.micronaut.http.server.body; import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; @@ -24,15 +24,16 @@ import io.micronaut.http.HttpResponse; import io.micronaut.http.MutableHttpHeaders; import io.micronaut.http.MutableHttpResponse; -import io.micronaut.http.netty.body.AvailableNettyByteBody; -import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.server.HttpServerConfiguration; import io.micronaut.http.server.types.files.FileCustomizableResponseType; -import io.netty.handler.codec.http.HttpHeaderNames; import java.time.LocalDateTime; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; -import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; /** * Abstract implementation for types that write files. @@ -40,16 +41,20 @@ @Experimental @Internal abstract sealed class AbstractFileBodyWriter permits InputStreamBodyWriter, StreamFileBodyWriter, SystemFileBodyWriter { - private static final String[] ENTITY_HEADERS = {HttpHeaders.ALLOW, HttpHeaders.CONTENT_ENCODING, HttpHeaders.CONTENT_LANGUAGE, HttpHeaders.CONTENT_LENGTH, HttpHeaders.CONTENT_LOCATION, HttpHeaders.CONTENT_MD5, HttpHeaders.CONTENT_RANGE, HttpHeaders.CONTENT_TYPE, HttpHeaders.EXPIRES, HttpHeaders.LAST_MODIFIED}; - protected final NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration; + private static final Set ENTITY_HEADERS = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + protected final HttpServerConfiguration.FileTypeHandlerConfiguration configuration; - AbstractFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration) { + static { + ENTITY_HEADERS.addAll(List.of(HttpHeaders.ALLOW, HttpHeaders.CONTENT_ENCODING, HttpHeaders.CONTENT_LANGUAGE, HttpHeaders.CONTENT_LENGTH, HttpHeaders.CONTENT_LOCATION, HttpHeaders.CONTENT_MD5, HttpHeaders.CONTENT_RANGE, HttpHeaders.CONTENT_TYPE, HttpHeaders.EXPIRES, HttpHeaders.LAST_MODIFIED)); + } + + AbstractFileBodyWriter(HttpServerConfiguration.FileTypeHandlerConfiguration configuration) { this.configuration = configuration; } private static void copyNonEntityHeaders(MutableHttpResponse from, MutableHttpResponse to) { from.getHeaders().forEachValue((header, value) -> { - if (Arrays.binarySearch(ENTITY_HEADERS, header) < 0) { + if (!ENTITY_HEADERS.contains(header)) { to.getHeaders().add(header, value); } }); @@ -71,9 +76,7 @@ protected boolean handleIfModifiedAndHeaders(HttpRequest request, MutableHttp } } - if (!response.getHeaders().contains(HttpHeaderNames.CONTENT_TYPE)) { - response.header(HttpHeaderNames.CONTENT_TYPE, systemFile.getMediaType().toString()); - } + response.getHeaders().contentTypeIfMissing(systemFile.getMediaType()); setDateAndCacheHeaders(response, lastModified); systemFile.process(nettyResponse); return false; @@ -87,7 +90,7 @@ protected void setDateAndCacheHeaders(MutableHttpResponse response, long lastMod // Date header MutableHttpHeaders headers = response.getHeaders(); LocalDateTime now = LocalDateTime.now(); - if (!headers.contains(HttpHeaderNames.DATE)) { + if (!headers.contains(HttpHeaders.DATE)) { headers.date(now); } @@ -98,7 +101,7 @@ protected void setDateAndCacheHeaders(MutableHttpResponse response, long lastMod } if (response.header(HttpHeaders.CACHE_CONTROL) == null) { - NettyHttpServerConfiguration.FileTypeHandlerConfiguration.CacheControlConfiguration cacheConfig = configuration.getCacheControl(); + HttpServerConfiguration.FileTypeHandlerConfiguration.CacheControlConfiguration cacheConfig = configuration.getCacheControl(); StringBuilder header = new StringBuilder(cacheConfig.getPublic() ? "public" : "private") .append(", max-age=") .append(configuration.getCacheSeconds()); @@ -119,10 +122,10 @@ protected void setDateHeader(MutableHttpResponse response) { headers.date(now); } - protected ByteBodyHttpResponse notModified(MutableHttpResponse originalResponse) { + protected ByteBodyHttpResponse notModified(ByteBodyFactory bodyFactory, MutableHttpResponse originalResponse) { MutableHttpResponse response = HttpResponse.notModified(); AbstractFileBodyWriter.copyNonEntityHeaders(originalResponse, response); setDateHeader(response); - return ByteBodyHttpResponseWrapper.wrap(response, AvailableNettyByteBody.empty()); + return ByteBodyHttpResponseWrapper.wrap(response, bodyFactory.createEmpty()); } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/FileBodyWriter.java b/http-server/src/main/java/io/micronaut/http/server/body/FileBodyWriter.java similarity index 61% rename from http-server-netty/src/main/java/io/micronaut/http/server/netty/body/FileBodyWriter.java rename to http-server/src/main/java/io/micronaut/http/server/body/FileBodyWriter.java index fe2e7151dd..f60237fc83 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/FileBodyWriter.java +++ b/http-server/src/main/java/io/micronaut/http/server/body/FileBodyWriter.java @@ -13,17 +13,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.server.netty.body; +package io.micronaut.http.server.body; import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.io.buffer.ByteBufferFactory; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.type.Argument; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.codec.CodecException; import io.micronaut.http.server.types.files.SystemFile; @@ -49,10 +52,25 @@ public FileBodyWriter(SystemFileBodyWriter systemFileBodyWriter) { } @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, HttpRequest request, MutableHttpResponse outgoingResponse, Argument type, MediaType mediaType, File object) throws CodecException { + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, + HttpRequest request, + MutableHttpResponse outgoingResponse, + Argument type, + MediaType mediaType, + File object) throws CodecException { SystemFile systemFile = new SystemFile(object); MutableHttpResponse newResponse = outgoingResponse.body(systemFile); - return systemFileBodyWriter.write(request, newResponse, systemFile); + return systemFileBodyWriter.write(bodyFactory, request, newResponse, systemFile); + } + + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + File object) { + return systemFileBodyWriter.writePiece(bodyFactory, new SystemFile(object)); } @Override diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java b/http-server/src/main/java/io/micronaut/http/server/body/InputStreamBodyWriter.java similarity index 58% rename from http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java rename to http-server/src/main/java/io/micronaut/http/server/body/InputStreamBodyWriter.java index 9caa5ce87e..2fc3260822 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/InputStreamBodyWriter.java +++ b/http-server/src/main/java/io/micronaut/http/server/body/InputStreamBodyWriter.java @@ -13,25 +13,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.server.netty.body; +package io.micronaut.http.server.body; -import io.micronaut.buffer.netty.NettyByteBufferFactory; import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.io.buffer.ByteBufferFactory; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.type.Argument; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.ByteBodyHttpResponseWrapper; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.body.stream.InputStreamByteBody; import io.micronaut.http.codec.CodecException; -import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; +import io.micronaut.http.server.HttpServerConfiguration; import io.micronaut.scheduling.TaskExecutors; -import io.netty.handler.codec.http.HttpHeaderNames; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -52,15 +53,30 @@ public final class InputStreamBodyWriter extends AbstractFileBodyWriter implements ResponseBodyWriter { private final ExecutorService executorService; - InputStreamBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService executorService) { + InputStreamBodyWriter(HttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService executorService) { super(configuration); this.executorService = executorService; } @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, HttpRequest request, MutableHttpResponse outgoingResponse, Argument type, MediaType mediaType, InputStream object) throws CodecException { - outgoingResponse.getHeaders().setIfMissing(HttpHeaderNames.CONTENT_TYPE, mediaType); - return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, InputStreamByteBody.create(object, OptionalLong.empty(), executorService, NettyByteBufferFactory.DEFAULT)); + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, + HttpRequest request, + MutableHttpResponse outgoingResponse, + Argument type, + MediaType mediaType, + InputStream object) throws CodecException { + outgoingResponse.getHeaders().contentTypeIfMissing(mediaType); + return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, InputStreamByteBody.create(object, OptionalLong.empty(), executorService, bodyFactory)); + } + + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + InputStream object) { + return InputStreamByteBody.create(object, OptionalLong.empty(), executorService, bodyFactory); } @Override diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java b/http-server/src/main/java/io/micronaut/http/server/body/StreamFileBodyWriter.java similarity index 57% rename from http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java rename to http-server/src/main/java/io/micronaut/http/server/body/StreamFileBodyWriter.java index 417d9ed0b5..dea7058354 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamFileBodyWriter.java +++ b/http-server/src/main/java/io/micronaut/http/server/body/StreamFileBodyWriter.java @@ -13,23 +13,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.server.netty.body; +package io.micronaut.http.server.body; -import io.micronaut.buffer.netty.NettyByteBufferFactory; import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.io.buffer.ByteBufferFactory; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.type.Argument; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.ByteBodyHttpResponseWrapper; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.body.stream.InputStreamByteBody; import io.micronaut.http.codec.CodecException; -import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; +import io.micronaut.http.server.HttpServerConfiguration; import io.micronaut.http.server.types.files.StreamedFile; import io.micronaut.scheduling.TaskExecutors; import jakarta.inject.Named; @@ -52,22 +54,37 @@ public final class StreamFileBodyWriter extends AbstractFileBodyWriter implements ResponseBodyWriter { private final ExecutorService ioExecutor; - StreamFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { + StreamFileBodyWriter(HttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { super(configuration); this.ioExecutor = ioExecutor; } @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, HttpRequest request, MutableHttpResponse outgoingResponse, Argument type, MediaType mediaType, StreamedFile object) throws CodecException { + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, + HttpRequest request, + MutableHttpResponse outgoingResponse, + Argument type, + MediaType mediaType, + StreamedFile object) throws CodecException { if (handleIfModifiedAndHeaders(request, outgoingResponse, object, outgoingResponse)) { - return notModified(outgoingResponse); + return notModified(bodyFactory, outgoingResponse); } else { - long length = object.getLength(); - InputStream inputStream = object.getInputStream(); - return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, InputStreamByteBody.create(inputStream, length > -1 ? OptionalLong.of(length) : OptionalLong.empty(), ioExecutor, NettyByteBufferFactory.DEFAULT)); + return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, writePiece(bodyFactory, request, outgoingResponse, type, mediaType, object)); } } + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + StreamedFile object) { + long length = object.getLength(); + InputStream inputStream = object.getInputStream(); + return InputStreamByteBody.create(inputStream, length > -1 ? OptionalLong.of(length) : OptionalLong.empty(), ioExecutor, bodyFactory); + } + @Override public void writeTo(Argument type, MediaType mediaType, StreamedFile object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException { throw new UnsupportedOperationException("Can only be used in a Netty context"); diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java b/http-server/src/main/java/io/micronaut/http/server/body/SystemFileBodyWriter.java similarity index 80% rename from http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java rename to http-server/src/main/java/io/micronaut/http/server/body/SystemFileBodyWriter.java index 419201b096..211ebb7fd8 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/SystemFileBodyWriter.java +++ b/http-server/src/main/java/io/micronaut/http/server/body/SystemFileBodyWriter.java @@ -13,14 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.server.netty.body; +package io.micronaut.http.server.body; -import io.micronaut.buffer.netty.NettyByteBufferFactory; import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; -import io.micronaut.core.io.buffer.ByteBufferFactory; import io.micronaut.core.type.Argument; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.ByteBodyHttpResponse; @@ -28,18 +26,19 @@ import io.micronaut.http.HttpHeaders; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpResponse; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.body.stream.InputStreamByteBody; import io.micronaut.http.codec.CodecException; import io.micronaut.http.exceptions.MessageBodyException; -import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; +import io.micronaut.http.server.HttpServerConfiguration; import io.micronaut.http.server.types.files.SystemFile; import io.micronaut.scheduling.TaskExecutors; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -69,7 +68,7 @@ public final class SystemFileBodyWriter extends AbstractFileBodyWriter implement private final ExecutorService ioExecutor; - public SystemFileBodyWriter(NettyHttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { + public SystemFileBodyWriter(HttpServerConfiguration.FileTypeHandlerConfiguration configuration, @Named(TaskExecutors.BLOCKING) ExecutorService ioExecutor) { super(configuration); this.ioExecutor = ioExecutor; } @@ -80,16 +79,21 @@ public void writeTo(Argument type, MediaType mediaType, SystemFile f } @Override - public ByteBodyHttpResponse write(ByteBufferFactory bufferFactory, HttpRequest request, @NonNull MutableHttpResponse httpResponse, @NonNull Argument type, @NonNull MediaType mediaType, SystemFile object) throws CodecException { - return write(request, httpResponse, object); + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, + HttpRequest request, + @NonNull MutableHttpResponse httpResponse, + @NonNull Argument type, + @NonNull MediaType mediaType, + SystemFile object) throws CodecException { + return write(bodyFactory, request, httpResponse, object); } - public ByteBodyHttpResponse write(HttpRequest request, MutableHttpResponse response, SystemFile systemFile) throws CodecException { + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, HttpRequest request, MutableHttpResponse response, SystemFile systemFile) throws CodecException { if (!systemFile.getFile().canRead()) { throw new MessageBodyException("Could not find file"); } if (handleIfModifiedAndHeaders(request, response, systemFile, response)) { - return notModified(response); + return notModified(bodyFactory, response); } else { // Parse the range headers (if any), and determine the position and content length @@ -118,9 +122,6 @@ public ByteBodyHttpResponse write(HttpRequest request, MutableHttpResponse } } response.header(HttpHeaders.ACCEPT_RANGES, UNIT_BYTES); - response.header(HttpHeaders.CONTENT_LENGTH, Long.toString(contentLength)); - } else { - response.header(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); } File file = systemFile.getFile(); @@ -132,10 +133,31 @@ public ByteBodyHttpResponse write(HttpRequest request, MutableHttpResponse } @NonNull InputStream stream = new RangeInputStream(is, position, contentLength); - return ByteBodyHttpResponseWrapper.wrap(response, InputStreamByteBody.create(stream, OptionalLong.of(contentLength), ioExecutor, NettyByteBufferFactory.DEFAULT)); + return ByteBodyHttpResponseWrapper.wrap(response, InputStreamByteBody.create(stream, OptionalLong.of(contentLength), ioExecutor, bodyFactory)); } } + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + SystemFile object) { + return writePiece(bodyFactory, object); + } + + public @NonNull CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, SystemFile object) { + long fileLength = object.getLength(); + InputStream is; + try { + is = new FileInputStream(object.getFile()); + } catch (FileNotFoundException e) { + throw new MessageBodyException("Could not find file", e); + } + return InputStreamByteBody.create(is, OptionalLong.of(fileLength), ioExecutor, bodyFactory); + } + @Nullable private static IntRange parseRangeHeader(String value, long contentLength) { int equalsIdx = value.indexOf('='); diff --git a/http/src/main/java/io/micronaut/http/MutableHttpHeaders.java b/http/src/main/java/io/micronaut/http/MutableHttpHeaders.java index db857f5f4e..f28f428162 100644 --- a/http/src/main/java/io/micronaut/http/MutableHttpHeaders.java +++ b/http/src/main/java/io/micronaut/http/MutableHttpHeaders.java @@ -243,6 +243,21 @@ default MutableHttpHeaders contentType(MediaType mediaType) { return add(CONTENT_TYPE, mediaType); } + /** + * Sets the {@link HttpHeaders#CONTENT_TYPE} header to the given media type, if the header is + * missing. + * + * @param mediaType The media type + * @return This HTTP headers + */ + default MutableHttpHeaders contentTypeIfMissing(MediaType mediaType) { + if (!contains(CONTENT_TYPE)) { + return contentType(mediaType); + } else { + return this; + } + } + /** * Add a header for the given name and value. * diff --git a/http/src/main/java/io/micronaut/http/body/AbstractBodyAdapter.java b/http/src/main/java/io/micronaut/http/body/AbstractBodyAdapter.java new file mode 100644 index 0000000000..275df178c9 --- /dev/null +++ b/http/src/main/java/io/micronaut/http/body/AbstractBodyAdapter.java @@ -0,0 +1,115 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.body; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.body.stream.BaseSharedBuffer; +import io.micronaut.http.body.stream.BufferConsumer; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongUnaryOperator; + +/** + * Base implementation for an adapter that transforms a {@link Publisher} of buffers to a + * {@link ByteBody}. + * + * @param The input buffer type + * @param The output {@link BaseSharedBuffer} the buffers are forwarded to + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +public abstract class AbstractBodyAdapter> implements BufferConsumer.Upstream, Subscriber { + protected S sharedBuffer; + + protected volatile Subscription subscription; + protected final AtomicLong demand = new AtomicLong(1); + + private final Publisher source; + @Nullable + private final Runnable onDiscard; + private volatile boolean cancelled; + + public AbstractBodyAdapter(@NonNull Publisher source, @Nullable Runnable onDiscard) { + this.source = source; + this.onDiscard = onDiscard; + } + + @Override + public final void start() { + source.subscribe(this); + } + + @Override + public final void onBytesConsumed(long bytesConsumed) { + if (bytesConsumed < 0) { + throw new IllegalArgumentException("Negative bytes consumed"); + } + + // clamping add + LongUnaryOperator add = l -> l + bytesConsumed < l ? Long.MAX_VALUE : l + bytesConsumed; + long oldDemand = this.demand.getAndUpdate(add); + long newDemand = add.applyAsLong(oldDemand); + if (oldDemand <= 0 && newDemand > 0) { + subscription.request(1); + } + } + + @Override + public final void allowDiscard() { + cancelled = true; + if (subscription != null) { + subscription.cancel(); + } + if (onDiscard != null) { + onDiscard.run(); + } + } + + @Override + public final void disregardBackpressure() { + this.demand.set(Long.MAX_VALUE); + if (subscription != null) { + subscription.request(Long.MAX_VALUE); + } + } + + @Override + public final void onSubscribe(Subscription s) { + this.subscription = s; + if (cancelled) { + s.cancel(); + } else { + s.request(1); + } + } + + @Override + public void onError(Throwable t) { + sharedBuffer.error(t); + } + + @Override + public void onComplete() { + sharedBuffer.complete(); + } + +} diff --git a/http/src/main/java/io/micronaut/http/body/ByteBodyFactory.java b/http/src/main/java/io/micronaut/http/body/ByteBodyFactory.java new file mode 100644 index 0000000000..5be9346e2e --- /dev/null +++ b/http/src/main/java/io/micronaut/http/body/ByteBodyFactory.java @@ -0,0 +1,161 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.body; + +import io.micronaut.core.annotation.Blocking; +import io.micronaut.core.annotation.Experimental; +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.io.buffer.ByteBuffer; +import io.micronaut.core.io.buffer.ByteBufferFactory; +import io.micronaut.core.io.buffer.ReferenceCounted; +import io.micronaut.core.util.ArrayUtils; +import io.micronaut.core.util.functional.ThrowingConsumer; +import io.micronaut.http.body.stream.AvailableByteArrayBody; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; + +/** + * Factory methods for {@link ByteBody}s. + *

While this is public API, extension is only allowed by micronaut-core. + * + * @author Jonas Konrad + * @since 4.8.0 + */ +@Experimental +public class ByteBodyFactory { + private final ByteBufferFactory byteBufferFactory; + + /** + * Internal constructor. + * + * @param byteBufferFactory The buffer factory + */ + @Internal + protected ByteBodyFactory(@NonNull ByteBufferFactory byteBufferFactory) { + this.byteBufferFactory = byteBufferFactory; + } + + /** + * Create a default body factory. Where possible, prefer using an existing factory that may + * have runtime-specific optimizations, such as the factory passed to + * {@link ResponseBodyWriter}. + * + * @param byteBufferFactory The base buffer factory + * @return The body factory + */ + @NonNull + public static ByteBodyFactory createDefault(@NonNull ByteBufferFactory byteBufferFactory) { + return new ByteBodyFactory(byteBufferFactory); + } + + /** + * Get the underlying {@link ByteBufferFactory}. Where possible, prefer using methods on the + * body factory directly. + * + * @return The buffer factory + */ + @NonNull + public final ByteBufferFactory byteBufferFactory() { + return byteBufferFactory; + } + + /** + * Create a new {@link CloseableAvailableByteBody} from the given buffer. Ownership of the + * buffer is transferred to this method; the original buffer may be copied or used as-is + * depending on implementation. If the buffer is {@link ReferenceCounted}, release ownership is + * also transferred to this method. + * + * @param buffer The buffer + * @return A {@link ByteBody} with the same content as the buffer + */ + @NonNull + public CloseableAvailableByteBody adapt(@NonNull ByteBuffer buffer) { + byte[] byteArray = buffer.toByteArray(); + if (buffer instanceof ReferenceCounted rc) { + rc.release(); + } + return adapt(byteArray); + } + + /** + * Create a new {@link CloseableAvailableByteBody} from the given array. Ownership of the array + * is transferred to this method; the array may be copied or used as-is, so do not modify the + * array after passing it to this method. + * + * @param array The array + * @return A {@link ByteBody} with the same content as the array + */ + @NonNull + public CloseableAvailableByteBody adapt(byte @NonNull [] array) { + return AvailableByteArrayBody.create(byteBufferFactory(), array); + } + + /** + * Buffer any data written to an {@link OutputStream} and return it as a {@link ByteBody}. + * + * @param writer The function that will write to the {@link OutputStream} + * @return The data written to the stream + * @param Exception type thrown by the consumer + * @throws T Exception thrown by the consumer + */ + @NonNull + public CloseableAvailableByteBody buffer(@NonNull ThrowingConsumer writer) throws T { + ByteArrayOutputStream s = new ByteArrayOutputStream(); + writer.accept(s); + return AvailableByteArrayBody.create(byteBufferFactory(), s.toByteArray()); + } + + /** + * Create an empty body. + * + * @return The empty body + */ + @NonNull + public CloseableAvailableByteBody createEmpty() { + return adapt(ArrayUtils.EMPTY_BYTE_ARRAY); + } + + /** + * Encode the given {@link CharSequence} and create a {@link ByteBody} from it. + * + * @param cs The input string + * @param charset The charset to use for encoding + * @return The encoded body + */ + @NonNull + public CloseableAvailableByteBody copyOf(@NonNull CharSequence cs, @NonNull Charset charset) { + return adapt(cs.toString().getBytes(charset)); + } + + /** + * Copy the data of the given {@link InputStream} into an available {@link ByteBody}. If the + * input is blocking, this method will also block. + * + * @param stream The input to copy + * @return A body containing the data read from the input + * @throws IOException Any exception thrown by the {@link InputStream} read methods + */ + @NonNull + @Blocking + public CloseableAvailableByteBody copyOf(@NonNull InputStream stream) throws IOException { + return adapt(stream.readAllBytes()); + } +} diff --git a/http/src/main/java/io/micronaut/http/body/ByteBufferBodyAdapter.java b/http/src/main/java/io/micronaut/http/body/ByteBufferBodyAdapter.java new file mode 100644 index 0000000000..4d105481e6 --- /dev/null +++ b/http/src/main/java/io/micronaut/http/body/ByteBufferBodyAdapter.java @@ -0,0 +1,91 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.body; + +import io.micronaut.core.annotation.Experimental; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.HttpHeaders; +import io.micronaut.http.body.stream.BodySizeLimits; +import org.reactivestreams.Publisher; + +import java.nio.ByteBuffer; +import java.util.OptionalLong; + +/** + * Adapter from {@link Publisher} of NIO {@link ByteBuffer} to a {@link ReactiveByteBufferByteBody}. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Experimental +public final class ByteBufferBodyAdapter extends AbstractBodyAdapter { + private ByteBufferBodyAdapter(Publisher source, @Nullable Runnable onDiscard) { + super(source, onDiscard); + } + + /** + * Create a new body that contains the bytes of the given publisher. + * + * @param source The byte publisher + * @return A body with those bytes + */ + @NonNull + static ReactiveByteBufferByteBody adapt(@NonNull Publisher source) { + return adapt(source, null, null); + } + + /** + * Create a new body that contains the bytes of the given publisher. + * + * @param publisher The byte publisher + * @param headersForLength Optional headers for reading the {@code content-length} header + * @param onDiscard Optional runnable to call if the body is discarded ({@link #allowDiscard()}) + * @return A body with those bytes + */ + @NonNull + static ReactiveByteBufferByteBody adapt(@NonNull Publisher publisher, @Nullable HttpHeaders headersForLength, @Nullable Runnable onDiscard) { + ByteBufferBodyAdapter adapter = new ByteBufferBodyAdapter(publisher, onDiscard); + adapter.sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(BodySizeLimits.UNLIMITED, adapter); + if (headersForLength != null) { + adapter.sharedBuffer.setExpectedLengthFrom(headersForLength.get(HttpHeaders.CONTENT_LENGTH)); + } + return new ReactiveByteBufferByteBody(adapter.sharedBuffer); + } + + /** + * Create a new body from the given publisher. + * + * @param publisher The input publisher + * @param contentLength Optional length of the body, must match the publisher exactly + * @return The ByteBody fed by the publisher + */ + public static CloseableByteBody adapt(@NonNull Publisher publisher, @NonNull OptionalLong contentLength) { + ByteBufferBodyAdapter adapter = new ByteBufferBodyAdapter(publisher, null); + adapter.sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(BodySizeLimits.UNLIMITED, adapter); + contentLength.ifPresent(adapter.sharedBuffer::setExpectedLength); + return new ReactiveByteBufferByteBody(adapter.sharedBuffer); + } + + @Override + public void onNext(ByteBuffer buffer) { + long newDemand = demand.addAndGet(-buffer.remaining()); + sharedBuffer.add(buffer); + if (newDemand > 0) { + subscription.request(1); + } + } +} diff --git a/http/src/main/java/io/micronaut/http/body/CharSequenceBodyWriter.java b/http/src/main/java/io/micronaut/http/body/CharSequenceBodyWriter.java index 6c39bf6fa0..b4542af578 100644 --- a/http/src/main/java/io/micronaut/http/body/CharSequenceBodyWriter.java +++ b/http/src/main/java/io/micronaut/http/body/CharSequenceBodyWriter.java @@ -16,10 +16,16 @@ package io.micronaut.http.body; import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.type.Argument; import io.micronaut.core.type.MutableHeaders; -import io.micronaut.http.HttpHeaders; +import io.micronaut.http.ByteBodyHttpResponse; +import io.micronaut.http.ByteBodyHttpResponseWrapper; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; +import io.micronaut.http.MutableHttpHeaders; +import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.codec.CodecException; import io.micronaut.runtime.ApplicationConfiguration; import jakarta.inject.Inject; @@ -37,7 +43,7 @@ */ @Singleton @Internal -public final class CharSequenceBodyWriter implements TypedMessageBodyWriter { +public final class CharSequenceBodyWriter implements TypedMessageBodyWriter, ResponseBodyWriter { private final Charset defaultCharset; @@ -53,7 +59,7 @@ public CharSequenceBodyWriter(Charset defaultCharset) { @Override public void writeTo(Argument type, MediaType mediaType, CharSequence object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException { if (mediaType != null) { - outgoingHeaders.setIfMissing(HttpHeaders.CONTENT_TYPE, mediaType); + ((MutableHttpHeaders) outgoingHeaders).contentTypeIfMissing(mediaType); } try { outputStream.write(object.toString().getBytes(MessageBodyWriter.findCharset(mediaType, outgoingHeaders).orElse(defaultCharset))); @@ -62,6 +68,17 @@ public void writeTo(Argument type, MediaType mediaType, CharSequen } } + @Override + public ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, HttpRequest request, MutableHttpResponse outgoingResponse, Argument type, MediaType mediaType, CharSequence object) throws CodecException { + outgoingResponse.getHeaders().contentTypeIfMissing(mediaType); + return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, writePiece(bodyFactory, request, outgoingResponse, type, mediaType, object)); + } + + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull HttpResponse response, @NonNull Argument type, @NonNull MediaType mediaType, CharSequence object) { + return bodyFactory.copyOf(object, MessageBodyWriter.getCharset(mediaType, response.getHeaders())); + } + @Override public Argument getType() { return Argument.of(CharSequence.class); diff --git a/http/src/main/java/io/micronaut/http/body/ConcatenatingSubscriber.java b/http/src/main/java/io/micronaut/http/body/ConcatenatingSubscriber.java new file mode 100644 index 0000000000..efc7f8372d --- /dev/null +++ b/http/src/main/java/io/micronaut/http/body/ConcatenatingSubscriber.java @@ -0,0 +1,332 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.body; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.body.stream.BaseSharedBuffer; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +/** + * This is a reactive subscriber that accepts {@link ByteBody}s and concatenates them into a single + * {@link BaseSharedBuffer}, optionally with separators. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +public abstract class ConcatenatingSubscriber implements BufferConsumer.Upstream, CoreSubscriber, BufferConsumer { + private long forwarded; + private long consumed; + + private Subscription subscription; + private boolean cancelled; + private volatile boolean disregardBackpressure; + private boolean first = true; + private BufferConsumer.Upstream currentComponent; + private boolean start = false; + private boolean delayedSubscriberCompletion = false; + private boolean currentComponentDone = false; + + @Override + public final void onSubscribe(Subscription s) { + boolean start; + boolean cancelled; + synchronized (this) { + this.subscription = s; + cancelled = this.cancelled; + start = this.start; + } + if (cancelled) { + s.cancel(); + } else if (start) { + s.request(1); + } + } + + /** + * Called before any new {@link ByteBody} component to emit an additional separator. + * + * @param first {@code true} iff this is the first element (i.e. the start of the output) + * @return The number of bytes written for {@link #onBytesConsumed} accounting + */ + protected long emitLeadingSeparator(boolean first) { + return 0; + } + + /** + * Called before after all {@link ByteBody} components to emit additional trailing bytes. + * + * @param first {@code true} iff this is the first element, i.e. there were no component {@link ByteBody}s + * @return The number of bytes written for {@link #onBytesConsumed} accounting + */ + protected long emitFinalSeparator(boolean first) { + return 0; + } + + @Override + public final void onComplete() { + synchronized (this) { + if (currentComponent != null) { + delayedSubscriberCompletion = true; + return; + } + } + + long emitted = emitFinalSeparator(first); + if (emitted != 0) { + synchronized (this) { + forwarded += emitted; + } + } + forwardComplete(); + } + + @Override + public final void onError(Throwable t) { + forwardError(t); + } + + /** + * Forward the given body to the shared buffer. + * + * @param body The body + * @return The {@link io.micronaut.http.body.stream.BufferConsumer.Upstream} to control + * component backpressure, or {@code null} if all bytes were written immediately (as is the + * case for an {@link AvailableByteBody}) + */ + @Nullable + protected abstract BufferConsumer.Upstream forward(ByteBody body); + + /** + * Should be called by the subclass when bytes are sent to the sharedBuffer, for + * {@link #onBytesConsumed} accounting. + * + * @param n The number of bytes forwarded + */ + protected final void onForward(long n) { + synchronized (this) { + forwarded += n; + } + } + + @Override + public final void onNext(ByteBody body) { + onForward(emitLeadingSeparator(first)); + first = false; + + BufferConsumer.Upstream component = forward(body); + if (component == null) { + return; + } + + long preAcknowledged; + synchronized (this) { + preAcknowledged = consumed - forwarded; + currentComponent = component; + } + + component.start(); + if (disregardBackpressure) { + component.disregardBackpressure(); + } else if (preAcknowledged > 0) { + component.onBytesConsumed(preAcknowledged); + } + } + + @Override + public final void start() { + Subscription initialDemand; + synchronized (this) { + initialDemand = subscription; + start = true; + } + if (initialDemand != null) { + initialDemand.request(1); + } + } + + @Override + public final void onBytesConsumed(long bytesConsumed) { + long delta; + Upstream currentComponent; + boolean requestNewComponent; + synchronized (this) { + long newConsumed = consumed + bytesConsumed; + if (newConsumed < consumed) { + // overflow + newConsumed = Long.MAX_VALUE; + } + delta = newConsumed - consumed; + consumed = newConsumed; + + currentComponent = this.currentComponent; + requestNewComponent = currentComponent == null && currentComponentDone && newConsumed >= forwarded; + } + if (currentComponent != null && delta > 0) { + currentComponent.onBytesConsumed(bytesConsumed); + } else if (requestNewComponent) { + // Previous component is now fully consumed, request a new one. + subscription.request(1); + } + } + + @Override + public final void allowDiscard() { + Upstream component; + Subscription subscription; + synchronized (this) { + component = currentComponent; + subscription = this.subscription; + cancelled = true; + } + if (subscription != null) { + subscription.cancel(); + } + if (component != null) { + component.allowDiscard(); + } + } + + @Override + public final void disregardBackpressure() { + Upstream component; + synchronized (this) { + component = currentComponent; + disregardBackpressure = true; + } + if (component != null) { + component.disregardBackpressure(); + } + } + + @Override + public final void complete() { + boolean delayedSubscriberCompletion; + boolean requestNextComponent; + synchronized (this) { + currentComponent = null; + delayedSubscriberCompletion = this.delayedSubscriberCompletion; + requestNextComponent = !delayedSubscriberCompletion && (disregardBackpressure || consumed >= forwarded); + currentComponentDone = !requestNextComponent; + } + if (delayedSubscriberCompletion) { + // onComplete was held back, call it now + onComplete(); + } else if (requestNextComponent) { + // current component completed. request the next ByteBody + subscription.request(1); + } + // if requestNextComponent is false, then the last component has not been fully consumed yet. we'll request the next later. + } + + @Override + public final void error(Throwable e) { + subscription.cancel(); + forwardError(e); + } + + /** + * Forward completion to the shared buffer. + */ + protected abstract void forwardComplete(); + + /** + * Forward an error to the shared buffer. + * + * @param t The error + */ + protected abstract void forwardError(Throwable t); + + /** + * Concatenating implementation that writes to a {@link ReactiveByteBufferByteBody}. + */ + public static class ByteBufferConcatenatingSubscriber extends ConcatenatingSubscriber implements ReactiveByteBufferByteBody.ByteBufferConsumer { + final ReactiveByteBufferByteBody.SharedBuffer sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(BodySizeLimits.UNLIMITED, this); + + private ByteBufferConcatenatingSubscriber() { + } + + public static CloseableByteBody concatenate(Publisher publisher) { + ByteBufferConcatenatingSubscriber subscriber = new ByteBufferConcatenatingSubscriber(); + publisher.subscribe(subscriber); + return new ReactiveByteBufferByteBody(subscriber.sharedBuffer); + } + + @Override + protected Upstream forward(ByteBody body) { + return ByteBufferBodyAdapter.adapt(Flux.from(body.toByteArrayPublisher()).map(ByteBuffer::wrap)).primary(this); + } + + @Override + public void add(@NonNull ByteBuffer buffer) { + int n = buffer.remaining(); + onForward(n); + sharedBuffer.add(buffer); + } + + @Override + protected void forwardComplete() { + sharedBuffer.complete(); + } + + @Override + protected void forwardError(Throwable t) { + sharedBuffer.error(t); + } + } + + /** + * Concatenating implementation that writes to a {@link ReactiveByteBufferByteBody}, with + * JSON-style separators. + */ + public static final class JsonByteBufferConcatenatingSubscriber extends ByteBufferConcatenatingSubscriber { + private static final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes(StandardCharsets.UTF_8)); + private static final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes(StandardCharsets.UTF_8)); + private static final ByteBuffer SEPARATOR = ByteBuffer.wrap(",".getBytes(StandardCharsets.UTF_8)); + private static final ByteBuffer EMPTY_ARRAY = ByteBuffer.wrap("[]".getBytes(StandardCharsets.UTF_8)); + + private JsonByteBufferConcatenatingSubscriber() { + } + + public static CloseableByteBody concatenateJson(Publisher publisher) { + JsonByteBufferConcatenatingSubscriber subscriber = new JsonByteBufferConcatenatingSubscriber(); + publisher.subscribe(subscriber); + return new ReactiveByteBufferByteBody(subscriber.sharedBuffer); + } + + @Override + protected long emitLeadingSeparator(boolean first) { + sharedBuffer.add((first ? START_ARRAY : SEPARATOR).asReadOnlyBuffer()); + return 1; + } + + @Override + protected long emitFinalSeparator(boolean first) { + sharedBuffer.add((first ? EMPTY_ARRAY : END_ARRAY).asReadOnlyBuffer()); + return first ? 2 : 1; + } + } +} diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ReactiveByteBufferByteBody.java b/http/src/main/java/io/micronaut/http/body/ReactiveByteBufferByteBody.java similarity index 76% rename from http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ReactiveByteBufferByteBody.java rename to http/src/main/java/io/micronaut/http/body/ReactiveByteBufferByteBody.java index 0ed228e241..01248113e0 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ReactiveByteBufferByteBody.java +++ b/http/src/main/java/io/micronaut/http/body/ReactiveByteBufferByteBody.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.client.jdk; +package io.micronaut.http.body; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; @@ -21,9 +21,6 @@ import io.micronaut.core.execution.DelayedExecutionFlow; import io.micronaut.core.execution.ExecutionFlow; import io.micronaut.core.io.buffer.ByteArrayBufferFactory; -import io.micronaut.http.body.CloseableAvailableByteBody; -import io.micronaut.http.body.CloseableByteBody; -import io.micronaut.http.body.InternalByteBody; import io.micronaut.http.body.stream.AvailableByteArrayBody; import io.micronaut.http.body.stream.BaseSharedBuffer; import io.micronaut.http.body.stream.BodySizeLimits; @@ -37,29 +34,24 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; -import java.net.http.HttpResponse; import java.nio.ByteBuffer; import java.util.List; import java.util.OptionalLong; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** - * Streaming {@link io.micronaut.http.body.ByteBody} implementation for the JDK http client. + * Streaming {@link io.micronaut.http.body.ByteBody} implementation based on NIO {@link ByteBuffer}s. * * @since 4.8.0 * @author Jonas Konrad */ @Internal -final class ReactiveByteBufferByteBody implements CloseableByteBody, InternalByteBody { +public final class ReactiveByteBufferByteBody implements CloseableByteBody, InternalByteBody { private final SharedBuffer sharedBuffer; private BufferConsumer.Upstream upstream; - ReactiveByteBufferByteBody(SharedBuffer sharedBuffer) { + public ReactiveByteBufferByteBody(SharedBuffer sharedBuffer) { this(sharedBuffer, sharedBuffer.getRootUpstream()); } @@ -68,7 +60,7 @@ private ReactiveByteBufferByteBody(SharedBuffer sharedBuffer, BufferConsumer.Ups this.upstream = upstream; } - private BufferConsumer.Upstream primary(ByteBufferConsumer primary) { + BufferConsumer.Upstream primary(ByteBufferConsumer primary) { BufferConsumer.Upstream upstream = this.upstream; if (upstream == null) { BaseSharedBuffer.failClaim(); @@ -242,7 +234,7 @@ public void add(ByteBuffer buffer) { * Buffering is done using a {@link ByteArrayOutputStream}. Concurrency control is done through * a non-reentrant lock based on {@link AtomicReference}. */ - static final class SharedBuffer extends BaseSharedBuffer implements ByteBufferConsumer { + public static final class SharedBuffer extends BaseSharedBuffer implements ByteBufferConsumer { // fields for concurrency control, see #submit private final AtomicReference workState = new AtomicReference<>(WorkState.CLEAN); private final ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue<>(); @@ -304,11 +296,11 @@ private void submit(Runnable task) { } } - public void reserve() { + void reserve() { submit(this::reserve0); } - public void subscribe(@Nullable ByteBufferConsumer consumer, Upstream upstream) { + void subscribe(@Nullable ByteBufferConsumer consumer, Upstream upstream) { submit(() -> subscribe0(consumer, upstream)); } @@ -408,105 +400,4 @@ private enum WorkState { WORKING_THEN_CLEAN, WORKING_THEN_DIRTY } - - /** - * {@link HttpResponse.BodySubscriber} implementation that pushes data into a - * {@link SharedBuffer}. - */ - static final class ByteBodySubscriber implements HttpResponse.BodySubscriber, BufferConsumer.Upstream { - private final SharedBuffer sharedBuffer; - private final CloseableByteBody root; - private final AtomicLong demand = new AtomicLong(0); - private Flow.Subscription subscription; - private boolean cancelled; - private volatile boolean disregardBackpressure; - - ByteBodySubscriber(BodySizeLimits limits) { - sharedBuffer = new SharedBuffer(limits, this); - root = new ReactiveByteBufferByteBody(sharedBuffer); - } - - @Override - public CompletionStage getBody() { - return CompletableFuture.completedFuture(root); - } - - @Override - public void onSubscribe(Flow.Subscription subscription) { - boolean initialDemand; - boolean cancelled; - synchronized (this) { - this.subscription = subscription; - cancelled = this.cancelled; - initialDemand = demand.get() > 0; - } - if (cancelled) { - subscription.cancel(); - } else if (initialDemand) { - subscription.request(disregardBackpressure ? Long.MAX_VALUE : 1); - } - } - - @Override - public void onNext(List item) { - for (ByteBuffer buffer : item) { - int n = buffer.remaining(); - demand.addAndGet(-n); - sharedBuffer.add(buffer); - } - if (demand.get() > 0) { - subscription.request(1); - } - } - - @Override - public void onError(Throwable throwable) { - sharedBuffer.error(throwable); - } - - @Override - public void onComplete() { - sharedBuffer.complete(); - } - - @Override - public void start() { - Flow.Subscription initialDemand; - synchronized (this) { - initialDemand = subscription; - demand.set(1); - } - if (initialDemand != null) { - initialDemand.request(1); - } - } - - @Override - public void onBytesConsumed(long bytesConsumed) { - long prev = demand.getAndAdd(bytesConsumed); - if (prev <= 0 && prev + bytesConsumed > 0) { - subscription.request(1); - } - } - - @Override - public void allowDiscard() { - Flow.Subscription subscription; - synchronized (this) { - cancelled = true; - subscription = this.subscription; - } - if (subscription != null) { - subscription.cancel(); - } - } - - @Override - public void disregardBackpressure() { - disregardBackpressure = true; - if (subscription != null) { - subscription.request(Long.MAX_VALUE); - } - } - } } diff --git a/http/src/main/java/io/micronaut/http/body/ResponseBodyWriter.java b/http/src/main/java/io/micronaut/http/body/ResponseBodyWriter.java index 4a9f7ff4ea..db62f2886d 100644 --- a/http/src/main/java/io/micronaut/http/body/ResponseBodyWriter.java +++ b/http/src/main/java/io/micronaut/http/body/ResponseBodyWriter.java @@ -18,10 +18,10 @@ import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Indexed; import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.io.buffer.ByteBufferFactory; import io.micronaut.core.type.Argument; import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.codec.CodecException; @@ -40,24 +40,46 @@ public interface ResponseBodyWriter extends MessageBodyWriter { /** * Writes an object as a {@link ByteBodyHttpResponse}. * - * @param bufferFactory The buffer factory - * @param request The request - * @param httpResponse The response - * @param type The response body type - * @param mediaType The media type - * @param object The object to write + * @param bodyFactory The buffer factory + * @param request The request + * @param httpResponse The response + * @param type The response body type + * @param mediaType The media type + * @param object The object to write * @return A {@link ByteBodyHttpResponse} with the response bytes * @throws CodecException If an error occurs encoding */ @NonNull ByteBodyHttpResponse write( - @NonNull ByteBufferFactory bufferFactory, + @NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse httpResponse, @NonNull Argument type, @NonNull MediaType mediaType, T object) throws CodecException; + /** + * Write a piece of a larger response, e.g. when writing a Publisher or a part of a + * multipart response. In this case, response headers cannot be modified. + * + * @param bodyFactory The buffer factory + * @param request The request + * @param response The response this piece is part of + * @param type The type of this piece + * @param mediaType The media type of this piece + * @param object The piece to write + * @return The response bytes + * @throws CodecException If an error occurs encoding + */ + @NonNull + CloseableByteBody writePiece( + @NonNull ByteBodyFactory bodyFactory, + @NonNull HttpRequest request, + @NonNull HttpResponse response, + @NonNull Argument type, + @NonNull MediaType mediaType, + T object) throws CodecException; + /** * Wrap the given writer, if necessary, to get a {@link ResponseBodyWriter}. * diff --git a/http/src/main/java/io/micronaut/http/body/ResponseBodyWriterWrapper.java b/http/src/main/java/io/micronaut/http/body/ResponseBodyWriterWrapper.java index 48ec5a962e..aaff607260 100644 --- a/http/src/main/java/io/micronaut/http/body/ResponseBodyWriterWrapper.java +++ b/http/src/main/java/io/micronaut/http/body/ResponseBodyWriterWrapper.java @@ -25,12 +25,12 @@ import io.micronaut.http.ByteBodyHttpResponse; import io.micronaut.http.ByteBodyHttpResponseWrapper; import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; +import io.micronaut.http.MutableHttpHeaders; import io.micronaut.http.MutableHttpResponse; -import io.micronaut.http.body.stream.AvailableByteArrayBody; import io.micronaut.http.codec.CodecException; -import java.io.ByteArrayOutputStream; import java.io.OutputStream; /** @@ -41,10 +41,10 @@ * @author Jonas Konrad */ @Internal -public class ResponseBodyWriterWrapper implements ResponseBodyWriter { +final class ResponseBodyWriterWrapper implements ResponseBodyWriter { private final MessageBodyWriter wrapped; - protected ResponseBodyWriterWrapper(MessageBodyWriter wrapped) { + ResponseBodyWriterWrapper(MessageBodyWriter wrapped) { this.wrapped = wrapped; } @@ -74,9 +74,16 @@ public void writeTo(@NonNull Argument type, @NonNull MediaType mediaType, T o } @Override - public @NonNull ByteBodyHttpResponse write(@NonNull ByteBufferFactory bufferFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse httpResponse, @NonNull Argument type, @NonNull MediaType mediaType, T object) throws CodecException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - writeTo(type, mediaType, object, httpResponse.getHeaders(), baos); - return ByteBodyHttpResponseWrapper.wrap(httpResponse, AvailableByteArrayBody.create(bufferFactory, baos.toByteArray())); + public @NonNull ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse httpResponse, @NonNull Argument type, @NonNull MediaType mediaType, T object) throws CodecException { + return ByteBodyHttpResponseWrapper.wrap(httpResponse, writePiece(bodyFactory, httpResponse.getHeaders(), type, mediaType, object)); + } + + @Override + public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull HttpResponse response, @NonNull Argument type, @NonNull MediaType mediaType, T object) { + return writePiece(bodyFactory, response.toMutableResponse().getHeaders(), type, mediaType, object); + } + + private @NonNull CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, MutableHttpHeaders headers, @NonNull Argument type, @NonNull MediaType mediaType, T object) { + return bodyFactory.buffer(s -> writeTo(type, mediaType, object, headers, s)); } } diff --git a/http/src/main/java/io/micronaut/http/body/TextStreamBodyWriter.java b/http/src/main/java/io/micronaut/http/body/TextStreamBodyWriter.java index 28d4a138dc..cb8f81f521 100644 --- a/http/src/main/java/io/micronaut/http/body/TextStreamBodyWriter.java +++ b/http/src/main/java/io/micronaut/http/body/TextStreamBodyWriter.java @@ -19,7 +19,6 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.core.io.buffer.ByteBuffer; import io.micronaut.core.io.buffer.ByteBufferFactory; -import io.micronaut.core.io.buffer.ReferenceCounted; import io.micronaut.core.type.Argument; import io.micronaut.core.type.MutableHeaders; import io.micronaut.http.HttpHeaders; @@ -32,7 +31,10 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; @@ -87,6 +89,17 @@ private static Argument getBodyType(Argument type) { @Override public ByteBuffer writeTo(Argument type, MediaType mediaType, T object, MutableHeaders outgoingHeaders, ByteBufferFactory bufferFactory) throws CodecException { + ByteBufferOutput output = new ByteBufferOutput(bufferFactory); + write0(type, mediaType, object, outgoingHeaders, output); + return output.buffer; + } + + @Override + public void writeTo(Argument type, MediaType mediaType, T object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException { + write0(type, mediaType, object, outgoingHeaders, new StreamOutput(outputStream)); + } + + private void write0(Argument type, MediaType mediaType, T object, MutableHeaders outgoingHeaders, Output output) { Argument bodyType = (Argument) type; Event event; if (object instanceof Event e) { @@ -108,22 +121,19 @@ public ByteBuffer writeTo(Argument type, MediaType mediaType, T object, Mu messageBodyWriter = registry.getWriter(bodyType, JSON_TYPE_LIST); } } - ByteBuffer buf = messageBodyWriter.writeTo(bodyType, MediaType.APPLICATION_JSON_TYPE, data, outgoingHeaders, bufferFactory); - body = buf.toByteArray(); - if (buf instanceof ReferenceCounted rc) { - rc.release(); - } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + messageBodyWriter.writeTo(bodyType, MediaType.APPLICATION_JSON_TYPE, data, outgoingHeaders, baos); + body = baos.toByteArray(); } outgoingHeaders.set(HttpHeaders.CONTENT_TYPE, mediaType != null ? mediaType : MediaType.TEXT_EVENT_STREAM_TYPE); - ByteBuffer eventData = bufferFactory.buffer(body.length + 10); - writeAttribute(eventData, COMMENT_PREFIX, event.getComment()); - writeAttribute(eventData, ID_PREFIX, event.getId()); - writeAttribute(eventData, EVENT_PREFIX, event.getName()); + writeAttribute(output, COMMENT_PREFIX, event.getComment()); + writeAttribute(output, ID_PREFIX, event.getId()); + writeAttribute(output, EVENT_PREFIX, event.getName()); Duration retry = event.getRetry(); if (retry != null) { - writeAttribute(eventData, RETRY_PREFIX, String.valueOf(retry.toMillis())); + writeAttribute(output, RETRY_PREFIX, String.valueOf(retry.toMillis())); } // Write the data @@ -133,18 +143,12 @@ public ByteBuffer writeTo(Argument type, MediaType mediaType, T object, Mu if (end == -1) { end = body.length - 1; } - eventData.write(DATA_PREFIX).write(body, start, end - start + 1); + output.write(DATA_PREFIX).write(body, start, end - start + 1); start = end + 1; } // Write new lines for event separation - eventData.write(NEWLINE).write(NEWLINE); - return eventData; - } - - @Override - public void writeTo(Argument type, MediaType mediaType, T object, MutableHeaders outgoingHeaders, OutputStream outputStream) throws CodecException { - throw new UnsupportedOperationException(); + output.write(NEWLINE).write(NEWLINE); } private static int indexOf(byte[] haystack, @SuppressWarnings("SameParameterValue") byte needle, int start) { @@ -161,11 +165,93 @@ private static int indexOf(byte[] haystack, @SuppressWarnings("SameParameterValu * @param attribute The attribute * @param value The value */ - private static void writeAttribute(ByteBuffer eventData, byte[] attribute, String value) { + private static void writeAttribute(Output eventData, byte[] attribute, String value) { if (value != null) { eventData.write(attribute) .write(value, StandardCharsets.UTF_8) .write(NEWLINE); } } + + private sealed interface Output { + void allocate(int expectedLength); + + Output write(byte[] b); + + Output write(byte[] b, int off, int len); + + Output write(String value, Charset charset); + } + + private static final class ByteBufferOutput implements Output { + final ByteBufferFactory bufferFactory; + ByteBuffer buffer; + + ByteBufferOutput(ByteBufferFactory bufferFactory) { + this.bufferFactory = bufferFactory; + } + + @Override + public void allocate(int expectedLength) { + buffer = bufferFactory.buffer(expectedLength); + } + + @Override + public Output write(byte[] b) { + buffer.write(b); + return this; + } + + @Override + public Output write(byte[] b, int off, int len) { + buffer.write(b, off, len); + return this; + } + + @Override + public Output write(String value, Charset charset) { + buffer.write(value, charset); + return this; + } + } + + private record StreamOutput(OutputStream stream) implements Output { + @Override + public void allocate(int expectedLength) { + } + + private void handle(IOException ioe) { + throw new CodecException("Failed to write SSE data", ioe); + } + + @Override + public Output write(byte[] b) { + try { + stream.write(b); + } catch (IOException e) { + handle(e); + } + return this; + } + + @Override + public Output write(byte[] b, int off, int len) { + try { + stream.write(b, off, len); + } catch (IOException e) { + handle(e); + } + return this; + } + + @Override + public Output write(String value, Charset charset) { + try { + stream.write(value.getBytes(charset)); + } catch (IOException e) { + handle(e); + } + return this; + } + } } diff --git a/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java b/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java index 4667c08931..e666aa3744 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java +++ b/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java @@ -426,6 +426,11 @@ public void complete() { * @param e The error */ public void error(Throwable e) { + if (error != null) { + error.addSuppressed(e); + return; + } + error = e; discardBuffer(); if (subscribers != null) { diff --git a/http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java b/http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java index 0cee66edc5..0dbf364b97 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java +++ b/http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java @@ -63,7 +63,9 @@ default void start() { } /** - * Called when a number of bytes has been consumed by the downstream. + * Called when a number of bytes has been consumed by the downstream. Note that this can + * exceed the actual number of bytes written so far, if the downstream wants to signal it + * is ready consume much more data. * * @param bytesConsumed The number of bytes that were consumed */ diff --git a/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java b/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java index f76b8dd485..ac1b30ba86 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java +++ b/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java @@ -22,6 +22,7 @@ import io.micronaut.core.io.buffer.ByteBuffer; import io.micronaut.core.io.buffer.ByteBufferFactory; import io.micronaut.core.util.ArgumentUtils; +import io.micronaut.http.body.ByteBodyFactory; import io.micronaut.http.body.CloseableAvailableByteBody; import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.InternalByteBody; @@ -62,14 +63,32 @@ private InputStreamByteBody(Context context, ExtendedInputStream stream) { * @param ioExecutor An executor where blocking {@link InputStream#read()} may be performed * @param bufferFactory A {@link ByteBufferFactory} for buffer-based methods * @return The body + * @deprecated Please pass a {@link ByteBodyFactory} instead + * ({@link #create(InputStream, OptionalLong, Executor, ByteBodyFactory)}) */ @NonNull public static CloseableByteBody create(@NonNull InputStream stream, @NonNull OptionalLong length, @NonNull Executor ioExecutor, @NonNull ByteBufferFactory bufferFactory) { + ArgumentUtils.requireNonNull("bufferFactory", bufferFactory); + return create(stream, length, ioExecutor, ByteBodyFactory.createDefault(bufferFactory)); + } + + /** + * Create a new stream-based {@link CloseableByteBody}. Ownership of the stream is transferred + * to the returned body. + * + * @param stream The stream backing the body + * @param length The expected content length (see {@link #expectedLength()}) + * @param ioExecutor An executor where blocking {@link InputStream#read()} may be performed + * @param bodyFactory A {@link ByteBodyFactory} for buffer-based methods + * @return The body + */ + @NonNull + public static CloseableByteBody create(@NonNull InputStream stream, @NonNull OptionalLong length, @NonNull Executor ioExecutor, @NonNull ByteBodyFactory bodyFactory) { ArgumentUtils.requireNonNull("stream", stream); ArgumentUtils.requireNonNull("length", length); ArgumentUtils.requireNonNull("ioExecutor", ioExecutor); - ArgumentUtils.requireNonNull("bufferFactory", bufferFactory); - return new InputStreamByteBody(new Context(length, ioExecutor, bufferFactory), ExtendedInputStream.wrap(stream)); + ArgumentUtils.requireNonNull("bodyFactory", bodyFactory); + return new InputStreamByteBody(new Context(length, ioExecutor, bodyFactory), ExtendedInputStream.wrap(stream)); } @Override @@ -146,7 +165,7 @@ public void close() { @Override public @NonNull Publisher> toByteBufferPublisher() { - return toByteArrayPublisher().map(context.bufferFactory::wrap); + return toByteArrayPublisher().map(context.bodyFactory.byteBufferFactory()::wrap); } @Override @@ -154,7 +173,7 @@ public void close() { ExtendedInputStream s = toInputStream(); return ExecutionFlow.async(context.ioExecutor, () -> { try (ExtendedInputStream t = s) { - return ExecutionFlow.just(AvailableByteArrayBody.create(context.bufferFactory(), t.readAllBytes())); + return ExecutionFlow.just(context.bodyFactory().copyOf(t)); } catch (Exception e) { return ExecutionFlow.error(e); } @@ -169,7 +188,7 @@ public void close() { private record Context( OptionalLong expectedLength, Executor ioExecutor, - ByteBufferFactory bufferFactory + ByteBodyFactory bodyFactory ) { } } diff --git a/http/src/test/groovy/io/micronaut/http/body/ConcatenatingSubscriberSpec.groovy b/http/src/test/groovy/io/micronaut/http/body/ConcatenatingSubscriberSpec.groovy new file mode 100644 index 0000000000..c5ff51d23e --- /dev/null +++ b/http/src/test/groovy/io/micronaut/http/body/ConcatenatingSubscriberSpec.groovy @@ -0,0 +1,33 @@ +package io.micronaut.http.body + +import io.micronaut.core.io.buffer.ByteArrayBufferFactory +import io.micronaut.http.body.stream.AvailableByteArrayBody +import reactor.core.publisher.Flux +import spock.lang.Specification + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets + +class ConcatenatingSubscriberSpec extends Specification { + private static AvailableByteArrayBody available(String text) { + return AvailableByteArrayBody.create(ByteArrayBufferFactory.INSTANCE, text.getBytes(StandardCharsets.UTF_8)) + } + + private static ByteBuffer buffer(String text) { + return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)) + } + + def test() { + given: + def input = Flux.just( + available("s1"), + available("s2"), + ByteBufferBodyAdapter.adapt(Flux.just(buffer("s3"), buffer("s4"))) + ) + + when: + def text = new String(ConcatenatingSubscriber.JsonByteBufferConcatenatingSubscriber.concatenateJson(input).toInputStream().readAllBytes(), StandardCharsets.UTF_8) + then: + text == "[s1,s2,s3s4]" + } +} diff --git a/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy b/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy index 326ba43039..e1714bc0fb 100644 --- a/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy +++ b/http/src/test/groovy/io/micronaut/http/body/stream/InputStreamByteBodySpec.groovy @@ -1,6 +1,7 @@ package io.micronaut.http.body.stream import io.micronaut.core.io.buffer.ByteArrayBufferFactory +import io.micronaut.http.body.ByteBodyFactory import spock.lang.Specification import java.nio.charset.StandardCharsets @@ -10,7 +11,7 @@ class InputStreamByteBodySpec extends Specification { def move() { given: def pool = Executors.newCachedThreadPool() - def a = InputStreamByteBody.create(new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)), OptionalLong.empty(), pool, ByteArrayBufferFactory.INSTANCE) + def a = InputStreamByteBody.create(new ByteArrayInputStream("foo".getBytes(StandardCharsets.UTF_8)), OptionalLong.empty(), pool, ByteBodyFactory.createDefault(ByteArrayBufferFactory.INSTANCE)) def b = a.move() when: diff --git a/json-core/src/main/java/io/micronaut/json/body/JsonMessageHandler.java b/json-core/src/main/java/io/micronaut/json/body/JsonMessageHandler.java index c04588bee4..4efb4ab8a3 100644 --- a/json-core/src/main/java/io/micronaut/json/body/JsonMessageHandler.java +++ b/json-core/src/main/java/io/micronaut/json/body/JsonMessageHandler.java @@ -24,12 +24,20 @@ import io.micronaut.core.type.Argument; import io.micronaut.core.type.Headers; import io.micronaut.core.type.MutableHeaders; +import io.micronaut.http.ByteBodyHttpResponse; +import io.micronaut.http.ByteBodyHttpResponseWrapper; import io.micronaut.http.HttpHeaders; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; +import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.annotation.Consumes; import io.micronaut.http.annotation.Produces; +import io.micronaut.http.body.ByteBodyFactory; +import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.MessageBodyHandler; import io.micronaut.http.body.MessageBodyWriter; +import io.micronaut.http.body.ResponseBodyWriter; import io.micronaut.http.codec.CodecException; import io.micronaut.json.JsonFeatures; import io.micronaut.json.JsonMapper; @@ -60,7 +68,7 @@ @JsonMessageHandler.ProducesJson @JsonMessageHandler.ConsumesJson @BootstrapContextCompatible -public final class JsonMessageHandler implements MessageBodyHandler, CustomizableJsonHandler { +public final class JsonMessageHandler implements MessageBodyHandler, CustomizableJsonHandler, ResponseBodyWriter { /** * The JSON handler should be preferred if for any type. @@ -143,6 +151,21 @@ public void writeTo(Argument type, @NonNull MediaType mediaType, T object, Mu } } + @Override + public @NonNull ByteBodyHttpResponse write(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull MutableHttpResponse httpResponse, @NonNull Argument type, @NonNull MediaType mediaType, T object) throws CodecException { + httpResponse.getHeaders().contentTypeIfMissing(mediaType); + return ByteBodyHttpResponseWrapper.wrap(httpResponse, writePiece(bodyFactory, request, httpResponse, type, mediaType, object)); + } + + @Override + public @NonNull CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest request, @NonNull HttpResponse response, @NonNull Argument type, @NonNull MediaType mediaType, T object) throws CodecException { + try { + return bodyFactory.buffer(s -> jsonMapper.writeValue(s, object)); + } catch (IOException e) { + throw new CodecException("Error encoding object [" + object + "] to JSON: " + e.getMessage(), e); + } + } + @Override public CustomizableJsonHandler customize(JsonFeatures jsonFeatures) { return new JsonMessageHandler<>(jsonMapper.cloneWithFeatures(jsonFeatures));