# MapReduce as a monad

### From HaskellWiki

(Difference between revisions)

(→Generalised Monad) |
Julianporter (Talk | contribs) (New material about monad transformer) |
||

Line 1: | Line 1: | ||

− | [[Category:Applications]][[Category:Monad]][[Category:Libraries]][[Category:Concurrency]][[Category:Parallel]][[Category:Research]] |
||

− | |||

− | ==Introduction== |
||

− | |||

− | MapReduce is a general technique for massively parallel programming developed by Google. It takes its inspiration from ideas in functional programming, but has moved away from that paradigm to a more imperative approach. |
||

− | I have noticed that MapReduce can be expressed naturally, using functional programming techniques, as a form of monad. |
||

− | |||

− | The standard implementation of MapReduce is the JAVA-based HADOOP framework, which is very complex and somewhat temperamental. Moreover, it is necessary to write HADOOP-specific code into mappers and reducers. My prototype library takes about 100 lines of code and can wrap generic mapper / reducer functions. |
||

− | |||

− | ==Why a monad?== |
||

− | |||

− | What the monadic implementation lets us do is the following: |
||

− | *Map and reduce look the same. |
||

− | *You can write a simple wrapper function that takes a mapper / reducer and wraps it in the monad, so authors of mappers / reducers do not need to know anything about the MapReduce framework: they can concentrate on their algorithms. |
||

− | *All of the guts of MapReduce are hidden in the monad's <hask>bind</hask> function |
||

− | *The implementation is naturally parallel |
||

− | *Making a MapReduce program is trivial:<br/> |
||

− | <hask> |
||

− | ... >>= wrapMR mapper >>= wrapMR reducer >>= ... |
||

− | </hask><br/> |
||

− | |||

− | ==Details== |
||

− | Full details of the implementation and sample code can be found [http://jpembeddedsolutions.wordpress.com/2011/04/02/mapreduce/ here]. I'll just give highlights here. |
||

− | |||

− | ===Generalised mappers / reducers=== |
||

− | One can generalise MapReduce a bit, so that each stage (map, reduce, etc) becomes a function of signature<br/> |
||

− | <hask> |
||

− | a -> ([(s,a)] -> [(s',b)]) |
||

− | </hask><br/> |
||

− | where <hask>s</hask> and <hask>s'</hask> are data types and <hask>a</hask> and <hask>b</hask> are key values. |
||

− | |||

− | ===Generalised Monad=== |
||

− | Now, this is suggestive of a monad, but we can't use a monad ''per se'', because the transformation changes the key and value types, and we want to be able to access them separately. Therefore we do the following. |
||

− | |||

− | Let <hask>m</hask> be a <hask>Monad'</hask>, a type with four parameters: <hask>m s a s' b</hask>. |
||

− | |||

− | Generalise the monadic <hask>bind</hask> operation to:<br/> |
||

− | <hask> |
||

− | m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c |
||

− | </hask><br/> |
||

− | |||

− | See [http://blog.sigfpe.com/2009/02/beyond-monads.html Parametrized monads]. |
||

− | |||

− | Then clearly the generalised mapper/reducer above can be written as a <hask>Monad'</hask>, meaning that we can write MapReduce as<br/> |
||

− | <hask> |
||

− | ... >>= mapper >>= reducer >>= mapper' >>= reducer' >>= ... |
||

− | </hask> |
||

− | |||

− | ===Implementation details=== |
||

− | |||

− | <hask> |
||

− | class Monad' m where |
||

− | return :: a -> m s x s a |
||

− | (>>=) :: (Eq b) => m s a s' b -> ( b -> m s' b s'' c ) -> m s a s'' c |
||

− | |||

− | newtype MapReduce s a s' b = MR { runMR :: ([(s,a)] -> [(s',b)]) } |
||

− | |||

− | retMR :: a -> MapReduce s x s a |
||

− | retMR k = MR (\ss -> [(s,k) | s <- fst <$> ss]) |
||

− | |||

− | bindMR :: (Eq b,NFData s'',NFData c) => MapReduce s a s' b -> (b -> MapReduce s' b s'' c) -> MapReduce s a s'' c |
||

− | bindMR f g = MR (\s -> |
||

− | let |
||

− | fs = runMR f s |
||

− | gs = P.map g $ nub $ snd <$> fs |
||

− | in |
||

− | concat $ map (\g' -> runMR g' fs) gs) |
||

− | </hask><br/> |
||

− | The key point here is that <hask>P.map</hask> is a parallel version of the simple <hask>map</hask> function. |
||

− | |||

− | Now we can write a wrapper function<br/> |
||

− | <hask> |
||

− | wrapMR :: (Eq a) => ([s] -> [(s',b)]) -> (a -> MapReduce s a s' b) |
||

− | wrapMR f = (\k -> MR (g k)) |
||

− | where |
||

− | g k ss = f $ fst <$> filter (\s -> k == snd s) ss |
||

− | </hask><br/> |
||

− | which takes a conventional mapper / reducer and wraps it in the <hask>Monad'</hask>. Note that this means that the mapper / reducer functions ''do not need to know anything about the way MapReduce is implemented''. So a standard MapReduce job becomes<br/> |
||

− | <hask> |
||

− | mapReduce :: [String] -> [(String,Int)] |
||

− | mapReduce state = runMapReduce mr state |
||

− | where |
||

− | mr = return () >>= wrapMR mapper >>= wrapMR reducer |
||

− | </hask><br/> |
||

− | I have tested the implementation with the standard word-counter mapper and reducer, and it works perfectly (full code is available via the link above). |
||

− | |||

− | ==Future Directions== |
||

− | |||

− | *My code so far runs concurrently and in multiple threads within a single OS image. It won't work on clustered systems. This is clearly where work should go next. |
||

− | *Currently all of the data is sent to all of the mappers / reducers at each iteration. This is okay on a single machine, but may be prohibitive on a cluster. |
||

− | |||

− | I would be eager for collaborative working on taking this forward. |
||

− | |||

− | [[User:Julianporter|julianporter]] 18:32, 2 April 2011 (UTC) |