Concurrency demos/Zeta

From HaskellWiki
Revision as of 08:52, 15 January 2007 by KetilMalde (talk | contribs) (typo)

A simple example of parallelism in Haskell

This little piece of code computes an approximation of Riemann's zeta function, balancing the work to be done between N threads.
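
Written out, the quantity computed is a partial sum of the Dirichlet series for zeta, split into consecutive ranges that are summed again at the end. The symbols are introduced here for illustration only: $M$ stands for the program's boundary argument, $N$ for the number of threads, and $[a_k, b_k]$ for the range handed to thread $k$.

```latex
\zeta(s) \;\approx\; \sum_{n=1}^{M} n^{-s}
        \;=\; \sum_{k=1}^{N} \; \sum_{n=a_k}^{b_k} n^{-s},
\qquad a_1 = 1, \quad b_N = M, \quad a_{k+1} = b_k + 1 .
```

The series itself converges to $\zeta(s)$ only for $\operatorname{Re}(s) > 1$; for other values of $s$ the program still prints the partial sum.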

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Data.Complex
import System.Environment

-- Return the list of the terms of the zeta function for the given range.
-- We don't sum the terms here but let the main thread sum the lists returned
-- by all the other threads so as to avoid accumulating rounding imprecisions.
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x,y) = [ fromIntegral n ** (-s) | n <- [x..y] ]

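-- Split the inclusive range (x,y) into n consecutive sub-ranges; the first
-- one absorbs the remainder, e.g. cut (1,10) 4 == [(1,4),(5,6),(7,8),(9,10)].
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
cut (x,y) n = (x, x + mine - 1) : cut' (x + mine) size (len - mine)
 where
  len              = y - x + 1   -- number of integers in the range
  (size, modulo)   = len `divMod` n
  mine             = size + modulo

  cut' _ _ 0       = []
  cut' x' size' n' = (x', x' + size' - 1) : cut' (x' + size') size' (n' - size')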

getParams :: IO (Int, Int, Complex Double)
getParams = do
  argv <- getArgs
  case argv of
    (t:n:s:[]) -> return (read t, read n, read s)
    _          -> error "usage: zeta <nthreads> <boundary> <s>"

main :: IO ()
main = do
  (t, n, s) <- getParams
  childs    <- mapM (thread s) (cut (1, n) t)
  results   <- mapM takeMVar childs
  print (sum (concat results))
 where
  thread s range = do
    putStrLn ("Starting thread for range " ++ show range)
    mvar <- newEmptyMVar
    forkIO (do let zs = zetaRange s range
               when (zs==zs) $ putMVar mvar zs) -- zs==zs forces every term in this thread (a makeshift deepSeq)
    return mvar
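
The `zs==zs` comparison forces the list only as a side effect of the equality test, and it evaluates to False (so the MVar is never filled) if any term contains a NaN. A minimal sketch of an alternative, assuming the `deepseq` package that ships with GHC is available: `force` from Control.DeepSeq combined with `evaluate` from Control.Exception evaluates the whole list inside the worker thread. The names `worker` and `zetaMVar` are hypothetical and not part of the program above; `zetaRange` is restated so the block loads on its own, and the ranges are passed in directly instead of being produced by `cut`.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Data.Complex (Complex (..))

-- Terms n^(-s) for n in the given inclusive range (as in the program above).
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x, y) = [ fromIntegral n ** (-s) | n <- [x..y] ]

-- Hypothetical helper: fork a thread that fully evaluates its terms
-- before handing them over, so the work is done in the worker thread.
worker :: Complex Double -> (Int, Int) -> IO (MVar [Complex Double])
worker s range = do
  mvar <- newEmptyMVar
  _ <- forkIO $ do
    zs <- evaluate (force (zetaRange s range))
    putMVar mvar zs
  return mvar

-- Hypothetical helper: one worker per range; the calling thread sums the
-- terms in range order, exactly as a sequential sum would.
zetaMVar :: Complex Double -> [(Int, Int)] -> IO (Complex Double)
zetaMVar s ranges = do
  mvars   <- mapM (worker s) ranges
  results <- mapM takeMVar mvars
  return (sum (concat results))
```

For example, `zetaMVar (2 :+ 0) [(1, 500), (501, 1000)]` sums the first 1000 terms with two worker threads; in the full program the ranges would come from `cut`. To use more than one core, the program has to be compiled with `-threaded` and run with `+RTS -N<n>`.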

Or using Strategies

Replace the Control.Concurrent and Control.Concurrent.MVar imports with

import Control.Parallel.Strategies

and replace main by

main :: IO ()
main = do
  (t, n, s) <- getParams
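  let ranges    = cut (1, n) t
      -- rdeepseq is the full-evaluation strategy in the current parallel
      -- package; the library of 2007 called it rnf.
      results   = map (zetaRange s) ranges `using` parList rdeepseq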
  putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ]
  print (sum (concat results))

Using a Chan instead of MVars

Replace the main function with:

main :: IO ()
main = do
  (t, n, s) <- getParams
  chan      <- newChan
  terms     <- getChanContents chan

  forM_ (cut (1,n) t) $ thread chan s

  let wait xs i result
        | i >= t    = print result  -- Done.
        | otherwise = case xs of
           Nothing : rest -> wait rest (i + 1) result
           Just x  : rest -> wait rest i (result + x)
           _              -> error "missing thread termination marker"

  wait terms 0 0
 where
  thread chan s range = do
    putStrLn ("Starting thread for range " ++ show range)
    forkIO $ do
      mapM_ (writeChan chan . Just) (zetaRange s range)
      writeChan chan Nothing
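
In the Chan variant above, every term travels through the channel as a separate message. The following is a sketch of a coarser-grained alternative, under the assumption that per-message overhead matters for this workload (the benchmarks below do not isolate the cause). Each thread sends its whole, fully evaluated list as a single message tagged with its chunk index, and the main thread reads exactly one message per range. The name `zetaChan` is hypothetical, `force` comes from the `deepseq` package that ships with GHC, and `zetaRange` is restated so the block loads on its own.

```haskell
import Control.Concurrent (forkIO, newChan, readChan, writeChan)
import Control.DeepSeq (force)
import Control.Exception (evaluate)
import Control.Monad (forM_, replicateM)
import Data.Complex (Complex (..))
import Data.List (sortOn)

-- Terms n^(-s) for n in the given inclusive range (as in the program above).
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x, y) = [ fromIntegral n ** (-s) | n <- [x..y] ]

-- Hypothetical helper: one thread and one channel message per range.
zetaChan :: Complex Double -> [(Int, Int)] -> IO (Complex Double)
zetaChan s ranges = do
  chan <- newChan
  forM_ (zip [0 :: Int ..] ranges) $ \(i, range) -> forkIO $ do
    zs <- evaluate (force (zetaRange s range))  -- do the work in this thread
    writeChan chan (i, zs)
  -- Messages arrive in any order; read one per range, then restore range order
  -- so the terms are summed in the same order as a sequential run.
  chunks <- replicateM (length ranges) (readChan chan)
  return (sum (concatMap snd (sortOn fst chunks)))
```

Because the number of messages is known in advance, no `Nothing` termination marker is needed in this version.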


Benchmarks

Here's a simple script to run the three variations above, each with four Haskell threads, using 1, 2, and 3 OS threads (the -N RTS option).

for a in mvar chan strat; do
   for n in 1 2 3; do 
       echo -n $a $n ' '; 
       /usr/bin/time -f "%Uu %Ss %Ee %PCPU" ./z.$a 4 500000 1:+1 +RTS -N$n > /dev/null;
   done;
   echo;
done

Results on a dual Opteron system:

mvar 1  2.52u 0.06s 0:02.63e 98%CPU
mvar 2  2.69u 0.05s 0:02.10e 130%CPU
mvar 3  2.85u 0.07s 0:02.30e 126%CPU
chan 1  11.75u 4.06s 0:15.91e 99%CPU
chan 2  9.81u 0.05s 0:09.48e 104%CPU
chan 3  10.96u 3.25s 0:12.24e 116%CPU
strat 1  8.82u 0.07s 0:08.93e 99%CPU
strat 2  4.42u 0.06s 0:03.82e 117%CPU
strat 3  5.01u 0.08s 0:04.46e 114%CPU
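
To compare the variants, the elapsed times can be turned into speedups relative to each variant's own -N1 run. The snippet below is only a small calculation over the figures listed above; the names `elapsed` and `speedups` are illustrative.

```haskell
-- Elapsed wall-clock seconds copied from the results above,
-- as (variant, [time at -N1, -N2, -N3]).
elapsed :: [(String, [Double])]
elapsed =
  [ ("mvar",  [2.63, 2.10, 2.30])
  , ("chan",  [15.91, 9.48, 12.24])
  , ("strat", [8.93, 3.82, 4.46])
  ]

-- Speedup of each run relative to the same variant at -N1.
speedups :: [(String, [Double])]
speedups = [ (name, map (base /) ts) | (name, ts@(base : _)) <- elapsed ]
```

Evaluating `mapM_ print speedups` in GHCi gives -N2 speedups of roughly 1.25 for mvar, 1.68 for chan, and 2.34 for strat. In all three variants the -N3 run is slower than the -N2 run.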