Skip to content

Commit

Permalink
Publisher#flatMapSingle make subscription non-volatile (#1025)
Browse files Browse the repository at this point in the history
Motivation:
Publisher#flatMapSingle has the Subscription from onSubscribe saved to a
volatile variable. This was due to the Subscription being in the context
of the mapped Single's Subscriber termination callbacks and ambiguity in
the Reactive Streams specification about visibility related to
Publisher#subscribe(..) and Subscriber state. However recent
[discussions](reactive-streams/reactive-streams-jvm#486)
have provided more insight that the Publisher must provide visibility,
and therefore the volatile state is not necessary.

Modifications:
- Publisher#flatMapSingle subscription member variable can be
non-volatile

Result:
Less volatile state and more clear expecations.
  • Loading branch information
Scottmitch authored Apr 27, 2020
1 parent 29c4f7d commit c4fa823
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,14 @@ private static final class FlatMapSubscriber<T, R> implements Subscriber<T>, Sub
private volatile int active; // Number of currently active Singles.
@SuppressWarnings("unused")
@Nullable
private volatile Subscription subscription;
@SuppressWarnings("unused")
@Nullable
private volatile TerminalNotification terminalNotification;
/**
* This variable is only accessed within the "emitting lock" so we rely upon this to provide visibility to
* other threads.
*/
private boolean targetTerminated;
@Nullable
private Subscription subscription;

private final Queue<Object> pending;
private final DynamicCompositeCancellable cancellable = new MapDynamicCompositeCancellable();
Expand All @@ -139,18 +138,17 @@ private static final class FlatMapSubscriber<T, R> implements Subscriber<T>, Sub

@Override
public void request(long n) {
final Subscription s = subscription;
assert s != null;
assert subscription != null;
if (!isRequestNValid(n)) {
s.request(n);
subscription.request(n);
return;
}

requestedUpdater.accumulateAndGet(this, n, FlowControlUtils::addWithOverflowProtection);
int actualSourceRequestN = calculateSourceRequested(requestedUpdater, sourceRequestedUpdater,
sourceEmittedUpdater, source.maxConcurrency, this);
if (actualSourceRequestN != 0) {
s.request(actualSourceRequestN);
subscription.request(actualSourceRequestN);
}
}

Expand Down Expand Up @@ -218,9 +216,6 @@ private boolean onError0(Throwable throwable, boolean overrideComplete,
}

private void enqueueAndDrain(Object item) {
Subscription s = subscription;
assert s != null;

if (!pending.offer(item)) {
QueueFullException exception = new QueueFullException("pending");
if (item instanceof TerminalNotification) {
Expand All @@ -230,10 +225,11 @@ private void enqueueAndDrain(Object item) {
onError0(exception, true, true);
}
}
drainPending(s);
drainPending();
}

private void drainPending(Subscription subscription) {
private void drainPending() {
assert subscription != null;
long drainedCount = drainSingleConsumerQueue(pending, this::sendToTarget, emittingUpdater, this);
if (drainedCount != 0) {
// We ignore overflow here because once we get to this extreme, we won't be able to account for more
Expand Down

0 comments on commit c4fa823

Please sign in to comment.