From 9ec20f0e1b2edcdce82346d313260922b1d4bf9a Mon Sep 17 00:00:00 2001 From: Rune Flobakk Date: Fri, 5 Jan 2024 14:16:42 +0100 Subject: [PATCH] Add support for non-trivial Stream searching --- src/main/java/no/digipost/DiggCollectors.java | 33 ++++++++ .../stream/AtomicReferenceFolder.java | 61 +++++++++++++++ .../AtomicReferenceFoldingCollector.java | 78 +++++++++++++++++++ .../no/digipost/stream/SubjectFilter.java | 60 ++++++++++++++ .../java/no/digipost/DiggCollectorsTest.java | 48 ++++++++++++ 5 files changed, 280 insertions(+) create mode 100644 src/main/java/no/digipost/stream/AtomicReferenceFolder.java create mode 100644 src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java create mode 100644 src/main/java/no/digipost/stream/SubjectFilter.java diff --git a/src/main/java/no/digipost/DiggCollectors.java b/src/main/java/no/digipost/DiggCollectors.java index 833fc15..317156d 100644 --- a/src/main/java/no/digipost/DiggCollectors.java +++ b/src/main/java/no/digipost/DiggCollectors.java @@ -22,6 +22,7 @@ import no.digipost.concurrent.OneTimeAssignment; import no.digipost.stream.EmptyResultIfEmptySourceCollector; import no.digipost.stream.NonEmptyStream; +import no.digipost.stream.SubjectFilter; import no.digipost.tuple.Tuple; import no.digipost.tuple.ViewableAsTuple; import no.digipost.util.ViewableAsOptional; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -49,6 +51,37 @@ public final class DiggCollectors { + /** + * Create a collector used for finding and accumulating a specific result, + * where applying a filter is not adequate. The returned + * {@link SubjectFilter subject filter} is used for + * further specifying the final (compound) condition for the result to find. + *

+ * When searching for a result in context of other elements, you must carefully + * ensure the source to have appropriate ordering and parallelity (or probably rather lack thereof) + * for the correct and expected operation of the collector. + *

+ * Note: because {@link Collector collectors} are applied to all elements + * in a Stream, care should be taken to exclude non-applicable elements e.g. using + * a {@link Stream#filter(Predicate) filter}, and {@link Stream#limit(long) limit} + * especially for infinite Streams, before collecting. + * + * + * @param The element type which is inspected by the subject filter. This type is + * typically the same as the element type of the Stream the final collector + * is applied to. + * + * @param subjectElement the predicate for selecting a subject element for further use + * in accumulating a result from applying the final collector. + * + * @return the subject filter for the collector, which must be further specified + * to build the final collector + */ + public static SubjectFilter find(Predicate subjectElement) { + return new SubjectFilter<>(subjectElement); + } + + /** * A multituple is similar to a multimap in that it consists of one {@link Tuple#first() first} value and a List of * values as the {@link Tuple#second() second} value, diff --git a/src/main/java/no/digipost/stream/AtomicReferenceFolder.java b/src/main/java/no/digipost/stream/AtomicReferenceFolder.java new file mode 100644 index 0000000..53474da --- /dev/null +++ b/src/main/java/no/digipost/stream/AtomicReferenceFolder.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) Posten Norge AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package no.digipost.stream; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.BiPredicate; +import java.util.function.Predicate; + +/** + * A consumer function for "folding" a value with an {@link AtomicReference}. + * + * @param The type of the value this function folds. + */ +@FunctionalInterface +interface AtomicReferenceFolder extends BiConsumer, T> { + + static AtomicReferenceFolder clearReference() { + return (ref, value) -> ref.set(null); + } + + static AtomicReferenceFolder keepFirst(Predicate predicate) { + return (currentRef, candidateElement) -> + currentRef.accumulateAndGet(candidateElement, (current, candidate) -> current == null && predicate.test(candidate) ? candidate : current); + } + + static AtomicReferenceFolder keepLast(Predicate predicate) { + return (currentRef, candidateElement) -> { + if (predicate.test(candidateElement)) { + currentRef.set(candidateElement); + } + }; + } + + default AtomicReferenceFolder doInsteadIf(Predicate valuePredicate, AtomicReferenceFolder foldOperation) { + return doInsteadIf((ref, value) -> valuePredicate.test(value), foldOperation); + } + + default AtomicReferenceFolder doInsteadIf(BiPredicate, ? super T> refAndValuePredicate, AtomicReferenceFolder foldOperation) { + return (ref, value) -> { + if (refAndValuePredicate.test(ref, value)) { + foldOperation.accept(ref, value); + } else { + this.accept(ref, value); + } + }; + } +} diff --git a/src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java b/src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java new file mode 100644 index 0000000..91ba473 --- /dev/null +++ b/src/main/java/no/digipost/stream/AtomicReferenceFoldingCollector.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) Posten Norge AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package no.digipost.stream; + +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; + +import static java.util.Collections.unmodifiableSet; +import static java.util.stream.Collector.Characteristics.CONCURRENT; + +final class AtomicReferenceFoldingCollector implements Collector, Optional> { + + private final AtomicReferenceFolder accumulator; + private final Set characteristics; + + AtomicReferenceFoldingCollector(AtomicReferenceFolder accumulator) { + this(accumulator, EnumSet.of(CONCURRENT)); + } + + AtomicReferenceFoldingCollector(AtomicReferenceFolder accumulator, Set characteristics) { + this.accumulator = accumulator; + this.characteristics = unmodifiableSet(characteristics); + } + + + @Override + public AtomicReferenceFolder accumulator() { + return accumulator; + } + + @Override + public Set characteristics() { + return characteristics; + } + + /** + * The combiner, while thread-safe, has essentially undefined behavior if combining two + * found elements, because there is no way to tell if one or the other should be prioritized. + * In all cases of combining one found element with a not found, the found element will be + * returned from the function. + */ + @Override + public BinaryOperator> combiner() { + return (ref1, ref2) -> { + ref1.compareAndSet(null, ref2.get()); + return ref1; + }; + } + + @Override + public Function, Optional> finisher() { + return ref -> Optional.ofNullable(ref.get()); + } + + @Override + public Supplier> supplier() { + return AtomicReference::new; + } +} \ No newline at end of file diff --git a/src/main/java/no/digipost/stream/SubjectFilter.java b/src/main/java/no/digipost/stream/SubjectFilter.java new file mode 100644 index 0000000..532f624 --- /dev/null +++ b/src/main/java/no/digipost/stream/SubjectFilter.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) Posten Norge AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package no.digipost.stream; + +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Collector; + +import static java.util.function.Predicate.isEqual; +import static no.digipost.stream.AtomicReferenceFolder.clearReference; +import static no.digipost.stream.AtomicReferenceFolder.keepFirst; +import static no.digipost.stream.AtomicReferenceFolder.keepLast; + +/** + * The initial subject filter for building a searching {@code Collector}. + * The {@link Collector} is acquired by invoking a method to finalize the + * (compound) condition for accumulating the result across the elements + * the collector will be applied to. + * + * @param the type of elements this subject filter inspects + * + * @see #keepFirstNotFollowedBy(Predicate) + * @see #keepLastNotFollowedBy(Predicate) + */ +public final class SubjectFilter { + private final Predicate subjectElement; + + public SubjectFilter(Predicate subjectElement) { + this.subjectElement = subjectElement; + } + + public Collector> keepFirstNotFollowedBy(T cancellingElement) { + return keepFirstNotFollowedBy(isEqual(cancellingElement)); + } + + public Collector> keepFirstNotFollowedBy(Predicate cancellingElement) { + return new AtomicReferenceFoldingCollector<>(keepFirst(subjectElement).doInsteadIf(cancellingElement, clearReference())); + } + + public Collector> keepLastNotFollowedBy(T cancellingElement) { + return keepLastNotFollowedBy(isEqual(cancellingElement)); + } + + public Collector> keepLastNotFollowedBy(Predicate cancellingElement) { + return new AtomicReferenceFoldingCollector<>(keepLast(subjectElement).doInsteadIf(cancellingElement, clearReference())); + } +} diff --git a/src/test/java/no/digipost/DiggCollectorsTest.java b/src/test/java/no/digipost/DiggCollectorsTest.java index 49cac1e..0f1bb6c 100644 --- a/src/test/java/no/digipost/DiggCollectorsTest.java +++ b/src/test/java/no/digipost/DiggCollectorsTest.java @@ -19,7 +19,10 @@ import no.digipost.tuple.Tuple; import no.digipost.tuple.ViewableAsTuple; import no.digipost.util.ViewableAsOptional; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.quicktheories.core.Gen; import uk.co.probablyfine.matchers.OptionalMatchers; @@ -31,6 +34,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; +import java.util.stream.IntStream; import java.util.stream.Stream; import static java.util.Arrays.asList; @@ -39,6 +44,7 @@ import static no.digipost.DiggCollectors.allowAtMostOne; import static no.digipost.DiggCollectors.allowAtMostOneOrElseThrow; import static no.digipost.DiggCollectors.asSuppressedExceptionsOf; +import static no.digipost.DiggCollectors.find; import static no.digipost.DiggCollectors.toMultimap; import static no.digipost.DiggCollectors.toMultituple; import static no.digipost.DiggCollectors.toSingleExceptionWithSuppressed; @@ -50,6 +56,7 @@ import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doThrow; @@ -207,4 +214,45 @@ public void allowAtMostOneFailsWithCustomException() { assertThat(thrown, sameInstance(customException)); }); } + + @Nested + class FindTest { + + @Test + void emptyStream() { + assertThat(Stream.empty().collect(find(e -> true).keepLastNotFollowedBy(null)), whereNot(Optional::isPresent)); + } + + @Test + void findsLastItemInStreamNothingIsVoided() { + assertThat(Stream.of(1, 2, 3, 4).collect(find((Integer n) -> n > 2).keepLastNotFollowedBy(0)), contains(4)); + } + + @Test + void findsLastMatchingItemAndNotCancelledByAnyFollowingElement() { + assertThat(Stream.of(1, 2, 3, 4, 5, 6).collect(find((Integer n) -> n > 2 && n < 6).keepLastNotFollowedBy(0)), contains(5)); + } + + @Test + void findsLastMatchingItemNotFollowedByCertainValue() { + assertThat(Stream.of(1, 2, 3, 4, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepLastNotFollowedBy(8)), contains(5)); + } + + @Test + void findsFirstMatchingItemNotFollowedByCertainValue() { + assertThat(Stream.of(1, 2, 3, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7).collect(find((Integer n) -> n > 3 && n < 6).keepFirstNotFollowedBy(8)), contains(4)); + } + + @RepeatedTest(20) @Timeout(10) + void parallelStreamsWorksButBehaviorIsNotReallyDefined() { + Random random = new Random(); + Optional result = Optional.empty(); + while(!result.isPresent()) { + result = IntStream.generate(() -> random.nextInt(1000)).limit(500_000).boxed() + .parallel() + .collect(find((Integer n) -> n < 100).keepLastNotFollowedBy(n -> n > 990)); + } + assertThat(result, contains(lessThan(100))); + } + } }