From 33c8f52db4c54121635d4c69a7fbddb09c0828fb Mon Sep 17 00:00:00 2001 From: Aleksandr Mikhailov Date: Thu, 7 Nov 2024 18:34:48 +0200 Subject: [PATCH] Add Flux.unfold --- .../java/reactor/core/publisher/Flux.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 055a0e97be..71978ace16 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.Spliterator; @@ -2073,6 +2074,30 @@ public static Flux switchOnNext(Publisher Type of the elements + * @param Type of the internal state + * + * @return a {@link Flux} that produces elements using `f` until `f` returns empty {@link Optional} + */ + public static Flux unfold(S init, Function>> f) { + return Flux.generate(() -> init, (s, sink) -> { + Optional> res = f.apply(s); + if (!res.isPresent()) { + sink.complete(); + return s; + } else { + sink.next(res.get().getT1()); + return res.get().getT2(); + } + }); + } + /** * Uses a resource, generated by a supplier for each individual Subscriber, while streaming the values from a * Publisher derived from the same resource and makes sure the resource is released if the sequence terminates or