-
Notifications
You must be signed in to change notification settings - Fork 533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asynchronous Stream Publisher - Example #377
Open
ttulka
wants to merge
7
commits into
reactive-streams:master
Choose a base branch
from
ttulka:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 2 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
ce0e13e
Asynchronous Stream Publisher - Example
ttulka b3eff1d
Adapting to Java 1.6
ttulka fb3a7d6
review #1
ttulka 1355ef0
review #2
ttulka cc96801
comment text adapted
ttulka 083fbca
supplier publisher baked on the interable publisher
ttulka 9725ef2
comment about end of stream added
ttulka File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
288 changes: 288 additions & 0 deletions
288
examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
/************************************************************************ | ||
* Licensed under Public Domain (CC0) * | ||
* * | ||
* To the extent possible under law, the person who associated CC0 with * | ||
* this code has waived all copyright and related or neighboring * | ||
* rights to this code. * | ||
* * | ||
* You should have received a copy of the CC0 legalcode along with this * | ||
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* | ||
************************************************************************/ | ||
|
||
package org.reactivestreams.example.unicast; | ||
|
||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
/** | ||
* AsyncStreamPublisher is an implementation of Reactive Streams `Publisher` | ||
* which executes asynchronously, using a provided `Executor` and produces elements | ||
* from a given `StreamSupplier` in a "unicast" configuration to its `Subscribers`. | ||
* <p> | ||
* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. | ||
*/ | ||
class AsyncStreamPublisher<T> implements Publisher<T> { | ||
|
||
private final static int DEFAULT_BATCHSIZE = 1024; | ||
|
||
private final StreamSupplier<T> supplier; // This is our data source / generator | ||
private final Executor executor; // This is our thread pool, which will make sure that our Publisher runs asynchronously to its Subscribers | ||
private final int batchSize; // In general, if one uses an `Executor`, one should be nice nad not hog a thread for too long, this is the cap for that, in elements | ||
|
||
public AsyncStreamPublisher(final StreamSupplier<T> supplier, final Executor executor) { | ||
this(supplier, DEFAULT_BATCHSIZE, executor); | ||
} | ||
|
||
public AsyncStreamPublisher(final StreamSupplier<T> supplier, final int batchSize, final Executor executor) { | ||
if (supplier == null) { | ||
throw null; | ||
} | ||
if (executor == null) { | ||
throw null; | ||
} | ||
if (batchSize < 1) { | ||
throw new IllegalArgumentException("batchSize must be greater than zero!"); | ||
} | ||
this.supplier = supplier; | ||
this.executor = executor; | ||
this.batchSize = batchSize; | ||
} | ||
|
||
@Override | ||
public void subscribe(final Subscriber<? super T> s) { | ||
// As per rule 1.11, we have decided to support multiple subscribers in a unicast configuration | ||
// for this `Publisher` implementation. | ||
// As per 2.13, this method must return normally (i.e. not throw) | ||
new SubscriptionImpl(s).init(); | ||
} | ||
|
||
static interface StreamSupplier<T> { | ||
T get(); | ||
} | ||
|
||
// These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls | ||
static interface Signal { | ||
} | ||
|
||
enum Cancel implements Signal {Instance} | ||
|
||
enum Subscribe implements Signal {Instance} | ||
|
||
enum Send implements Signal {Instance} | ||
|
||
static final class Request implements Signal { | ||
final long n; | ||
|
||
Request(final long n) { | ||
this.n = n; | ||
} | ||
} | ||
|
||
// This is our implementation of the Reactive Streams `Subscription`, | ||
// which represents the association between a `Publisher` and a `Subscriber`. | ||
final class SubscriptionImpl implements Subscription, Runnable { | ||
final Subscriber<? super T> subscriber; // We need a reference to the `Subscriber` so we can talk to it | ||
private boolean cancelled = false; // This flag will track whether this `Subscription` is to be considered cancelled or not | ||
private long demand = 0; // Here we track the current demand, i.e. what has been requested but not yet delivered | ||
private T nextElementToBeSent = null; // we need to fetch an element by the subscription initialization to ensure the stream is not empty | ||
|
||
SubscriptionImpl(final Subscriber<? super T> subscriber) { | ||
// As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null` | ||
if (subscriber == null) { | ||
throw null; | ||
} | ||
this.subscriber = subscriber; | ||
} | ||
|
||
// This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscription`, like `request` and `cancel` | ||
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>(); | ||
|
||
// We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself, | ||
// which would violate rule 1.3 among others (no concurrent notifications). | ||
private final AtomicBoolean on = new AtomicBoolean(false); | ||
|
||
// This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17 | ||
private void doRequest(final long n) { | ||
if (n < 1) { | ||
terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.")); | ||
} else if (demand + n < 1) { | ||
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded" | ||
demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded" | ||
doSend(); // Then we proceed with sending data downstream | ||
} else { | ||
demand += n; // Here we record the downstream demand | ||
doSend(); // Then we can proceed with sending data downstream | ||
} | ||
} | ||
|
||
// This handles cancellation requests, and is idempotent, thread-safe and not synchronously performing heavy computations as specified in rule 3.5 | ||
private void doCancel() { | ||
cancelled = true; | ||
} | ||
|
||
// Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe` | ||
// we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread. | ||
// It also makes it easier to follow rule 1.9 | ||
private void doSubscribe() { | ||
if (!cancelled) { | ||
// Deal with setting up the subscription with the subscriber | ||
try { | ||
subscriber.onSubscribe(this); | ||
} catch (final Throwable t) { // Due diligence to obey 2.13 | ||
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem with crashing |
||
} | ||
|
||
// Deal with already complete iterators promptly | ||
boolean hasElements = false; | ||
try { | ||
// Try to fetch an element from a stream to ensure the stream is not empty, | ||
// this will be sent by the first calling of doSend | ||
nextElementToBeSent = supplier.get(); | ||
hasElements = nextElementToBeSent != null; | ||
} catch (final Throwable t) { | ||
terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 | ||
return; | ||
} | ||
|
||
// If we don't have anything to deliver, we're already done, so lets do the right thing and | ||
// not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 | ||
if (!hasElements) { | ||
try { | ||
doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled | ||
subscriber.onComplete(); | ||
} catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. | ||
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); | ||
} | ||
} | ||
} | ||
} | ||
|
||
// This is our behavior for producing elements downstream | ||
private void doSend() { | ||
try { | ||
// In order to play nice with the `Executor` we will only send at-most `batchSize` before | ||
// rescheduing ourselves and relinquishing the current thread. | ||
int leftInBatch = batchSize; | ||
do { | ||
T next; | ||
boolean hasNext; | ||
try { | ||
next = supplier.get(); // We have already checked `hasNext` when subscribing, so we can fall back to testing -after- `next` is called. | ||
hasNext = next != null; // Need to keep track of End-of-Stream | ||
} catch (final Throwable t) { | ||
terminateDueTo(t); // If `next` or `hasNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4 | ||
return; | ||
} finally { | ||
subscriber.onNext(nextElementToBeSent); // Then we signal the next element downstream to the `Subscriber` | ||
} | ||
nextElementToBeSent = next; // The next element is the actually read element | ||
if (!hasNext) { // If we are at End-of-Stream | ||
doCancel(); // We need to consider this `Subscription` as cancelled as per rule 1.6 | ||
subscriber.onComplete(); // Then we signal `onComplete` as per rule 1.2 and 1.5 | ||
} | ||
} while (!cancelled // This makes sure that rule 1.8 is upheld, i.e. we need to stop signalling "eventually" | ||
&& --leftInBatch > 0 // This makes sure that we only send `batchSize` number of elements in one go (so we can yield to other Runnables) | ||
&& --demand > 0); // This makes sure that rule 1.1 is upheld (sending more than was demanded) | ||
|
||
if (!cancelled && demand > 0) { // If the `Subscription` is still alive and well, and we have demand to satisfy, we signal ourselves to send more data | ||
signal(Send.Instance); | ||
} | ||
} catch (final Throwable t) { | ||
// We can only getNextResult here if `onNext` or `onComplete` threw, and they are not allowed to according to 2.13, so we can only cancel and log here. | ||
doCancel(); // Make sure that we are cancelled, since we cannot do anything else since the `Subscriber` is faulty. | ||
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)).printStackTrace(System.err); | ||
} | ||
} | ||
|
||
// This is a helper method to ensure that we always `cancel` when we signal `onError` as per rule 1.6 | ||
private void terminateDueTo(final Throwable t) { | ||
cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 | ||
try { | ||
subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber` | ||
} catch (final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it. | ||
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); | ||
} | ||
} | ||
|
||
// What `signal` does is that it sends signals to the `Subscription` asynchronously | ||
private void signal(final Signal signal) { | ||
if (inboundSignals.offer(signal)) { // No need to null-check here as ConcurrentLinkedQueue does this for us | ||
tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already | ||
} | ||
} | ||
|
||
// This is the main "event loop" if you so will | ||
@Override | ||
public final void run() { | ||
if (on.get()) { // establishes a happens-before relationship with the end of the previous run | ||
try { | ||
final Signal s = inboundSignals.poll(); // We take a signal off the queue | ||
if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7 | ||
// Below we simply unpack the `Signal`s and invoke the corresponding methods | ||
if (s instanceof Request) { | ||
doRequest(((Request) s).n); | ||
} else if (s == Send.Instance) { | ||
doSend(); | ||
} else if (s == Cancel.Instance) { | ||
doCancel(); | ||
} else if (s == Subscribe.Instance) { | ||
doSubscribe(); | ||
} | ||
} | ||
} finally { | ||
on.set(false); // establishes a happens-before relationship with the beginning of the next run | ||
if (!inboundSignals.isEmpty()) { // If we still have signals to process | ||
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again | ||
} | ||
} | ||
} | ||
} | ||
|
||
// This method makes sure that this `Subscription` is only running on one Thread at a time, | ||
// this is important to make sure that we follow rule 1.3 | ||
private final void tryScheduleToExecute() { | ||
if (on.compareAndSet(false, true)) { | ||
try { | ||
executor.execute(this); | ||
} catch (Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully | ||
if (!cancelled) { | ||
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6 | ||
try { | ||
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t)); | ||
} finally { | ||
inboundSignals.clear(); // We're not going to need these anymore | ||
// This subscription is cancelled by now, but letting it become schedulable again means | ||
// that we can drain the inboundSignals queue if anything arrives after clearing | ||
on.set(false); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Our implementation of `Subscription.request` sends a signal to the Subscription that more elements are in demand | ||
@Override | ||
public void request(final long n) { | ||
signal(new Request(n)); | ||
} | ||
|
||
// Our implementation of `Subscription.cancel` sends a signal to the Subscription that the `Subscriber` is not interested in any more elements | ||
@Override | ||
public void cancel() { | ||
signal(Cancel.Instance); | ||
} | ||
|
||
// The reason for the `executeQuery` method is that we want to ensure the `SubscriptionImpl` | ||
// is completely constructed before it is exposed to the thread pool, therefor this | ||
// method is only intended to be invoked once, and immediately after the constructor has | ||
// finished. | ||
void init() { | ||
signal(Subscribe.Instance); | ||
} | ||
} | ||
} |
74 changes: 74 additions & 0 deletions
74
examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/************************************************************************ | ||
* Licensed under Public Domain (CC0) * | ||
* * | ||
* To the extent possible under law, the person who associated CC0 with * | ||
* this code has waived all copyright and related or neighboring * | ||
* rights to this code. * | ||
* * | ||
* You should have received a copy of the CC0 legalcode along with this * | ||
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.* | ||
************************************************************************/ | ||
|
||
package org.reactivestreams.example.unicast; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.tck.PublisherVerification; | ||
import org.reactivestreams.tck.TestEnvironment; | ||
import org.testng.annotations.AfterClass; | ||
import org.testng.annotations.BeforeClass; | ||
import org.testng.annotations.Test; | ||
|
||
import static org.reactivestreams.example.unicast.AsyncStreamPublisher.StreamSupplier; | ||
|
||
@Test // Must be here for TestNG to find and run this, do not remove | ||
public class AsyncStreamPublisherTest extends PublisherVerification<Integer> { | ||
|
||
private ExecutorService e; | ||
|
||
@BeforeClass | ||
void before() { | ||
e = Executors.newFixedThreadPool(4); | ||
} | ||
|
||
@AfterClass | ||
void after() { | ||
if (e != null) { | ||
e.shutdown(); | ||
} | ||
} | ||
|
||
public AsyncStreamPublisherTest() { | ||
super(new TestEnvironment()); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
@Override | ||
public Publisher<Integer> createPublisher(final long elements) { | ||
assert (elements <= maxElementsFromPublisher()); | ||
return new AsyncStreamPublisher(new StreamSupplier<Integer>() { | ||
private int at; | ||
@Override | ||
public Integer get() { | ||
return at < elements ? at++ : null; | ||
} | ||
}, e); | ||
} | ||
|
||
@Override | ||
public Publisher<Integer> createFailedPublisher() { | ||
return new AsyncStreamPublisher<Integer>(new StreamSupplier<Integer>() { | ||
@Override | ||
public Integer get() { | ||
throw new RuntimeException("Error state signal!"); | ||
} | ||
}, e); | ||
} | ||
|
||
@Override | ||
public long maxElementsFromPublisher() { | ||
return Integer.MAX_VALUE; | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cancelled
being true here is impossible because the only way to get that true is by submitting aSubscription
to theSubscriber
that happens once and under this condition.