Skip to content

Commit

Permalink
Clarify intent of rule 1.9
Browse files Browse the repository at this point in the history
Rule 1.9 defines that `onSubscribe()` on a `Subscriber` should be called before calling any other method on that `Subscriber`. However, this does not clarify that if `onSubscribe()` is signalled asynchronously then there should be a happens-before relationship between `subscribe()` and signalling of `onSubscribe()`.
In absence of such a guarantee, non-final fields initialized in a constructor have no guarantee to be visible inside `onSubscribe()`.

Considering a simple example:

public class UnsafeSubscriber implements Subscriber<String> {
    private boolean duplicateOnSubscribe = false;

    @OverRide
    public void onSubscribe(final Subscription s) {
        if (duplicateOnSubscribe) {
            throw new IllegalStateException("Duplicate onSubscribe() calls.");
        }
        duplicateOnSubscribe = true;
    }

    @OverRide
    public void onNext(final String s) {

    }

    @OverRide
    public void onError(final Throwable t) {

    }

    @OverRide
    public void onComplete() {

    }
}
If an UnsafeSubscriber instance is created in a different thread than the one that invokes onSubscribe() (true for an asynchronous Publisher), according to the java memory model, this statement inside onSubscribe():

if (duplicateOnSubscribe) {

is guaranteed to compute to false if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship between Publisher#subscribe() and Subscriber#onSubscribe(). So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:

private boolean duplicateOnSubscribe = false;

can be interleaved with

duplicateOnSubscribe = true; such that duplicateOnSubscribe is set to false later.

Has this been considered before or am I missing something?

Fixes reactive-streams#486
  • Loading branch information
Nitesh Kant committed May 7, 2020
1 parent de7ad73 commit 24f35af
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 1 deletion.
1 change: 1 addition & 0 deletions CopyrightWaivers.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ Scottmitch | Scott Mitchell, [email protected], Apple Inc.
retronym | Jason Zaugg, [email protected], Lightbend Inc.
sullis | Sean Sullivan, [email protected]
tomislavhofman | Tomislav Hofman, [email protected]
niteshkant | Nitesh Kant, [email protected], Apple Inc.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public interface Publisher<T> {
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
| [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for **eventually** is because signals can have propagation delay due to being asynchronous.* |
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* |
| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. If `onSubscribe()` is signalled asynchronously, the `Publisher` should make sure that the receive of `Subscriber` happens-before signalling `onSubscribe()` on that `Subscriber`. See [JMM definition of Happens-Before in section 17.4.5](https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4.5). Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
| [:bulb:](#1.10 "1.10 explained") | *The intent of this rule is to have callers of `subscribe` be aware that a generic Publisher and a generic Subscriber cannot be assumed to support being attached multiple times. Furthermore, it also mandates that the semantics of `subscribe` must be upheld no matter how many times it is called.* |
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
Expand Down

0 comments on commit 24f35af

Please sign in to comment.