Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move body writing logic into http-server #11342

Open
wants to merge 5 commits into
base: 4.8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.core.async.subscriber;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.execution.DelayedExecutionFlow;
import io.micronaut.core.execution.ExecutionFlow;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;

/**
* This class waits for the first item of a publisher before completing an ExecutionFlow with a
* publisher containing the same items.
*
* @param <T> The publisher item type
* @since 4.8.0
* @author Jonas Konrad
*/
@Internal
public final class LazySendingSubscriber<T> implements CoreSubscriber<T>, CorePublisher<T>, Subscription {
private final DelayedExecutionFlow<Publisher<T>> result = DelayedExecutionFlow.create();
private boolean receivedFirst = false;
private volatile boolean sentFirst = false;
private boolean sendingFirst = false;
private T first;
private Subscription upstream;
private volatile CoreSubscriber<? super T> downstream;
private Signal<? extends T> heldBackSignal;

private LazySendingSubscriber() {
}

/**
* Create an {@link ExecutionFlow} that waits for the first item of the given publisher. If
* there is an error before the first item, the flow will fail. If there is no error, the flow
* will complete with a publisher containing all items, including the first one.
*
* @param input The input stream
* @return A flow that will complete with the same stream
* @param <T> The item type
*/
@NonNull
public static <T> ExecutionFlow<Publisher<T>> create(@NonNull Publisher<T> input) {
LazySendingSubscriber<T> subscriber = new LazySendingSubscriber<>();
input.subscribe(subscriber);
return subscriber.result;
}

@Override
public Context currentContext() {
return downstream == null ? Context.empty() : downstream.currentContext();
}

@Override
public void onSubscribe(Subscription s) {
upstream = s;
s.request(1);
}

@Override
public void onNext(T t) {
if (!receivedFirst) {
receivedFirst = true;
first = t;
result.complete(this);
} else {
downstream.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (receivedFirst) {
Subscriber<? super T> d;
synchronized (this) {
d = downstream;
if (d == null || !sentFirst) {
heldBackSignal = Signal.error(t);
return;
}
}
d.onError(t);
} else {
receivedFirst = true;
result.completeExceptionally(t);
}
}

@Override
public void onComplete() {
if (!receivedFirst) {
onNext(null);
}

Subscriber<? super T> d;
synchronized (this) {
d = downstream;
if (d == null || !sentFirst) {
heldBackSignal = Signal.complete();
return;
}
}
d.onComplete();
}

@Override
public void subscribe(CoreSubscriber<? super T> subscriber) {
synchronized (this) {
downstream = subscriber;
}
subscriber.onSubscribe(this);
}

@Override
public void subscribe(Subscriber<? super T> s) {
subscribe(Operators.toCoreSubscriber(s));
}

@Override
public void request(long n) {
if (!sentFirst && !sendingFirst) {
sendingFirst = true;
if (first != null) {
downstream.onNext(first); // note: this can trigger reentrancy!
first = null;
}
Signal<? extends T> heldBackSignal;
synchronized (this) {
sentFirst = true;
heldBackSignal = this.heldBackSignal;
}
if (heldBackSignal != null) {
heldBackSignal.accept(downstream);
return;
}
n--;
if (n <= 0) {
return;
}
}

upstream.request(n);
}

@Override
public void cancel() {
if (!sentFirst) {
sentFirst = true;
T t = first;
first = null;
Operators.onNextDropped(t, currentContext());
}
upstream.cancel();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.core.util.functional;

/**
* Consumer with a generic exception.
*
* @param <T> the type accepted by this consumer
* @param <E> the type of exception thrown from the supplier
* @author Jonas Konrad
* @since 4.8.0
*/
@FunctionalInterface
public interface ThrowingConsumer<T, E extends Throwable> {
/**
* Consume the value.
*
* @param t The value
* @throws E The generic exception
*/
void accept(T t) throws E;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.client.jdk;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.body.CloseableByteBody;
import io.micronaut.http.body.ReactiveByteBufferByteBody;
import io.micronaut.http.body.stream.BodySizeLimits;
import io.micronaut.http.body.stream.BufferConsumer;

import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/**
* {@link HttpResponse.BodySubscriber} implementation that pushes data into a
* {@link ReactiveByteBufferByteBody.SharedBuffer}.
*
* @since 4.8.0
* @author Jonas Konrad
*/
@Internal
final class ByteBodySubscriber implements HttpResponse.BodySubscriber<CloseableByteBody>, BufferConsumer.Upstream {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: this has simply been moved from ReactiveByteBufferByteBody where it was an inner class previously, it's not new code.

private final ReactiveByteBufferByteBody.SharedBuffer sharedBuffer;
private final CloseableByteBody root;
private final AtomicLong demand = new AtomicLong(0);
private Flow.Subscription subscription;
private boolean cancelled;
private volatile boolean disregardBackpressure;

public ByteBodySubscriber(BodySizeLimits limits) {
sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(limits, this);
root = new ReactiveByteBufferByteBody(sharedBuffer);
}

@Override
public CompletionStage<CloseableByteBody> getBody() {
return CompletableFuture.completedFuture(root);
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
boolean initialDemand;
boolean cancelled;
synchronized (this) {
this.subscription = subscription;
cancelled = this.cancelled;
initialDemand = demand.get() > 0;
}
if (cancelled) {
subscription.cancel();
} else if (initialDemand) {
subscription.request(disregardBackpressure ? Long.MAX_VALUE : 1);
}
}

@Override
public void onNext(List<ByteBuffer> item) {
for (ByteBuffer buffer : item) {
int n = buffer.remaining();
demand.addAndGet(-n);
sharedBuffer.add(buffer);
}
if (demand.get() > 0) {
subscription.request(1);
}
}

@Override
public void onError(Throwable throwable) {
sharedBuffer.error(throwable);
}

@Override
public void onComplete() {
sharedBuffer.complete();
}

@Override
public void start() {
Flow.Subscription initialDemand;
synchronized (this) {
initialDemand = subscription;
demand.set(1);
}
if (initialDemand != null) {
initialDemand.request(1);
}
}

@Override
public void onBytesConsumed(long bytesConsumed) {
long prev = demand.getAndAdd(bytesConsumed);
if (prev <= 0 && prev + bytesConsumed > 0) {
subscription.request(1);
}
}

@Override
public void allowDiscard() {
Flow.Subscription subscription;
synchronized (this) {
cancelled = true;
subscription = this.subscription;
}
if (subscription != null) {
subscription.cancel();
}
}

@Override
public void disregardBackpressure() {
disregardBackpressure = true;
if (subscription != null) {
subscription.request(Long.MAX_VALUE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected <O> Publisher<HttpResponse<O>> responsePublisher(@NonNull HttpRequest<
headerName -> httpRequest.headers().allValues(headerName));
}
BodySizeLimits bodySizeLimits = new BodySizeLimits(Long.MAX_VALUE, configuration.getMaxContentLength());
return client.sendAsync(httpRequest, responseInfo -> new ReactiveByteBufferByteBody.ByteBodySubscriber(bodySizeLimits));
return client.sendAsync(httpRequest, responseInfo -> new ByteBodySubscriber(bodySizeLimits));
})
.flatMap(Mono::fromCompletionStage)
.onErrorMap(IOException.class, e -> new HttpClientException("Error sending request: " + e.getMessage(), e))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.micronaut.http.bind.DefaultRequestBinderRegistry;
import io.micronaut.http.bind.RequestBinderRegistry;
import io.micronaut.http.body.ByteBody;
import io.micronaut.http.body.CharSequenceBodyWriter;
import io.micronaut.http.body.ChunkedMessageBodyReader;
import io.micronaut.http.body.CloseableAvailableByteBody;
import io.micronaut.http.body.CloseableByteBody;
Expand Down Expand Up @@ -97,7 +98,6 @@
import io.micronaut.http.netty.body.NettyBodyAdapter;
import io.micronaut.http.netty.body.NettyByteBody;
import io.micronaut.http.netty.body.NettyByteBufMessageBodyHandler;
import io.micronaut.http.netty.body.NettyCharSequenceBodyWriter;
import io.micronaut.http.netty.body.NettyJsonHandler;
import io.micronaut.http.netty.body.NettyJsonStreamHandler;
import io.micronaut.http.netty.body.NettyWritableBodyWriter;
Expand Down Expand Up @@ -1989,7 +1989,7 @@ private static MessageBodyHandlerRegistry createDefaultMessageBodyHandlerRegistr
);
JsonMapper mapper = JsonMapper.createDefault();
registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyJsonHandler<>(mapper));
registry.add(MediaType.APPLICATION_JSON_TYPE, new NettyCharSequenceBodyWriter());
registry.add(MediaType.APPLICATION_JSON_TYPE, new CharSequenceBodyWriter(StandardCharsets.UTF_8));
registry.add(MediaType.APPLICATION_JSON_STREAM_TYPE, new NettyJsonStreamHandler<>(mapper));
return registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,14 @@ public MutableHttpHeaders contentType(MediaType mediaType) {
}
contentType = Optional.ofNullable(mediaType);
return this;
}

@Override
public MutableHttpHeaders contentTypeIfMissing(MediaType mediaType) {
if (nettyHeaders.contains(HttpHeaderNames.CONTENT_TYPE)) {
return this;
}
return contentType(mediaType);
}

@Override
Expand Down
Loading
Loading