{-# OPTIONS_GHC -fglasgow-exts #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Data.Array.Parallel.Distributed.Gang
-- Copyright   :  (c) 2006 Roman Leshchinskiy
-- License     :  see libraries/ndp/LICENSE
-- 
-- Maintainer  :  Roman Leshchinskiy <[email protected]>
-- Stability   :  experimental
-- Portability :  non-portable (GHC Extensions)
--
-- Gang primitives.
--
-- /TODO:/
--
-- * Implement busy waiting.
--
-- * Benchmark.
--
-- * Generalise thread indices?

module Data.Array.Parallel.Unlifted.Distributed.Gang (
  Gang, forkGang, gangSize, gangIO, gangST,
  sequentialGang, seqGang
) where

--import GHC.Prim                  ( unsafeCoerce# )
import GHC.IOBase
import GHC.ST
import GHC.Conc                  ( forkOnIO )

import Control.Concurrent.MVar   ( MVar, newEmptyMVar, takeMVar, putMVar )
-- import Control.Monad.ST          ( ST, unsafeIOToST, stToIO )
import Control.Exception         ( assert )
import Control.Monad             ( zipWithM, zipWithM_ )

-- ---------------------------------------------------------------------------
-- Requests and operations on them

-- | The 'Req' type encapsulates work requests for individual members of a
-- gang. It is made up of an 'IO' action, parametrised by the index of the
-- worker which executes it, and an 'MVar' which is written to when the action
-- has been executed and can be waited upon.
type Req = (Int -> IO (), MVar ())

-- | Create a new request for the given action.
newReq :: (Int -> IO ()) -> IO Req
newReq p = do
             mv <- newEmptyMVar
             return (p, mv)

-- | Block until the request has been executed. Note that only one thread can
-- wait for a request.
waitReq :: Req -> IO ()
waitReq = takeMVar . snd

-- | Execute the request and signal its completion.
execReq :: Int -> Req -> IO ()
execReq i (p, s) = p i >> putMVar s ()

-- ---------------------------------------------------------------------------
-- Thread gangs and operations on them

-- | A 'Gang' is a either group of threads which execute arbitrary work
-- requests. A /sequential/ 'Gang' simulates such a group by executing work
-- requests sequentially.
data Gang = Gang !Int [MVar Req] -- ^ The number of 'Gang' threads, and an 
                                 -- 'MVar' per thread; empty for sequential 
                                 -- 'Gang's.
-- To get the gang to do work, write Req-uest values to its MVars

-- | The worker thread of a 'Gang'.
gangWorker :: Int -> MVar Req -> IO ()
gangWorker i mv =
  do
    req <- takeMVar mv
    execReq i req
    gangWorker i mv

-- | Fork a 'Gang' with the given number of threads (at least 1).
forkGang :: Int -> IO Gang
forkGang n = assert (n > 0) $
             do
               mvs <- sequence . replicate n $ newEmptyMVar
               zipWithM_ forkOnIO [0..] (zipWith gangWorker [0 .. n-1] mvs)
               return $ Gang n mvs

-- | Yield a sequential 'Gang' which simulates the given number of threads.
sequentialGang :: Int -> Gang
sequentialGang n = assert (n > 0) $ Gang n []

-- | Yield a sequential 'Gang' which simulates the given one.
seqGang :: Gang -> Gang
seqGang = sequentialGang . gangSize

-- | The number of threads in the 'Gang'.
gangSize :: Gang -> Int
gangSize (Gang n _) = n

-- | Issue work requests for the 'Gang' and wait until they have been executed.
gangIO :: Gang -> (Int -> IO ()) -> IO ()
gangIO (Gang n [])  p = mapM_ p [0 .. n-1]
gangIO (Gang n mvs) p =
  do
    reqs <- sequence . replicate n $ newReq p
    zipWithM putMVar mvs reqs
    mapM_ waitReq reqs

-- | Same as 'gangIO' but in the 'ST' monad.
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST g p = unsafeIOToST . gangIO g $ unsafeSTToIO . p

instance Show Gang where
  showsPrec p (Gang n []) = showString "<<"
                          . showsPrec p n
                          . showString " threads (simulated)>>"
  showsPrec p (Gang n _)  = showString "<<"
                          . showsPrec p n
                          . showString " threads>>"

{- Comes from GHC.IOBase now...
-- | Unsafely embed an 'ST' computation in the 'IO' monad without fixing the
-- state type. This should go into 'Control.Monad.ST'.
unsafeSTToIO :: ST s a -> IO a
unsafeSTToIO (ST m) = IO $ \ s -> (unsafeCoerce# m) s
 -}