diff --git a/src/main/java/no/digipost/DiggCollectors.java b/src/main/java/no/digipost/DiggCollectors.java
index 833fc15..317156d 100644
--- a/src/main/java/no/digipost/DiggCollectors.java
+++ b/src/main/java/no/digipost/DiggCollectors.java
@@ -22,6 +22,7 @@
import no.digipost.concurrent.OneTimeAssignment;
import no.digipost.stream.EmptyResultIfEmptySourceCollector;
import no.digipost.stream.NonEmptyStream;
+import no.digipost.stream.SubjectFilter;
import no.digipost.tuple.Tuple;
import no.digipost.tuple.ViewableAsTuple;
import no.digipost.util.ViewableAsOptional;
@@ -34,6 +35,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -49,6 +51,37 @@
public final class DiggCollectors {
+ /**
+ * Create a collector used for finding and accumulating a specific result,
+ * where applying a filter is not adequate. The returned
+ * {@link SubjectFilter subject filter} is used for
+ * further specifying the final (compound) condition for the result to find.
+ *
+ * When searching for a result in context of other elements, you must carefully
+ * ensure the source to have appropriate ordering and parallelity (or probably rather lack thereof)
+ * for the correct and expected operation of the collector.
+ *
+ * Note: because {@link Collector collectors} are applied to all elements
+ * in a Stream, care should be taken to exclude non-applicable elements e.g. using
+ * a {@link Stream#filter(Predicate) filter}, and {@link Stream#limit(long) limit}
+ * especially for infinite Streams, before collecting.
+ *
+ *
+ * @param The element type which is inspected by the subject filter. This type is
+ * typically the same as the element type of the Stream the final collector
+ * is applied to.
+ *
+ * @param subjectElement the predicate for selecting a subject element for further use
+ * in accumulating a result from applying the final collector.
+ *
+ * @return the subject filter for the collector, which must be further specified
+ * to build the final collector
+ */
+ public static SubjectFilter find(Predicate subjectElement) {
+ return new SubjectFilter<>(subjectElement);
+ }
+
+
/**
* A multituple is similar to a multimap in that it consists of one {@link Tuple#first() first} value and a List of
* values as the {@link Tuple#second() second} value,
diff --git a/src/main/java/no/digipost/stream/AtomicReferenceFolder.java b/src/main/java/no/digipost/stream/AtomicReferenceFolder.java
new file mode 100644
index 0000000..53474da
--- /dev/null
+++ b/src/main/java/no/digipost/stream/AtomicReferenceFolder.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) Posten Norge AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package no.digipost.stream;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import java.util.function.Predicate;
+
+/**
+ * A consumer function for "folding" a value with an {@link AtomicReference}.
+ *
+ * @param The type of the value this function folds.
+ */
+@FunctionalInterface
+interface AtomicReferenceFolder extends BiConsumer, T> {
+
+ static AtomicReferenceFolder clearReference() {
+ return (ref, value) -> ref.set(null);
+ }
+
+ static AtomicReferenceFolder keepFirst(Predicate super T> predicate) {
+ return (currentRef, candidateElement) ->
+ currentRef.accumulateAndGet(candidateElement, (current, candidate) -> current == null && predicate.test(candidate) ? candidate : current);
+ }
+
+ static AtomicReferenceFolder keepLast(Predicate super T> predicate) {
+ return (currentRef, candidateElement) -> {
+ if (predicate.test(candidateElement)) {
+ currentRef.set(candidateElement);
+ }
+ };
+ }
+
+ default AtomicReferenceFolder doInsteadIf(Predicate super T> valuePredicate, AtomicReferenceFolder foldOperation) {
+ return doInsteadIf((ref, value) -> valuePredicate.test(value), foldOperation);
+ }
+
+ default AtomicReferenceFolder doInsteadIf(BiPredicate super AtomicReference, ? super T> refAndValuePredicate, AtomicReferenceFolder foldOperation) {
+ return (ref, value) -> {
+ if (refAndValuePredicate.test(ref, value)) {
+ foldOperation.accept(ref, value);
+ } else {
+ this.accept(ref, value);
+ }
+ };
+ }
+}
diff --git a/src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java b/src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java
new file mode 100644
index 0000000..91ba473
--- /dev/null
+++ b/src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) Posten Norge AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package no.digipost.stream;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+
+import static java.util.Collections.unmodifiableSet;
+import static java.util.stream.Collector.Characteristics.CONCURRENT;
+
+final class AtomicReferenceFoldingCollector implements Collector, Optional> {
+
+ private final AtomicReferenceFolder accumulator;
+ private final Set characteristics;
+
+ AtomicReferenceFoldingCollector(AtomicReferenceFolder accumulator) {
+ this(accumulator, EnumSet.of(CONCURRENT));
+ }
+
+ AtomicReferenceFoldingCollector(AtomicReferenceFolder accumulator, Set characteristics) {
+ this.accumulator = accumulator;
+ this.characteristics = unmodifiableSet(characteristics);
+ }
+
+
+ @Override
+ public AtomicReferenceFolder accumulator() {
+ return accumulator;
+ }
+
+ @Override
+ public Set characteristics() {
+ return characteristics;
+ }
+
+ /**
+ * The combiner, while thread-safe, has essentially undefined behavior if combining two
+ * found elements, because there is no way to tell if one or the other should be prioritized.
+ * In all cases of combining one found element with a not found, the found element will be
+ * returned from the function.
+ */
+ @Override
+ public BinaryOperator> combiner() {
+ return (ref1, ref2) -> {
+ ref1.compareAndSet(null, ref2.get());
+ return ref1;
+ };
+ }
+
+ @Override
+ public Function, Optional> finisher() {
+ return ref -> Optional.ofNullable(ref.get());
+ }
+
+ @Override
+ public Supplier> supplier() {
+ return AtomicReference::new;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/no/digipost/stream/SubjectFilter.java b/src/main/java/no/digipost/stream/SubjectFilter.java
new file mode 100644
index 0000000..532f624
--- /dev/null
+++ b/src/main/java/no/digipost/stream/SubjectFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) Posten Norge AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package no.digipost.stream;
+
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collector;
+
+import static java.util.function.Predicate.isEqual;
+import static no.digipost.stream.AtomicReferenceFolder.clearReference;
+import static no.digipost.stream.AtomicReferenceFolder.keepFirst;
+import static no.digipost.stream.AtomicReferenceFolder.keepLast;
+
+/**
+ * The initial subject filter for building a searching {@code Collector}.
+ * The {@link Collector} is acquired by invoking a method to finalize the
+ * (compound) condition for accumulating the result across the elements
+ * the collector will be applied to.
+ *
+ * @param the type of elements this subject filter inspects
+ *
+ * @see #keepFirstNotFollowedBy(Predicate)
+ * @see #keepLastNotFollowedBy(Predicate)
+ */
+public final class SubjectFilter {
+ private final Predicate subjectElement;
+
+ public SubjectFilter(Predicate subjectElement) {
+ this.subjectElement = subjectElement;
+ }
+
+ public Collector> keepFirstNotFollowedBy(T cancellingElement) {
+ return keepFirstNotFollowedBy(isEqual(cancellingElement));
+ }
+
+ public Collector> keepFirstNotFollowedBy(Predicate super T> cancellingElement) {
+ return new AtomicReferenceFoldingCollector<>(keepFirst(subjectElement).doInsteadIf(cancellingElement, clearReference()));
+ }
+
+ public Collector> keepLastNotFollowedBy(T cancellingElement) {
+ return keepLastNotFollowedBy(isEqual(cancellingElement));
+ }
+
+ public Collector> keepLastNotFollowedBy(Predicate super T> cancellingElement) {
+ return new AtomicReferenceFoldingCollector<>(keepLast(subjectElement).doInsteadIf(cancellingElement, clearReference()));
+ }
+}
diff --git a/src/test/java/no/digipost/DiggCollectorsTest.java b/src/test/java/no/digipost/DiggCollectorsTest.java
index 49cac1e..0f1bb6c 100644
--- a/src/test/java/no/digipost/DiggCollectorsTest.java
+++ b/src/test/java/no/digipost/DiggCollectorsTest.java
@@ -19,7 +19,10 @@
import no.digipost.tuple.Tuple;
import no.digipost.tuple.ViewableAsTuple;
import no.digipost.util.ViewableAsOptional;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.quicktheories.core.Gen;
import uk.co.probablyfine.matchers.OptionalMatchers;
@@ -31,6 +34,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
@@ -39,6 +44,7 @@
import static no.digipost.DiggCollectors.allowAtMostOne;
import static no.digipost.DiggCollectors.allowAtMostOneOrElseThrow;
import static no.digipost.DiggCollectors.asSuppressedExceptionsOf;
+import static no.digipost.DiggCollectors.find;
import static no.digipost.DiggCollectors.toMultimap;
import static no.digipost.DiggCollectors.toMultituple;
import static no.digipost.DiggCollectors.toSingleExceptionWithSuppressed;
@@ -50,6 +56,7 @@
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;
@@ -207,4 +214,45 @@ public void allowAtMostOneFailsWithCustomException() {
assertThat(thrown, sameInstance(customException));
});
}
+
+ @Nested
+ class FindTest {
+
+ @Test
+ void emptyStream() {
+ assertThat(Stream.empty().collect(find(e -> true).keepLastNotFollowedBy(null)), whereNot(Optional::isPresent));
+ }
+
+ @Test
+ void findsLastItemInStreamNothingIsVoided() {
+ assertThat(Stream.of(1, 2, 3, 4).collect(find((Integer n) -> n > 2).keepLastNotFollowedBy(0)), contains(4));
+ }
+
+ @Test
+ void findsLastMatchingItemAndNotCancelledByAnyFollowingElement() {
+ assertThat(Stream.of(1, 2, 3, 4, 5, 6).collect(find((Integer n) -> n > 2 && n < 6).keepLastNotFollowedBy(0)), contains(5));
+ }
+
+ @Test
+ void findsLastMatchingItemNotFollowedByCertainValue() {
+ assertThat(Stream.of(1, 2, 3, 4, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepLastNotFollowedBy(8)), contains(5));
+ }
+
+ @Test
+ void findsFirstMatchingItemNotFollowedByCertainValue() {
+ assertThat(Stream.of(1, 2, 3, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepFirstNotFollowedBy(8)), contains(4));
+ }
+
+ @RepeatedTest(20) @Timeout(10)
+ void parallelStreamsWorksButBehaviorIsNotReallyDefined() {
+ Random random = new Random();
+ Optional result = Optional.empty();
+ while(!result.isPresent()) {
+ result = IntStream.generate(() -> random.nextInt(1000)).limit(500_000).boxed()
+ .parallel()
+ .collect(find((Integer n) -> n < 100).keepLastNotFollowedBy(n -> n > 990));
+ }
+ assertThat(result, contains(lessThan(100)));
+ }
+ }
}