-
Notifications
You must be signed in to change notification settings - Fork 533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asynchronous Stream Publisher - Example #377
base: master
Are you sure you want to change the base?
Conversation
Based on Supplier, for the case you don't know the count of elements (for example streaming from a file, etc). Publisher is controlled by the null value - enf of stream.
// we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread. | ||
// It also makes it easier to follow rule 1.9 | ||
private void doSubscribe() { | ||
if (!cancelled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cancelled
being true here is impossible because the only way to get that true is by submitting a Subscription
to the Subscriber
that happens once and under this condition.
try { | ||
subscriber.onSubscribe(this); | ||
} catch (final Throwable t) { // Due diligence to obey 2.13 | ||
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with crashing onXXX
is that you may end up in an undefined state where even calling onError
fails.
cancelled being true here is impossible because the only way to get that true is by submitting a Subscription to the Subscriber that happens once and under this condition.
The problem with crashing onXXX is that you may end up in an undefined state where even calling onError fails.
Hi @ttulka! For my understanding, does this solve a use-case that the Iterable-based example doesn't? |
Hi @viktorklang |
@ttulka Ah, ok, couldn't that be solved by doing a read-ahead of 1 element? (So when the def supplier2Iterable[T](s: Supplier[T]): Iterable[T] = new Iterable[T] {
override def iterator(): Iterator[T] = new Iterator[T] {
private[this] var elem = s.get()
override def hasNext(): Boolean = elem != null
override def next(): T =
if (elem == null) Iterator.empty.next()
else {
val prev = elem
elem = s.get()
prev
}
} } |
@viktorklang This makes definitely sense, actually it is the same logic I put into the iterable publisher. |
Based on Supplier, for the case you don't know the count of elements
(for example streaming from a file, etc).
Publisher is controlled by the null value - enf of stream.