Skip to content

Commit

Permalink
stream: Added utility to be able to process asynchronously provided e…
Browse files Browse the repository at this point in the history
…lements using the synchronous API.
  • Loading branch information
kelemen committed Jan 30, 2022
1 parent cfed6bc commit 2f71495
Show file tree
Hide file tree
Showing 13 changed files with 1,065 additions and 0 deletions.
2 changes: 2 additions & 0 deletions subprojects/jtrim-stream/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,5 @@ that is a small detail at this point.
- `FluentSeqGroupMapper`: A fluent style builder for `SeqMapper`.
- `FluentSeqConsumer`: A fluent style builder for `SeqMapper`.
- `FluentSeqGroupConsumer`: A fluent style builder for `SeqGroupConsumer`.
- `AsyncProducers`: Contains factory methods connect an asynchronous data source
to a synchronous producer.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.jtrim2.stream;

interface AsyncElementProcess {
public void finish(Throwable error);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.jtrim2.stream;

import org.jtrim2.cancel.CancellationToken;

/**
* Defines a sink to which asynchronous element providers can put their received elements.
* This is generally used to supply elements to one of the synchronous producers of {@link AsyncProducers}.
* <P>
* Once there won't be anymore elements added to this sink, it must be marked as finished by the
* {@link #finish(Throwable) finish} method. Failing to finish the process might prevent the consumer
* from ever terminating.
*
* <h2>Thread safety</h2>
* Implementations of this interface are required to be safe to use by multiple
* threads concurrently.
*
* <h3>Synchronization transparency</h3>
* Adding an element to the sink might block and wait until elements are consumed and there
* is more room for additional elements in the sink. Finishing adding elements never waits, so
* it can be safely called while holding a lock.
*
* @param <T> the type of the elements which can be added to this sink
*
* @see AsyncProducers
*/
public interface AsyncElementSink<T> {
/**
* Adds an element to this sink to be processed. This method may block and wait, if it is full
* and its consumer needs to consume some of its elements to continue.
* <P>
* <B>Note</B>: If this method throws an exception, that exception might originate from the
* consumer. In this case, it is futile to try adding more elements, as this method will keep
* failing.
*
* @param cancelToken the cancellation token which can signal cancellation, if waiting for
* adding the element should be canceled. Cancellation detection is done on a best effort
* basis, and there is no guarantee that it will be detected. However, if it is detected, then
* an {@link org.jtrim2.cancel.OperationCanceledException OperationCanceledException} will
* be thrown, and the element will not be added. This argument cannot be {@code null}.
* @param element the element to be added to the queue. The sink does not support {@code null}
* elements, so this argument cannot be {@code null}.
* @return {@code true} if the element was added to the sink, {@code false} if
* {@link #finish(Throwable) finish} was called with no failure.
*
* @throws org.jtrim2.cancel.OperationCanceledException thrown if cancellation was detected. Note
* that it is possible that this exception is thrown if the process was finished with failure
* and the failure exception was this kind of exception. If such distinction need to be made,
* then checking the passed cancellation token is recommended. The edge case when both
* the cancellation token signals cancellation and the processed failed with cancellation
* exception, it is not possible to distinguish between the two scenarios unless the consumer
* can be monitored.
* @throws Exception thrown if the process finished with a failure. Either by the consumer
* failing or this sink (via the {@link #finish(Throwable) finish} method).
*/
public boolean tryPut(CancellationToken cancelToken, T element) throws Exception;

/**
* Marks this sink completed, and prevents more elements to be added to this sink.
* <P>
* If calling this method <I>happens-before</I> calling the {@link #tryPut(CancellationToken, Object) tryPut}
* method call, then the {@code tryPut} will call will just return {@code false} without doing anything.
* <P>
* If the {@code tryPut} <I>happens-before</I> this method call, then previously added elements remain
* to be processed (i.e., they are not removed due to this {@code finish} call).
* <P>
* If this method is called concurrently with {@code tryPut}, then any one of the above two scenarios might
* happen (but not something else).
* <P>
* Calling this method multiple times has an undefined behaviour.
*
* @param error must be {@code null} if adding the elements completed normally, or
* an exception if there was an error while trying to get the elements to be added
* to this sink. Note that setting a failure will cause the consumer process
* to detect that failure and rethrow it.
*/
public void finish(Throwable error);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package org.jtrim2.stream;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.jtrim2.concurrent.Tasks;
import org.jtrim2.executor.TaskExecutor;
import org.jtrim2.utils.ExceptionHelper;

/**
* Defines an asynchronously populatable, but synchronously processed producer.
* That is, an asynchronously populatable sink which provides the source for the
* synchronous producers via a queue.
* <P>
* Note: Usually you want to create instances of this class via the factory methods in
* {@link AsyncProducers}.
*
* <h2>Thread safety</h2>
* Instances of this class can be shared by multiple threads safely, and its
* methods can be called concurrently.
*
* <h3>Synchronization transparency</h3>
* Methods of this class are <I>synchronization transparent</I>, so they can be
* called in any context (e.g.: while holding a lock).
*
* @param <T> the type of the produced elements
*
* @see AsyncProducers
*/
public final class AsyncProducerRef<T> {
private final AsyncElementSink<T> elementSink;
private final Supplier<SeqProducer<T>> producerFactory;

/**
* Creates a new instance with the given asynchronously populatable sink providing the
* data for the given producers (created by the give factory).
*
* @param elementSink the asynchronously populatable sink providing the data for the
* synchronous producers. This argument will be returned as is by the {@link #getElementSink()} method.
* This argument cannot be {@code null}.
* @param producerFactory the factory creating producers providing the elements put into the sink.
* All producers share the same sink. This argument will be returned as is by the {@link #getProducerFactory()}
* method. This argument cannot be {@code null}, and the factory may not return {@code null}.
*
* @see AsyncProducers
*/
public AsyncProducerRef(AsyncElementSink<T> elementSink, Supplier<SeqProducer<T>> producerFactory) {
this.elementSink = Objects.requireNonNull(elementSink, "elementSink");
this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory");
}

/**
* Returns the asynchronously populatable sink providing the data for the synchronous producers.
*
* @return the asynchronously populatable sink providing the data for the synchronous producers.
* This method never returns {@code null}.
*/
public AsyncElementSink<T> getElementSink() {
return elementSink;
}

/**
* Returns the factory creating producers providing the elements put into the sink.
* All producers created by the returned factory will share the {@link #getElementSink() sink},
* and you can run any number of them in parallel to process the elements put into the sink
* faster.
*
* @return the factory creating producers providing the elements put into the sink.
* This method never returns {@code null}.
*/
public Supplier<SeqProducer<T>> getProducerFactory() {
return producerFactory;
}

private Supplier<SeqProducer<T>> getSafeProducerFactory() {
Supplier<SeqProducer<T>> producerFactoryCapture = producerFactory;
return () -> Objects.requireNonNull(producerFactoryCapture.get(), "producerFactory.get()");
}

/**
* Returns a new producer created by the {@link #getProducerFactory() producer factory}.
*
* @return a new producer created by the {@link #getProducerFactory() producer factory}.
* This method never returns {@code null}.
*
* @throws NullPointerException thrown if the producer factory returns {@code null}
*/
public SeqProducer<T> newSeqProducer() {
return Objects.requireNonNull(producerFactory.get(), "producerFactory.get()");
}

/**
* Returns a new producer producing elements put into the {@link #getElementSink() sink}
* on the given number of threads. One of the processing thread will be the thread on
* which the producer is called, and only one less will run in the provided executor.
* Note: It is assumed that the given executor is able to run {@code maxThreadCount - 1}
* tasks concurrently. Note that it is not error if the given executor is unable to
* execute tasks (except that it has to be able to eventually complete scheduled tasks
* to it).
*
* @param executor the executor on which the background processing is done. Note
* that one worker will run synchronously, so if you've specified 1 for {@code maxThreadCount},
* then the executor will not be used. This argument cannot be {@code null}.
* @param maxThreadCount the number of concurrent workers processing the elements
* put into the sink. This argument must greater than or equal to 1.
* @return a new producer producing elements put into the {@link #getElementSink() sink}
* on the given number of threads. This method never returns {@code null}.
*/
public SeqGroupProducer<T> newSeqGroupProducer(TaskExecutor executor, int maxThreadCount) {
Objects.requireNonNull(executor, "executor");
ExceptionHelper.checkArgumentInRange(maxThreadCount, 1, Integer.MAX_VALUE, "maxThreadCount");

int extraThreadCount = maxThreadCount - 1;
if (extraThreadCount == 0) {
return newSeqProducer().toFluent().toSingleGroupProducer().unwrap();
}

Supplier<SeqProducer<T>> producerFactoryCapture = getSafeProducerFactory();

return (cancelToken, seqConsumer) -> {
AtomicReference<Throwable> errorRef = new AtomicReference<>();
BackgroundWorkerManager workerManager = new BackgroundWorkerManager(
executor,
Tasks.noOpTask(),
failure -> errorRef.compareAndSet(null, failure)
);

workerManager.startWorkers(cancelToken, extraThreadCount, taskCancelToken -> {
seqConsumer.consumeAll(cancelToken, producerFactoryCapture.get());
});

try {
seqConsumer.consumeAll(cancelToken, producerFactoryCapture.get());
} catch (Throwable ex) {
errorRef.compareAndSet(null, ex);
}

workerManager.waitForWorkers();
ExceptionHelper.rethrowCheckedIfNotNull(errorRef.get(), Exception.class);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.jtrim2.stream;

/**
* Defines factory methods for asynchronously populated synchronous producers.
*/
public final class AsyncProducers {
/**
* Returns an asynchronously populatable sink and synchronous producer(s) producing the elements
* put into the sink. It is possible to create multiple producers, in which case it is undefined
* which element put into the sink will be produced by which producer. However, an element
* will only be produced by exactly one producer. Producers that are never asked to produce
* elements can be ignored (i.e. they won't prevent elements to be processed).
* <P>
* Note: The returned producers will only return after the sink is marked as
* {@link AsyncElementSink#finish(Throwable) finished} or if the consumer of any of the producers
* fail with an exception.
* <P>
* See the following example for a trivial use-case:
* <pre>{@code
* AsyncProducerRef<String> ref = AsyncProducers.createAsyncSourcedProducer(20);
* AsyncElementSink<T> sink = ref.getElementSink();
*
* myAsyncSource.omReceiveElement(str -> sink.tryPut(Cancellation.UNCANCELABLE_TOKEN, str));
* myAsyncSource.onClose(exception -> ref.getElementSink().finish(exception));
*
* ref.newSeqProducer()
* .toFluent()
* .withContextFreeConsumer(str -> process(str))
* .execute(Cancellation.UNCANCELABLE_TOKEN);
* }</pre>
* <P>
* This method is effectively equivalent to {@code createAsyncSourcedProducer(maxQueueSize, maxQueueSize)}.
*
* @param <T> the type of the elements produced
* @param maxQueueSize the maximum number of outstanding elements the queue storing the elements
* of the sink can store. The elements need to be stored in the queue until they were retrieved
* by a producer. This argument must be greater than or equal to 1.
* @return an asynchronously populatable sink and synchronous producer(s) producing the elements
* put into the sink. This method never returns {@code null}.
*/
public static <T> AsyncProducerRef<T> createAsyncSourcedProducer(int maxQueueSize) {
return createAsyncSourcedProducer(maxQueueSize, maxQueueSize);
}

/**
* Returns an asynchronously populatable sink and synchronous producer(s) producing the elements
* put into the sink. It is possible to create multiple producers, in which case it is undefined
* which element put into the sink will be produced by which producer. However, an element
* will only be produced by exactly one producer. Producers that are never asked to produce
* elements can be ignored (i.e. they won't prevent elements to be processed).
* <P>
* Note: The returned producers will only return after the sink is marked as
* {@link AsyncElementSink#finish(Throwable) finished} or if the consumer of any of the producers
* fail with an exception.
* <P>
* See the following example for a trivial use-case:
* <pre>{@code
* AsyncProducerRef<String> ref = AsyncProducers.createAsyncSourcedProducer(1000, 10);
* AsyncElementSink<T> sink = ref.getElementSink();
*
* myAsyncSource.omReceiveElement(str -> sink.tryPut(Cancellation.UNCANCELABLE_TOKEN, str));
* myAsyncSource.onClose(exception -> ref.getElementSink().finish(exception));
*
* ref.newSeqProducer()
* .toFluent()
* .withContextFreeConsumer(str -> process(str))
* .execute(Cancellation.UNCANCELABLE_TOKEN);
* }</pre>
*
* @param <T> the type of the elements produced
* @param maxQueueSize the maximum number of outstanding elements the queue storing the elements
* of the sink can store. The elements need to be stored in the queue until they were retrieved
* by a producer. This argument must be greater than or equal to 1.
* @param initialQueueCapacity an initial capacity of the queue should reserve initially. Setting
* this parameter to any allowed value is only an optimization and carry no defined semantics. This
* argument must be greater than or equal to zero, but no greater than {@code maxQueueSize}.
* @return an asynchronously populatable sink and synchronous producer(s) producing the elements
* put into the sink. This method never returns {@code null}.
*/
public static <T> AsyncProducerRef<T> createAsyncSourcedProducer(int maxQueueSize, int initialQueueCapacity) {
DefaultAsyncElementSource<T> source = new DefaultAsyncElementSource<>(maxQueueSize, initialQueueCapacity);
return new AsyncProducerRef<>(source, () -> new AsyncSourceProducer<>(source));
}

private AsyncProducers() {
throw new AssertionError();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.jtrim2.stream;

import java.util.Objects;
import org.jtrim2.cancel.CancellationToken;
import org.jtrim2.utils.ExceptionHelper;

final class AsyncSourceProducer<T> implements SeqProducer<T> {
private final PollableElementSource<? extends T> source;

public AsyncSourceProducer(PollableElementSource<? extends T> source) {
this.source = Objects.requireNonNull(source, "source");
}

@Override
public void transferAll(CancellationToken cancelToken, ElementConsumer<? super T> consumer) throws Exception {
Objects.requireNonNull(cancelToken, "cancelToken");
Objects.requireNonNull(consumer, "consumer");

Throwable transferFailure = null;
try {
T element;
while ((element = source.getNext(cancelToken)) != null) {
consumer.processElement(element);
}
} catch (Throwable ex) {
transferFailure = ex;
}
source.finish(transferFailure);
ExceptionHelper.rethrowCheckedIfNotNull(transferFailure, Exception.class);
}
}
Loading

0 comments on commit 2f71495

Please sign in to comment.