Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chaining unfolds #2911

Closed
clintonmead opened this issue Dec 20, 2024 · 6 comments
Closed

Chaining unfolds #2911

clintonmead opened this issue Dec 20, 2024 · 6 comments

Comments

@clintonmead
Copy link

clintonmead commented Dec 20, 2024

I have an Unfold m (s, a) b, and I also have a Stream a. What I want to do is produce a Stream b, which is the result of repeatedly applying my unfold to each element in my Stream a and concatenating the results. The issue is my unfold also requires a state s. I want to be able to set this initial state s for the first element from Stream a, and then have the state s be the "end state" of the unfold for each subsequent element of stream a. Kind of chaining the unfolds together, where the end state of each one is fed into the next unfold on the next element from the input stream.

The issue is that Unfold doesn't have an end state. I think this is because Unfold is like Conduit i o m (). But I need some sort of result (I think).

One alternative could be to modify the step data type, like so:

data Step s finalState a = Yield a s | Skip s | Stop finalState

data Unfold m finalState a b =
    forall s. Unfold (s -> m (Step s finalState)) (a -> m s)

But this is a pretty big change. You could do this:

data Step' s finalState a = Yield a s | Skip s | Stop finalState

data Unfold' m finalState a b =
    forall s. Unfold (s -> m (Step s finalState)) (a -> m s)

type Step s a = Step s () a
type Unfold m a b = Unfold' m () a b

But still this is a decent sized change.

The other thing I thought of was keeping the state in the output of the Unfold (which one could then map out latter on). Basically change your Unfold m (s, a) b -> Unfold m (s, a) (s, b).

Then we can write a function like this:

data ConcatMapUState o i b =
      ConcatMapUOuter o (Maybe b)
    | ConcatMapUInner o i (Maybe b)

unfoldManyChain :: Monad m => (Maybe b -> a -> a1) -> Unfold m a1 b -> Stream m a -> Stream m b
unfoldManyChain nextStateFunc (Unfold istep inject) (Stream ostep ost) =
  Stream step (ConcatMapUOuter ost Nothing) 
  where
    step gst (ConcatMapUOuter o a1) = do
        r <- ostep (adaptState gst) o
        case r of
            Yield a o' -> do
                i <- inject (nextStateFunc a1 a)
                i `seq` return (Skip (ConcatMapUInner o' i a1))
            Skip o' -> return $ Skip (ConcatMapUOuter o' a1)
            Stop -> return Stop

    step _ (ConcatMapUInner o i x) = do
        r <- istep i
        return $ case r of
            Yield x' i' -> Yield x' (ConcatMapUInner o i' (Just x'))
            Skip i'    -> Skip (ConcatMapUInner o i' x)
            Stop       -> Skip (ConcatMapUOuter o x)

Which is basically unfoldMany, but with an extra function that allows one to change what value goes to the unfold based on:

  1. The value from the source stream AND
  2. The LAST result value from the previous unfold (or Nothing if there wasn't one).

This allows one to chain state between unfolds.

But these seem like somewhat hacky solutions, and I don't think what I'm doing here is that weird. Basically I have an incoming stream of chunks of data, and I want to produce an outgoing stream of tokens, but it's important that I keep some state between the chunks (like, have we got an unclosed quote from the previous chunk). Is there a better way to do this with the existing combinators?

(This is a follow one from the discussion topic here: #2908, but I've made this into an issue now because I actually can provide some concrete code).


Edit: Here's an alternate implementation, that avoids an intermediate Unfold:

data UnfoldChainState o s = UnfoldChainState o s

unfoldManyChain' :: Monad m => (s -> (s, Maybe b)) -> (a -> s -> s) -> s -> Stream m a -> Stream m b
unfoldManyChain' iterFunc newStateFunc initialState (Stream ostep ost) = Stream step (UnfoldChainState ost initialState) where
  step gst state@(UnfoldChainState o s) = 
    let
      (s', mb) = iterFunc s
    in
      case mb of
        Just b -> return $ Yield b (UnfoldChainState o s')
        Nothing -> do
          r <- ostep (adaptState gst) o
          case r of
            Yield a o' -> step gst (UnfoldChainState o' (newStateFunc a s'))
            Skip o' -> return $ Skip (UnfoldChainState o' s')
            Stop -> return Stop
@shlok
Copy link
Contributor

shlok commented Dec 25, 2024

Turning Stream a into Stream b using a given stateful computation is something I have been doing successfully with scan or postscan (although the stateful computation is a Fold, not an Unfold). Would this not work for you?

@clintonmead
Copy link
Author

For each input element from the Stream a, I need to produce multiple output elements in the Stream b. So I've kinda got a function a -> Stream b (which is an Unfold). But my actual function is more like (s, a) -> (s, Stream b), And there's an initial state, and after processing one element from the stream (and producing multiple output elements) I then need to take that final state and forward through for processing the next element a (which will produce it's own stream of b).

I have two thoughts how to do this now:

  1. Do what you've suggested, a scan, but I think that will result in a Stream (Stream b). Is there a nice way to concat this into a flat Stream b, and is this a good approach? OR
  2. Just do the whole computation in StateT m s, and just keep the state there,

I feel like if I keep things inside the library framework (i.e. not doing 2) things will optimise better but I'm not sure how to approach that exactly. The issue I think with your Fold suggestion (correct me if I'm wrong) is that scan will only output as many elements as there are in the input (maybe one more as the initial). I need lots more elements outputted than from the input.

@harendra-kumar
Copy link
Member

@clintonmead for stateful transformation of an unfold you could scan or postscan an Unfold. There are scan and postscan operations in the Unfold module just like in the Stream module.

postscan :: Monad m => Fold m b c -> Unfold m a b -> Unfold m a c 

The output type c could include part of the state being used in the stateful transformation. It depends on how you write the Fold being used for the transformation.

@clintonmead
Copy link
Author

clintonmead commented Dec 26, 2024

@harendra-kumar sorry I must be still missing something. Fold combines multiple elements into one doesn't it? I need to break each element of a Stream into multiple elements. So lets say the input stream has 10 elements, the output stream may have 100 elements. How does a Fold on an Unfold increase the number of elements?

@harendra-kumar
Copy link
Member

I thought you wanted to introduce a state in an existing unfold, scanning an unfold can do that. But it seems you want to share a state across multiple invocations of an unfold. The only way to do that using the existing unfolds is by using StateT so that you can use the underlying monad to share the state.

To share the state explicitly, the signature you proposed looks good:

unfoldManyChain' :: Monad m => (s -> (s, Maybe b)) -> (a -> s -> s) -> s -> Stream m a -> Stream m b

But this will require a separate type so that we can incorporate the state parameter in the type. For your purposes you can try it out by writing it outside the library and see how it works. Also see how the StateT solution fares compared to this.

@clintonmead
Copy link
Author

clintonmead commented Jan 13, 2025

I ended up using Strictly's runStateT and it worked like a charm.

Thanks for all your help!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants