never executed always true always false
    1 {-|
    2 Module: Flaw.Flow
    3 Description: Different helpful functions for dealing with synchronized flows of operations.
    4 License: MIT
    5 -}
    6 
    7 module Flaw.Flow
    8   ( forkFlow
    9   , forkFlowOS
   10   , Flow()
   11   , newFlow
   12   , newFlowOS
   13   , newMultiFlow
   14   , newMultiFlowOS
   15   , asyncRunInFlow
   16   , runInFlow
   17   ) where
   18 
   19 import Control.Concurrent
   20 import Control.Concurrent.STM
   21 import Control.Exception
   22 import Control.Monad
   23 
   24 import Flaw.Book
   25 
   26 -- | Fork a thread.
   27 forkFlow :: IO () -> IO ((), IO ())
   28 forkFlow = forkFlowInternal forkFinally
   29 
   30 -- | Fork an OS thread.
   31 forkFlowOS :: IO () -> IO ((), IO ())
   32 forkFlowOS = forkFlowInternal $ \action andThen -> mask $ \restore -> forkOS $ try (restore action) >>= andThen
   33 
   34 forkFlowInternal :: (IO () -> (Either SomeException () -> IO ()) -> IO ThreadId) -> IO () -> IO ((), IO ())
   35 forkFlowInternal f work = do
   36   stoppedVar <- newEmptyMVar
   37   threadId <- f work $ \_ -> putMVar stoppedVar ()
   38   let
   39     stop = do
   40       killThread threadId
   41       takeMVar stoppedVar
   42   return ((), stop)
   43 
   44 -- | Operation flow.
   45 newtype Flow = Flow (TQueue (IO ()))
   46 
   47 -- | Create operation flow, i.e. single stream of operations running in a separate
   48 -- thread, and booked into Flaw.Book.
   49 newFlow :: IO (Flow, IO ())
   50 newFlow = newMultiFlow 1
   51 
   52 -- | Create operation flow in a bound thread.
   53 newFlowOS :: IO (Flow, IO ())
   54 newFlowOS = newMultiFlowOS 1
   55 
   56 -- | Create operation multiflow, i.e. multiple threads, booked into Flaw.Book, and using a single queue of operations.
   57 newMultiFlow :: Int -> IO (Flow, IO ())
   58 newMultiFlow threadsCount = newFlowInternal threadsCount forkFlow
   59 
   60 -- | Create operation multiflow using bounded threads.
   61 newMultiFlowOS :: Int -> IO (Flow, IO ())
   62 newMultiFlowOS threadsCount = newFlowInternal threadsCount forkFlowOS
   63 
   64 newFlowInternal :: Int -> (IO () -> IO ((), IO ())) -> IO (Flow, IO ())
   65 newFlowInternal threadsCount f = withSpecialBook $ \bk -> do
   66   queue <- newTQueueIO
   67   forM_ [1..threadsCount] $ \_i -> book bk $ f $ runOperations queue
   68   return $ Flow queue
   69 
   70 asyncRunInFlow :: Flow -> IO () -> STM ()
   71 asyncRunInFlow (Flow queue) operation = writeTQueue queue $ do
   72   operation
   73   runOperations queue
   74 
   75 runInFlow :: Flow -> IO a -> IO a
   76 runInFlow flow operation = do
   77   resultVar <- newEmptyMVar
   78   atomically $ asyncRunInFlow flow $ putMVar resultVar =<< try operation
   79   r <- takeMVar resultVar
   80   case r of
   81     Right a -> return a
   82     Left e -> throwIO (e :: SomeException)
   83 
   84 runOperations :: TQueue (IO ()) -> IO ()
   85 runOperations queue = join $ atomically $ readTQueue queue