Difference between revisions of "Concurrency demos/Zeta"

From HaskellWiki
Jump to navigation Jump to search
(Add a Control.Parallel.Strategies version)
(Parallel strategies for initial "simple example")
 
(11 intermediate revisions by 4 users not shown)
Line 1: Line 1:
 
__TOC__
 
__TOC__
  +
[[Category:Code]]
 
 
== A simple example of parallelism in Haskell ==
 
== A simple example of parallelism in Haskell ==
   
Line 6: Line 6:
   
 
<haskell>
 
<haskell>
import Control.Concurrent
+
import Control.Parallel.Strategies
import Control.Concurrent.MVar
 
 
import Control.Monad
 
import Control.Monad
 
import Data.Complex
 
import Data.Complex
Line 16: Line 15:
 
-- by all the other threads so as to avoid accumulating rounding imprecisions.
 
-- by all the other threads so as to avoid accumulating rounding imprecisions.
 
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
 
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x,y) = [ (fromIntegral n) ** (-s) | n <- [x..y] ]
+
zetaRange s (x,y) = [ fromIntegral n ** (-s) | n <- [x..y] ]
   
 
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
 
cut :: (Integral a) => (a, a) -> a -> [(a, a)]
Line 34: Line 33:
 
_ -> error "usage: zeta <nthreads> <boundary> <s>"
 
_ -> error "usage: zeta <nthreads> <boundary> <s>"
   
  +
main :: IO ()
  +
main = do
  +
(t, n, s) <- getParams
  +
let ranges = cut (1, n) t
 
results = map (zetaRange s) ranges `using` parList rnf
 
putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ]
 
print (sum (concat results))
 
</haskell>
  +
  +
== With concurrent threads ==
  +
  +
Replace:
 
<haskell>
 
import Control.Parallel.Strategies
  +
</haskell>
  +
with:
  +
<haskell>
  +
import Control.Concurrent
 
import Control.Concurrent.MVar
  +
</haskell>
  +
  +
=== Using mutex-variables (<code>MVar</code>) ===
  +
* Replace <code>main</code> with:
  +
:<haskell>
 
main :: IO ()
 
main :: IO ()
 
main = do
 
main = do
Line 44: Line 67:
 
putStrLn ("Starting thread for range " ++ show range)
 
putStrLn ("Starting thread for range " ++ show range)
 
mvar <- newEmptyMVar
 
mvar <- newEmptyMVar
forkIO (putMVar mvar (zetaRange s range))
+
forkIO (do let zs = zetaRange s range
  +
when (zs==zs) $ putMVar mvar zs) -- we need to deepSeq the list
 
return mvar
 
return mvar
 
</haskell>
 
</haskell>
   
=== Or using Strategies ===
+
=== Using a channel (<code>Chan</code>) ===
Replace the <hask>Control.Concurrent...</hask> imports by
+
* Replace <code>main</code> with:
<haskell>
+
:<haskell>
import Control.Parallel.Strategies
 
</haskell>
 
and replace main by
 
<haskell>
 
 
main :: IO ()
 
main :: IO ()
 
main = do
 
main = do
 
(t, n, s) <- getParams
 
(t, n, s) <- getParams
let ranges = cut (1, n) t
+
chan <- newChan
  +
terms <- getChanContents chan
results = map (zetaRange s) ranges `using` parList rnf
 
  +
putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ]
 
  +
forM_ (cut (1,n) t) $ thread chan s
print (sum (concat results))
 
  +
  +
let wait xs i result
  +
| i >= t = print result -- Done.
  +
| otherwise = case xs of
  +
Nothing : rest -> wait rest (i + 1) result
  +
Just x : rest -> wait rest i (result + x)
  +
_ -> error "missing thread termination marker"
  +
  +
wait terms 0 0
  +
where
  +
thread chan s range = do
  +
putStrLn ("Starting thread for range " ++ show range)
  +
forkIO $ do
  +
mapM_ (writeChan chan . Just) (zetaRange s range)
  +
writeChan chan Nothing
 
</haskell>
 
</haskell>
   
 
== Benchmarks ==
 
== Benchmarks ==
   
  +
Here's a simple script for runing all three variants, with four threads using 1, 2, and 3 OS threads.
Insert benchmarks here! :-)
 
  +
  +
<pre>
  +
a="$1"
  +
[ -z "$a" ] && { echo Usage: "$0" variant_name; exit 1; }
  +
for n in 1 2 3; do
  +
echo -n $a $n ' ';
  +
/usr/bin/time -f "%Uu %Ss %Ee %PCPU" ./z.$a 4 500000 1:+1 +RTS -N$n > /dev/null;
  +
done;
  +
echo;
  +
</pre>
  +
  +
Results on a dual Opteron system:
  +
  +
* <code>strat</code> - using strategies:
  +
:{|
  +
|<pre>
  +
strat 1 8.82u 0.07s 0:08.93e 99%CPU
  +
strat 2 4.42u 0.06s 0:03.82e 117%CPU
  +
strat 3 5.01u 0.08s 0:04.46e 114%CPU
  +
  +
</pre>
  +
|}
  +
  +
* <code>mvar</code> - using mutex-variables:
  +
:{|
  +
|<pre>
  +
mvar 1 2.52u 0.06s 0:02.63e 98%CPU
  +
mvar 2 2.69u 0.05s 0:02.10e 130%CPU
  +
mvar 3 2.85u 0.07s 0:02.30e 126%CPU
  +
  +
</pre>
  +
|}
  +
  +
* <code>chan</code> - using channels:
  +
:{|
  +
|<pre>
  +
chan 1 11.75u 4.06s 0:15.91e 99%CPU
  +
chan 2 9.81u 0.05s 0:09.48e 104%CPU
  +
chan 3 10.96u 3.25s 0:12.24e 116%CPU
  +
  +
</pre>
  +
|}

Latest revision as of 10:58, 22 June 2021

A simple example of parallelism in Haskell

This little piece of code computes an approximation of Riemann's zeta function, balancing the work to be done between N threads.

import Control.Parallel.Strategies
import Control.Monad
import Data.Complex
import System.Environment

-- Return the list of the terms of the zeta function for the given range.
-- We don't sum the terms here but let the main thread sum the lists returned
-- by all the other threads so as to avoid accumulating rounding imprecisions.
zetaRange :: (Floating a, Integral b) => a -> (b, b) -> [a]
zetaRange s (x,y) = [ fromIntegral n ** (-s) | n <- [x..y] ]

cut :: (Integral a) => (a, a) -> a -> [(a, a)]
cut (x,y) n = (x, x + mine - 1) : cut' (x + mine) size (y - mine)
 where
  (size, modulo)   = y `divMod` n
  mine             = size + modulo

  cut' _ _ 0       = []
  cut' x' size' n' = (x', x' + size' - 1) : cut' (x' + size') size' (n' - size') 

getParams :: IO (Int, Int, Complex Double)
getParams = do
  argv <- getArgs
  case argv of
    (t:n:s:[]) -> return (read t, read n, read s)
    _          -> error "usage: zeta <nthreads> <boundary> <s>"

main :: IO ()
main = do
  (t, n, s) <- getParams
  let ranges    = cut (1, n) t
      results   = map (zetaRange s) ranges `using` parList rnf
  putStr $ unlines [ "Starting thread for range " ++ show r | r <- ranges ]
  print (sum (concat results))

With concurrent threads

Replace:

import Control.Parallel.Strategies

with:

import Control.Concurrent
import Control.Concurrent.MVar

Using mutex-variables (MVar)

  • Replace main with:
main :: IO ()
main = do
  (t, n, s) <- getParams
  childs    <- mapM (thread s) (cut (1, n) t)
  results   <- mapM takeMVar childs
  print (sum (concat results))
 where
  thread s range = do
    putStrLn ("Starting thread for range " ++ show range)
    mvar <- newEmptyMVar
    forkIO (do let zs = zetaRange s range
               when (zs==zs) $ putMVar mvar zs) -- we need to deepSeq the list
    return mvar

Using a channel (Chan)

  • Replace main with:
main :: IO ()
main = do
  (t, n, s) <- getParams
  chan      <- newChan
  terms     <- getChanContents chan

  forM_ (cut (1,n) t) $ thread chan s

  let wait xs i result
        | i >= t    = print result  -- Done.
        | otherwise = case xs of
           Nothing : rest -> wait rest (i + 1) result
           Just x  : rest -> wait rest i (result + x)
           _              -> error "missing thread termination marker"

  wait terms 0 0
 where
  thread chan s range = do
    putStrLn ("Starting thread for range " ++ show range)
    forkIO $ do
      mapM_ (writeChan chan . Just) (zetaRange s range)
      writeChan chan Nothing

Benchmarks

Here's a simple script for runing all three variants, with four threads using 1, 2, and 3 OS threads.

a="$1"
[ -z "$a" ] && { echo Usage: "$0" variant_name; exit 1; }
for n in 1 2 3; do 
    echo -n $a $n ' '; 
    /usr/bin/time -f "%Uu %Ss %Ee %PCPU" ./z.$a 4 500000 1:+1 +RTS -N$n > /dev/null;
done;
echo;

Results on a dual Opteron system:

  • strat - using strategies:
 strat 1  8.82u 0.07s 0:08.93e 99%CPU
 strat 2  4.42u 0.06s 0:03.82e 117%CPU
 strat 3  5.01u 0.08s 0:04.46e 114%CPU

  • mvar - using mutex-variables:
 mvar 1  2.52u 0.06s 0:02.63e 98%CPU
 mvar 2  2.69u 0.05s 0:02.10e 130%CPU
 mvar 3  2.85u 0.07s 0:02.30e 126%CPU

  • chan - using channels:
 chan 1  11.75u 4.06s 0:15.91e 99%CPU
 chan 2  9.81u 0.05s 0:09.48e 104%CPU
 chan 3  10.96u 3.25s 0:12.24e 116%CPU