Skip to content

Commit

Permalink
Catch MVar blocked error and then exit worker thread
Browse files Browse the repository at this point in the history
This only occurs when Workers is garbage collected and then we should indeed exit the worker threads.
Also split wakeAll in wakeAll and exitAll,
and adda special state to SleepScope to denote that threads should exit.
  • Loading branch information
ivogabe committed Apr 10, 2024
1 parent d574656 commit 122d3a0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ instance Show Activity where
schedule :: Workers -> Job -> IO ()
schedule workers job = do
pushL (workerTaskQueue workers) job
wakeAll (workerSleep workers) Work
wakeAll $ workerSleep workers

runWorker :: Workers -> ThreadIdx -> IO ()
runWorker !workers !threadIdx = do
Expand Down Expand Up @@ -182,7 +182,7 @@ hireWorkersOn caps = do
let count = length caps
activities <- newArray count Inactive
ioref <- newIORef ()
_ <- mkWeakIORef ioref $ wakeAll sleep Exit
_ <- mkWeakIORef ioref $ exitAll sleep
let workers = Workers count sleep queue activities ioref
forM_ caps $ \cpu -> do
tid <- forkOn cpu $ do
Expand Down Expand Up @@ -271,7 +271,7 @@ resolveSignal !workers (NativeSignal ioref) = do
executeKernel :: forall env. Workers -> ThreadIdx -> KernelCall env -> Job -> IO ()
executeKernel !workers !myIdx (KernelCall fun arg) continuation = do
writeArray (workerActivity workers) myIdx $ Active @env Proxy fun arg continuation
wakeAll (workerSleep workers) Work
wakeAll $ workerSleep workers
helpKernel workers myIdx myIdx (return ()) (return ())

{-# INLINE helpKernel #-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
--

module Data.Array.Accelerate.LLVM.Native.Execute.Sleep
( SleepScope, newSleepScope, sleepIf, wakeAll
( SleepScope, newSleepScope, sleepIf, wakeAll, exitAll
, WakeReason(..)
) where

import Data.Atomics
import Data.IORef
import Control.Monad
import Control.Concurrent.MVar
import Control.Exception
import Data.Array.Accelerate.Error

newtype SleepScope = SleepScope (IORef State)

Expand All @@ -40,8 +42,10 @@ data State
-- All threads are busy. The MVar is currently not used (and is empty).
-- It will be used when the state changes to waiting.
| Busy {-# UNPACK #-} !(MVar WakeReason)
-- All work is done. The worker threads should exit.
| Done

data WakeReason = Work | Exit
data WakeReason = Work | Exit deriving Show

-- Invariants:
-- * If the state is Waiting, then 'sleepIf' will not write to the state.
Expand All @@ -58,7 +62,7 @@ sleepIf (SleepScope ref) condition = do
c <- condition
if c then
-- Start waiting
readMVar mvar
readMVar mvar `catch` (\(_ :: SomeException) -> return Exit)
else
-- Don't wait
return Work
Expand All @@ -73,15 +77,16 @@ sleepIf (SleepScope ref) condition = do
-- A CAS is needed, compared to a normal write, as this function can be
-- interleaved by other threads doing 'sleepIf' and 'wakeAll'.
-- Start waiting
readMVar mvar
readMVar mvar `catch` (\(_ :: SomeException) -> return Exit)
-- readMVar is blocking until a value is available. All threads waiting
-- will be woken when a value is written.
else
-- Don't wait
return Work
Done -> return Exit

wakeAll :: SleepScope -> WakeReason -> IO ()
wakeAll (SleepScope ref) reason = do
wakeAll :: SleepScope -> IO ()
wakeAll (SleepScope ref) = do
ticket <- readForCAS ref
case peekTicket ticket of
-- No need to wake anyone!
Expand All @@ -95,5 +100,26 @@ wakeAll (SleepScope ref) reason = do
-- interleaved by other threads doing 'wakeAll' and 'sleepIf'.

-- Wake all threads
when success $ putMVar mvar reason
when success $ putMVar mvar Work
Done -> internalError "Cannot wake threads after exit" -- Should be impossible

exitAll :: SleepScope -> IO ()
exitAll (SleepScope ref) = do
ticket <- readForCAS ref
case peekTicket ticket of
Busy _ -> do
print "exitAll busy"
(success, _) <- casIORef ref ticket Done
print success
unless success $ exitAll (SleepScope ref)
Waiting mvar -> do
print "exitAll waiting"
new <- newEmptyMVar
(success, _) <- casIORef ref ticket Done
print success
if success then
putMVar mvar Exit
else
exitAll (SleepScope ref)
Done -> print "exitAll done" -- return ()

0 comments on commit 122d3a0

Please sign in to comment.