Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completable#concatWith(Completable) remove atomic operation #1000

Merged
merged 1 commit into from
Apr 3, 2020

Conversation

Scottmitch
Copy link
Member

Motivation:
Completable#concatWith(Completable) currently uses an atomic operation to transition subscribe() from the first Completable to the second. However this is done in the contex of a Subscriber and the events should be sequenced in a serial fashion.

Modifications:

  • Completable#concatWith(Completable) to use a regular variable instead of volatile/atomic operation to switch subscribers

Result:
Less atomic operations in Completable#concatWith(Completable).

Motivation:
Completable#concatWith(Completable) currently uses an atomic operation to transition subscribe() from the first Completable to the second. However this is done in the contex of a Subscriber and the events should be sequenced in a [serial](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.3) fashion.

Modifications:
- Completable#concatWith(Completable) to use a regular variable instead of volatile/atomic operation to switch subscribers

Result:
Less atomic operations in Completable#concatWith(Completable).
private final Subscriber target;
private final Completable next;
@Nullable
private SequentialCancellable sequentialCancellable;
@SuppressWarnings("unused")
private volatile int subscribedToNext;
private boolean nextSubscribed;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting one.

We are using the same Subscriber instance across different sources. RS spec mentions that all methods of a Subscriber must be invoked serially(Rule 1.3) and a Subscriber should ensure delivery of signals happens-before processing of signals(Rule 2.11) however, there is no mention of a rule where subscribe() should happen-before delivery of signals to the Subscriber.

So, here there is nothing guaranteeing from the spec that their is a memory barrier between call to next.subscribeInternal(this) and onComplete() from the next source. So, it may be that we will see nextSubscribed as false and subscribe again.

Interestingly I had this conversation before but it wasn't clear whether a rule is required to be added to the spec or existing rules cover this case. So 🍿 😄

Copy link
Member Author

@Scottmitch Scottmitch Apr 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall discussing this previously as well. IIUC the "shared subscriber" scenario is equivalent to the following:

class MySubscriber<T> implements Subscriber<T> {
  // not final, so no constructor barriers
  Object externalState;
  int internalState;

  MySubscriber(Object state) {
    externalState = state;
    internalState = 5;
  }
  
  onNext(T) {
    // is externalState, internalState visible?
  }
}

Publisher<T> pub = ...;
pub.subscribe(new MySubscriber<>("outside"));

If there is no happens-before relationship all non-final state in MySubscriber may not be visible when its Subscriber methods are invoked, which doesn't seem like desirable semantics from RS.

Related considerations:

  • reactive-streams examples use non-final state. In this case the state happens to be set to the default value, but if there is no happens before the explicit initialization to false could happen at some later time and be visible after the Subscriber has set the value to true.
  • sequentialCancellable in this class would also have the same issue (e.g. non final state used across subscribes).
  • All our other concat operator implementations operate in the same way as this PR.

Suggested path forward:

  • use a consistent approach internally. This PR makes our approach consistent and is less change the the other direction which involves investigating all other local state (shared across subscribes or initialized in the construction, ...).
  • open another discussion with the RS folks to discuss the above. Revisit if we need to make more broad changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points about non-final state!

I have created this in RS:

reactive-streams/reactive-streams-jvm#486

And I agree we should go ahead with this change to make things consistent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants