MapReduce as a monad

From HaskellWiki


Revision as of 18:10, 31 October 2011


1 Introduction

MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach.

I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad. The standard implementation of MapReduce is the Java-based Hadoop framework, which is very complex and somewhat temperamental. Moreover, mappers and reducers must contain Hadoop-specific code. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions.

Having shown that MapReduce can be implemented as a generalised monad, we can generalise still further and define a MapReduceT monad transformer, so that there is a MapReduce type and operation associated with any monad. In particular, it turns out that the State monad is just the MapReduce type of the monad Hom a of maps h -> a, where h is some fixed type.

2 Initial Approach

2.1 Why a monad?

What the monadic implementation lets us do is the following:

  • Map and reduce look the same.
  • You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms.
  • All of the guts of MapReduce are hidden in the monad's bind function
  • The implementation is naturally parallel
  • Making a MapReduce program is trivial:
... >>= wrapMR mapper >>= wrapMR reducer >>= ...

2.2 Details

Full details of the implementation and sample code can be found at http://jpembeddedsolutions.wordpress.com/2011/04/02/mapreduce/. I'll just give the highlights here.

2.2.1 Generalised mappers / reducers

One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature

a -> ([(s,a)] -> [(s',b)])

where s and s' are the data types and a and b are the key types.
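
To make the signature concrete, here is a minimal sketch of a single generalised stage. The names Stage and lengthStage are illustrative assumptions and are not part of the library; pairs are read as (data, key), which matches the implementation shown further down.

```haskell
-- A generalised stage: given a key, transform a list of (data, key) pairs.
type Stage s a s' b = a -> [(s, a)] -> [(s', b)]

-- Hypothetical stage: keep the strings tagged with key k and
-- re-key each of them by its length.
lengthStage :: Eq a => Stage String a String Int
lengthStage k pairs = [ (s, length s) | (s, k') <- pairs, k' == k ]
```

Applied to key 1 and the pairs ("ab",1), ("c",2), ("def",1), this stage keeps "ab" and "def" and emits them with the new keys 2 and 3.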

2.2.2 Generalised Monad

Now, this is suggestive of a monad, but we can't use a monad per se, because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following.

Let m be a Monad', a type with four parameters: m s a s' b. Generalise the monadic bind operation to:

m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c

See Parametrized monads (http://blog.sigfpe.com/2009/02/beyond-monads.html).

Then clearly the generalised mapper/reducer above can be written as a Monad', meaning that we can write MapReduce as

... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ...

2.2.3 Implementation details

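import Prelude hiding (return, (>>=))
import Control.DeepSeq (NFData)
import Data.List (nub)
-- P is the library's own module providing a parallel map (not shown here)

class Monad' m where
        return :: a -> m s x s a
        (>>=)  :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c

-- A stage maps a list of (data, key) pairs to a new list of (data, key) pairs
newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) }

-- Tag every datum with the key k
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss])

-- Run f, then run g once per distinct key in f's output and concatenate the results
bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\s ->
        let
                fs = runMR f s
                gs = P.map g $ nub $ snd <$> fs
        in
        concat $ map (\g' -> runMR g' fs) gs)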

The key point here is that P.map is a parallel version of the simple map function.
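
The definition of P.map is not shown on this page. As a rough sketch of what such a function could look like, the version below uses only par and pseq from GHC.Conc in base. The name parMapSketch is hypothetical, and this version evaluates each result only to weak head normal form, whereas the NFData constraints on bindMR suggest that the library forces results more deeply.

```haskell
import GHC.Conc (par, pseq)

-- Sketch of a parallel map: spark the evaluation of the head element,
-- evaluate the rest of the list, then assemble the result.
-- It returns the same list as `map f`.
parMapSketch :: (a -> b) -> [a] -> [b]
parMapSketch _ []     = []
parMapSketch f (x:xs) = y `par` (ys `pseq` (y : ys))
  where
    y  = f x
    ys = parMapSketch f xs
```

Sparks only run on several cores when the program is built with -threaded and started with +RTS -N; otherwise the function behaves like an ordinary map.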

Now we can write a wrapper function

wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b)
wrapMR f = (\k -> MR (g k))
        where
        g k ss = f $ fst <$> filter (\s -> k == snd s) ss

which takes a conventional mapper / reducer and wraps it in the Monad'. Note that this means that the mapper / reducer functions do not need to know anything about the way MapReduce is implemented. So a standard MapReduce job becomes

mapReduce :: [String] -> [(String,Int)]
mapReduce state = runMapReduce mr state
        where
        mr = return () >>= wrapMR mapper >>= wrapMR reducer

I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above).
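
For readers who want to try the idea without the full library, the following self-contained sketch puts the pieces together for a word count. It makes several assumptions: bindMR uses the ordinary sequential map instead of P.map, runMapReduce (used but not defined on this page) is guessed to seed every input with the unit key, and mapper and reducer are simple stand-ins written for this example.

```haskell
import Data.List (nub)

-- Pairs are (data, key), as in the definitions above.
newtype MapReduce s a s' b = MR { runMR :: [(s, a)] -> [(s', b)] }

-- Tag every datum with the key k.
retMR :: a -> MapReduce s x s a
retMR k = MR (\ss -> [ (s, k) | (s, _) <- ss ])

-- Sequential stand-in for bindMR: run f, then run g once per distinct key.
bindMR :: Eq b => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c
bindMR f g = MR (\ss ->
  let fs   = runMR f ss
      keys = nub (map snd fs)
  in  concatMap (\k -> runMR (g k) fs) keys)

-- Wrap a plain function so that it only sees the data tagged with key k.
wrapMR :: Eq a => ([s] -> [(s', b)]) -> a -> MapReduce s a s' b
wrapMR f k = MR (\ss -> f [ s | (s, k') <- ss, k' == k ])

-- Assumed behaviour: seed every input with the unit key, then run the pipeline.
runMapReduce :: MapReduce s () s' b -> [s] -> [(s', b)]
runMapReduce m xs = runMR m [ (x, ()) | x <- xs ]

-- Stand-in mapper: emit each word, keyed by itself.
mapper :: [String] -> [(String, String)]
mapper ls = [ (w, w) | l <- ls, w <- words l ]

-- Stand-in reducer: all inputs share one key, so count them.
reducer :: [String] -> [(String, Int)]
reducer []         = []
reducer ws@(w : _) = [(w, length ws)]

wordCount :: [String] -> [(String, Int)]
wordCount = runMapReduce (retMR () `bindMR` wrapMR mapper `bindMR` wrapMR reducer)
```

With the input ["a b a", "b c"], the mapper emits the five words keyed by themselves, and the reducer then runs once for each of the distinct keys "a", "b" and "c", giving counts of 2, 2 and 1.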

3 The monad transformer approach

Define the monad transformer type MapReduceT by:

newtype MapReduceT m t u = MR {run :: m t -> m u}

with operations

lift :: (Monad m) => m t -> MapReduceT m t t
lift x = MR (const x)

return :: (Monad m) => t -> MapReduceT m t t
return x = lift (return x)

bind :: (Monad m) => MapReduceT m u u -> MapReduceT m t u -> (u -> MapReduceT m u v) -> MapReduceT m t v
bind p f g = MR (\ xs -> ps xs >>= gs xs)
        where
            ps xs = (f >>> p) -< xs
            gs xs x = (f >>> g x) -< xs

where >>> and -< are the obvious arrow operations on MapReduceT types: >>> composes two stages and -< applies a stage to an input.
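
The sketch below spells out one reading of these operations so that bind can be compiled and tried on lists. The definitions of >>> and -< are assumptions (plain composition and application), and doubled, selectKey, grouped and ungrouped are hypothetical names introduced only for this example.

```haskell
import Data.List (nub)

newtype MapReduceT m t u = MR { run :: m t -> m u }

-- Assumed meaning of >>>: run f, then g.
(>>>) :: MapReduceT m t u -> MapReduceT m u v -> MapReduceT m t v
f >>> g = MR (run g . run f)

-- Assumed meaning of -<: feed an input to a stage.
(-<) :: MapReduceT m t u -> m t -> m u
f -< xs = run f xs

bind :: Monad m => MapReduceT m u u -> MapReduceT m t u
     -> (u -> MapReduceT m u v) -> MapReduceT m t v
bind p f g = MR (\xs -> ps xs >>= gs xs)
  where
    ps xs   = (f >>> p) -< xs
    gs xs x = (f >>> g x) -< xs

-- Hypothetical list stages: double every value, then select the values
-- equal to a given key.
doubled :: MapReduceT [] Int Int
doubled = MR (map (* 2))

selectKey :: Int -> MapReduceT [] Int Int
selectKey k = MR (filter (== k))

-- With p = nub each distinct key is visited once; with p = id a key is
-- visited once per occurrence.
grouped, ungrouped :: [Int] -> [Int]
grouped   = run (bind (MR nub) doubled selectKey)
ungrouped = run (bind (MR id) doubled selectKey)
```

On the input [1,1,2], grouped yields [2,2,4] while ungrouped yields [2,2,2,2,4], which illustrates why the list case below takes p to be nub.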

Then we show in this paper (http://media.jpembeddedsolutions.com/pdf/mrmonad.pdf) that:

  • MapReduce == MapReduceT [] with >>= = bind nub
  • For a suitable choice of p the standard State monad is MapReduceT Hom where

data Hom a b = H {run :: (a ->  b)}

return x = H (const x)
f >>= g = H (\ x -> g' (f' x) x)
    where
        f' = run f
        g' x y = run (g x) y
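
As a sketch of how these definitions fit the standard class hierarchy, the block below packages Hom as a Monad instance; it behaves like the familiar reader monad. It uses newtype rather than data, adds the Functor and Applicative instances that current GHC requires, and sumOfViews is a hypothetical example.

```haskell
newtype Hom a b = H { run :: a -> b }

instance Functor (Hom a) where
  fmap f (H g) = H (f . g)

instance Applicative (Hom a) where
  pure x      = H (const x)
  H f <*> H g = H (\x -> f x (g x))

-- Same bind as above: run f on the input, then run the resulting
-- computation on the same input.
instance Monad (Hom a) where
  f >>= g = H (\x -> run (g (run f x)) x)

-- Hypothetical computation: read the shared input twice and add the results.
sumOfViews :: Hom Int Int
sumOfViews = do
  n <- H (+ 1)
  m <- H (* 2)
  return (n + m)
```

For example, run sumOfViews 3 evaluates to (3 + 1) + (3 * 2) = 10.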

4 Future Directions

  • My code so far runs concurrently, in multiple threads within a single OS image. It won't work on clustered systems. I have started work on this; see the MapReduce_with_CloudHaskell page on this wiki.
  • Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster.

I would welcome collaborators to help take this forward.

julianporter 18:10, 31 October 2011 (UTC)