Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhances FluxReceiver internals #1185

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 56 additions & 43 deletions src/main/java/reactor/netty/channel/FluxReceive.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.netty.channel;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand All @@ -34,7 +35,6 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

import static reactor.netty.ReactorNetty.format;

Expand All @@ -60,10 +60,19 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable
Throwable inboundError;

volatile Disposable receiverCancel;
volatile int wip;

final static AtomicIntegerFieldUpdater<FluxReceive> WIP = AtomicIntegerFieldUpdater.newUpdater
(FluxReceive.class, "wip");
volatile int once;
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");

// Please note, in this specific case WIP is non-volatile since all operation that
// involves work-in-progress pattern is within Netty Event-Loops which guarantees
// serial, thread-safe behaviour.
// However, we need that flag in order to preserve work-in-progress guarding that
// prevents stack overflow in case of onNext -> request -> onNext cycling on the
// same stack
int wip;
OlegDokuka marked this conversation as resolved.
Show resolved Hide resolved


FluxReceive(ChannelOperations<?, ?> parent) {

Expand Down Expand Up @@ -131,11 +140,42 @@ public void request(long n) {

@Override
public void subscribe(CoreSubscriber<? super Object> s) {
if (eventLoop.inEventLoop()){
startReceiver(s);
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "Subscribing inbound receiver [pending: {}, cancelled:{}, " +
"inboundDone: {}]"),
getPending(),
isCancelled(),
inboundDone);
}
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
}

Operators.complete(s);
return;
}

receiver = s;

s.onSubscribe(this);
}
else {
eventLoop.execute(() -> startReceiver(s));
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
}

Operators.complete(s);
}
else {
Operators.error(s,
new IllegalStateException(
"Only one connection receive subscriber allowed."));
}
}
}

Expand Down Expand Up @@ -164,7 +204,8 @@ final void cleanQueue(@Nullable Queue<Object> q){
}

final void drainReceiver() {
if(WIP.getAndIncrement(this) != 0){
// general protect against stackoverflow onNext -> request -> onNext
if (wip++ != 0) {
return;
}
int missed = 1;
Expand All @@ -185,7 +226,7 @@ final void drainReceiver() {
}
return;
}
missed = WIP.addAndGet(this, -missed);
missed = (wip -= missed);
if(missed == 0){
break;
}
Expand Down Expand Up @@ -250,7 +291,7 @@ final void drainReceiver() {
channel.config()
.setAutoRead(true);
}
missed = WIP.addAndGet(this, -missed);
missed = (wip -= missed);
if(missed == 0){
break;
}
Expand All @@ -269,43 +310,13 @@ else if (!needRead) {
.setAutoRead(false);
}

missed = WIP.addAndGet(this, -missed);
missed = (wip -= missed);
if(missed == 0){
break;
}
}
}

final void startReceiver(CoreSubscriber<? super Object> s) {
if (receiver == null) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "Subscribing inbound receiver [pending: {}, cancelled:{}, " +
"inboundDone: {}]"),
getPending(),
isCancelled(),
inboundDone);
}
if (inboundDone && getPending() == 0) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
}

Operators.complete(s);
return;
}

receiver = s;

s.onSubscribe(this);
}
else {
Operators.error(s,
new IllegalStateException(
"Only one connection receive subscriber allowed."));
}
}

final void onInboundNext(Object msg) {
if (inboundDone || isCancelled()) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -336,8 +347,10 @@ else if (msg instanceof ByteBufHolder){
else {
Queue<Object> q = receiverQueue;
if (q == null) {
q = Queues.unbounded()
.get();
// please note, in that case we are using non-thread safe, simple
// ArrayDeque since all modifications on this queue happens withing
// Netty Event Loop
q = new ArrayDeque<>();
receiverQueue = q;
}
if (log.isDebugEnabled()){
Expand Down