-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Completable#concatWith(Completable) remove atomic operation #1000
Conversation
Motivation: Completable#concatWith(Completable) currently uses an atomic operation to transition subscribe() from the first Completable to the second. However this is done in the contex of a Subscriber and the events should be sequenced in a [serial](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.3) fashion. Modifications: - Completable#concatWith(Completable) to use a regular variable instead of volatile/atomic operation to switch subscribers Result: Less atomic operations in Completable#concatWith(Completable).
private final Subscriber target; | ||
private final Completable next; | ||
@Nullable | ||
private SequentialCancellable sequentialCancellable; | ||
@SuppressWarnings("unused") | ||
private volatile int subscribedToNext; | ||
private boolean nextSubscribed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an interesting one.
We are using the same Subscriber
instance across different sources. RS spec mentions that all methods of a Subscriber
must be invoked serially(Rule 1.3) and a Subscriber
should ensure delivery of signals happens-before processing of signals(Rule 2.11) however, there is no mention of a rule where subscribe()
should happen-before delivery of signals to the Subscriber
.
So, here there is nothing guaranteeing from the spec that their is a memory barrier between call to next.subscribeInternal(this)
and onComplete()
from the next source. So, it may be that we will see nextSubscribed
as false
and subscribe again.
Interestingly I had this conversation before but it wasn't clear whether a rule is required to be added to the spec or existing rules cover this case. So 🍿 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recall discussing this previously as well. IIUC the "shared subscriber" scenario is equivalent to the following:
class MySubscriber<T> implements Subscriber<T> {
// not final, so no constructor barriers
Object externalState;
int internalState;
MySubscriber(Object state) {
externalState = state;
internalState = 5;
}
onNext(T) {
// is externalState, internalState visible?
}
}
Publisher<T> pub = ...;
pub.subscribe(new MySubscriber<>("outside"));
If there is no happens-before relationship all non-final state in MySubscriber
may not be visible when its Subscriber
methods are invoked, which doesn't seem like desirable semantics from RS.
Related considerations:
- reactive-streams examples use non-final state. In this case the state happens to be set to the default value, but if there is no happens before the explicit initialization to
false
could happen at some later time and be visible after theSubscriber
has set the value totrue
. sequentialCancellable
in this class would also have the same issue (e.g. non final state used across subscribes).- All our other
concat
operator implementations operate in the same way as this PR.
Suggested path forward:
- use a consistent approach internally. This PR makes our approach consistent and is less change the the other direction which involves investigating all other local state (shared across subscribes or initialized in the construction, ...).
- open another discussion with the RS folks to discuss the above. Revisit if we need to make more broad changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points about non-final state!
I have created this in RS:
reactive-streams/reactive-streams-jvm#486
And I agree we should go ahead with this change to make things consistent.
Motivation:
Completable#concatWith(Completable) currently uses an atomic operation to transition subscribe() from the first Completable to the second. However this is done in the contex of a Subscriber and the events should be sequenced in a serial fashion.
Modifications:
Result:
Less atomic operations in Completable#concatWith(Completable).