Concurrency demos/Zeta
A simple example of parallelism in Haskell
This little piece of code computes an approximation of Riemann's zeta function, balancing the work to be done between N threads.
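As a reading aid, the computation can be written out as follows. This is only a sketch: the symbols n (the boundary), t (the number of threads) and (a_j, b_j) (the j-th sub-range) are notation introduced here to match the program's command-line arguments, and the split into exactly t sub-ranges assumes n is at least t. The infinite series converges to the zeta function only for Re(s) > 1, so the truncated sum is an approximation in that region.

```latex
\zeta(s) \;\approx\; \sum_{k=1}^{n} k^{-s}
         \;=\; \sum_{j=1}^{t} \, \sum_{k=a_j}^{b_j} k^{-s},
\qquad a_1 = 1, \quad b_t = n, \quad a_{j+1} = b_j + 1 .
```

Each inner sum corresponds to the list of terms produced by one thread; the outer sum is performed by the main thread.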
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Data.Complex
import System.Environment
-- Return the list of the terms of the zeta function for the given range.
-- We don't sum the terms here but let the main thread sum the lists returned
-- by all the other threads so as to avoid accumulating rounding imprecisions.
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x,y) = [ fromIntegral n ** (-s) | n <- [x..y] ]
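-- Split the inclusive range (x,y) into contiguous sub-ranges for n workers.
-- The first sub-range also absorbs the remainder, so it may be larger.
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
cut (x,y) n = (x, x + mine - 1) : cut' (x + mine) size (total - mine)
  where
    total = y - x + 1 -- number of integers in the range (just y when x == 1)
    (size, modulo) = total `divMod` n
    mine = size + modulo
    cut' _ _ 0 = []
    cut' x' size' n' = (x', x' + size' - 1) : cut' (x' + size') size' (n' - size')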
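-- Read <nthreads>, <boundary> and <s> from the command line.
-- <s> is parsed as a Complex Double, e.g. 1:+1.
getParams :: IO (Int, Int, Complex Double)
getParams = do
  argv <- getArgs
  case argv of
    [t, n, s] -> return (read t, read n, read s)
    _         -> error "usage: zeta <nthreads> <boundary> <s>"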
main :: IO ()
main = do
  (t, n, s) <- getParams
  childs <- mapM (thread s) (cut (1, n) t)
  results <- mapM takeMVar childs
  print (sum (concat results))
  where
    thread s range = do
      putStrLn ("Starting thread for range " ++ show range)
      mvar <- newEmptyMVar
      -- Comparing the list with itself forces every element inside the
      -- worker thread before the list is handed over (a poor man's deepSeq).
      _ <- forkIO $ do
        let zs = zetaRange s range
        when (zs == zs) $ putMVar mvar zs
      return mvar
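To see how the work is divided, cut can be tried on its own. The block below is a minimal sketch that restates cut so it can be run in isolation; this copy computes the range length as y - x + 1, which is simply y for ranges starting at 1, as in main. The sample arguments are illustrative only.

```haskell
-- Self-contained copy of cut, for experimenting with runghc or GHCi.
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
cut (x, y) n = (x, x + mine - 1) : cut' (x + mine) size (total - mine)
  where
    total          = y - x + 1          -- number of integers in the range
    (size, modulo) = total `divMod` n   -- base chunk size and leftover
    mine           = size + modulo      -- first chunk takes the leftover
    cut' _  _     0  = []
    cut' x' size' n' = (x', x' + size' - 1) : cut' (x' + size') size' (n' - size')

main :: IO ()
main = do
  print (cut (1, 10 :: Int) 3)      -- [(1,4),(5,7),(8,10)]
  print (cut (1, 12 :: Int) 4)      -- [(1,3),(4,6),(7,9),(10,12)]
  print (cut (1, 500000 :: Int) 4)  -- four ranges of 125000 terms each
```

Because the remainder goes entirely to the first sub-range, the split is only perfectly balanced when the boundary is a multiple of the thread count.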
Or using Strategies
Replace the Control.Concurrent... imports with
import Control.Parallel.Strategies
and replace main with
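main :: IO ()
main = do
  (t, n, s) <- getParams
  let ranges  = cut (1, n) t
      -- Fully evaluate each sub-list in parallel. Current releases of the
      -- parallel package call the deep-evaluation strategy rdeepseq; older
      -- releases spelled this `parList rnf`.
      results = map (zetaRange s) ranges `using` parList rdeepseq
  putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ]
  print (sum (concat results))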
Using a Chan instead of MVars
Replace the main function with:
main :: IO ()
main = do
  (t, n, s) <- getParams
  chan <- newChan
  terms <- getChanContents chan -- lazy stream of everything written to chan
  let ranges = cut (1, n) t
  forM_ ranges $ thread chan s
  -- Count one Nothing marker per started worker; cut may return fewer
  -- ranges than t when the boundary is smaller than the thread count.
  let workers = length ranges
      wait xs i result
        | i >= workers = print result -- Done.
        | otherwise = case xs of
            Nothing : rest -> wait rest (i + 1) result
            Just x : rest -> wait rest i (result + x)
            _ -> error "missing thread termination marker"
  wait terms 0 0
  where
    thread chan s range = do
      putStrLn ("Starting thread for range " ++ show range)
      forkIO $ do
        mapM_ (writeChan chan . Just) (zetaRange s range)
        writeChan chan Nothing
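The termination-marker protocol can be tried in isolation with the sketch below. It is not the code above: it sums plain Ints, and it reads the channel with an explicit readChan loop instead of the lazy getChanContents stream. The names collect and sumChunks are hypothetical helpers introduced for this illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_)

-- Sum the values arriving on the channel until `workers` Nothing markers
-- (one per worker thread) have been received.
collect :: Chan (Maybe Int) -> Int -> IO Int
collect chan workers = go 0 0
  where
    go :: Int -> Int -> IO Int
    go done acc
      | done >= workers = return acc
      | otherwise = do
          msg <- readChan chan
          case msg of
            Nothing -> go (done + 1) acc   -- one more worker has finished
            Just x  -> go done (acc + x)   -- ordinary value: accumulate it

-- Start one worker per chunk; each writes its values, then a Nothing marker.
sumChunks :: [[Int]] -> IO Int
sumChunks chunks = do
  chan <- newChan
  forM_ chunks $ \chunk -> forkIO $ do
    mapM_ (writeChan chan . Just) chunk
    writeChan chan Nothing
  collect chan (length chunks)

main :: IO ()
main = sumChunks [[1, 2, 3], [4, 5], [6]] >>= print  -- prints 21
```

Every term travels through the channel as a separate message, so this design pays a synchronisation cost per element rather than per sub-range.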
Benchmarks
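Here's a simple script to run the three variations above, each with four Haskell threads on 1, 2, and 3 OS threads. It expects the binaries to be named z.mvar, z.chan and z.strat, linked with -threaded so that +RTS -N is accepted (recent GHC versions also need -rtsopts).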
for a in mvar chan strat; do
  for n in 1 2 3; do
    echo -n $a $n ' ';
    /usr/bin/time -f "%Uu %Ss %Ee %PCPU" ./z.$a 4 500000 1:+1 +RTS -N$n > /dev/null;
  done;
  echo;
done
Results on a dual Opteron system:
mvar 1 2.52u 0.06s 0:02.63e 98%CPU
mvar 2 2.69u 0.05s 0:02.10e 130%CPU
mvar 3 2.85u 0.07s 0:02.30e 126%CPU
chan 1 11.75u 4.06s 0:15.91e 99%CPU
chan 2 9.81u 0.05s 0:09.48e 104%CPU
chan 3 10.96u 3.25s 0:12.24e 116%CPU
strat 1 8.82u 0.07s 0:08.93e 99%CPU
strat 2 4.42u 0.06s 0:03.82e 117%CPU
strat 3 5.01u 0.08s 0:04.46e 114%CPU
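One way to read these figures is as speedup relative to the single-OS-thread run of the same variant. The sketch below only does that arithmetic on the elapsed times listed above; the names timings and speedup are hypothetical, introduced for this illustration.

```haskell
import Text.Printf (printf)

-- Elapsed wall-clock seconds copied from the results above:
-- (variant, time with -N1, time with -N2, time with -N3)
timings :: [(String, Double, Double, Double)]
timings =
  [ ("mvar",   2.63, 2.10,  2.30)
  , ("chan",  15.91, 9.48, 12.24)
  , ("strat",  8.93, 3.82,  4.46)
  ]

-- Speedup of a run taking t seconds over a baseline taking base seconds.
speedup :: Double -> Double -> Double
speedup base t = base / t

main :: IO ()
main = mapM_ report timings
  where
    report :: (String, Double, Double, Double) -> IO ()
    report (name, t1, t2, t3) =
      printf "%-5s  -N2: %.2fx  -N3: %.2fx\n" name (speedup t1 t2) (speedup t1 t3)
```

On these numbers every variant is fastest with two OS threads, which matches the two processors of the test machine, and a third OS thread makes each one slower again.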