Add warning to demux documentation #2402

Open
wants to merge 1 commit into
base: master
shlok
Contributor

@shlok shlok commented May 10, 2023

I apparently need to do this at one point for things to go smoothly. (However, if this makes no sense (based on the way demux is known to work), feel free to reject.)

@@ -326,6 +326,10 @@ demuxGeneric getKey getFold = fmap extract $ foldlM' step initial
-- This can be used to scan a stream and collect the results from the scan
-- output.
--
-- /Warning/: One should call the returned monadic action to make sure the
-- folds fully complete—even if the action returns nothing useful (e.g., a map
-- of @()@ values).
Member

This should not be the case unless there is some laziness somewhere. Can you test and confirm that this makes a difference?

@harendra-kumar
Member

I apparently need to do this at one point for things to go smoothly.

Can you recheck/reproduce this?

@shlok
Contributor Author

shlok commented May 16, 2023

@harendra-kumar The issue occurs if the folds are executed on different threads. Here is an example:

{-# LANGUAGE ScopedTypeVariables #-}

module Main where

import Control.Concurrent
import Data.Function
import qualified Streamly.Data.Fold as F
import qualified Streamly.Data.Stream.Prelude as S
import qualified Streamly.Internal.Data.Fold.Concurrent as F

main :: IO ()
main = do
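  -- Each fold is wrapped in parEval, so it is evaluated concurrently and its
  -- effects may lag behind the thread that drives the stream.
  let foldEven =
        F.parEval id $
          F.foldlM'
            (\() x -> putStrLn $ "Processing even: " ++ show x)
            (return ())
      foldOdd =
        F.parEval id $
          F.foldlM'
            (\() x -> putStrLn $ "Processing odd: " ++ show x)
            (return ())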

  (io, _) <-
    S.fromList [0 :: Int .. 20]
      & S.fold
        ( F.demuxIO
            (\i -> if even i then "even" else "odd")
            ( \i ->
                if even i
                  then return foldEven
                  else return foldOdd
            )
        )
  -- _ <- io -- Commented out for now.

  putStrLn "Doing additional things..."
  threadDelay 10 -- The argument is in microseconds.
  putStrLn "Complete."

Example runs:

$ cabal build; cabal exec -- demux-test
Up to date
Doing additional things...
Processing even: 0
Processing odd: 1
Complete.
Processing even: 2
$ cabal build; cabal exec -- demux-test
Up to date
Processing odd: 1
Doing additional things...
Processing even: 0
Processing odd: 3
Processing even: 2
Complete.
Processing odd: 5

This is a problem if the “additional things” rely on the “processing” being fully finished.

After uncommenting that _ <- io line, things work as expected:

Processing even: 0
Processing even: 2
Processing even: 4
Processing even: 6
Processing even: 8
Processing even: 10
Processing odd: 1
Processing even: 12
Processing odd: 3
Processing even: 14
Processing odd: 5
Processing even: 16
Processing odd: 7
Processing even: 18
Processing odd: 9
Processing even: 20
Processing odd: 11
Processing odd: 13
Processing odd: 15
Processing odd: 17
Processing odd: 19
Doing additional things...
Complete.
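
For reference, a minimal sketch of the variant that runs the returned action before continuing. It assumes the same streamly version and modules as the example above; `worker` is a hypothetical helper introduced here only to avoid repeating the two folds, and is not part of streamly.

```haskell
module Main where

import Data.Function ((&))
import qualified Streamly.Data.Fold as F
import qualified Streamly.Data.Stream.Prelude as S
import qualified Streamly.Internal.Data.Fold.Concurrent as F

-- Hypothetical helper (not part of streamly): a concurrent fold that
-- prints each element it receives.
worker :: String -> F.Fold IO Int ()
worker label =
  F.parEval id $
    F.foldlM'
      (\() x -> putStrLn $ "Processing " ++ label ++ ": " ++ show x)
      (return ())

main :: IO ()
main = do
  (io, _) <-
    S.fromList [0 :: Int .. 20]
      & S.fold
        ( F.demuxIO
            (\i -> if even i then "even" else "odd")
            (\i -> return (worker (if even i then "even" else "odd")))
        )
  -- Run the returned action before continuing, so that the folds are
  -- finalized first (the behaviour reported in this thread).
  _ <- io

  putStrLn "Doing additional things..."
  putStrLn "Complete."
```

The result of `io` is discarded because the folds return `()`; only its effect of waiting for the folds matters here.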
