[Haskell-cafe] How to increase performance using concurrency for sequential producer-consumer problem

Mario Blažević blamario at acanac.net
Tue Feb 14 06:36:30 CET 2012


On 12-02-13 10:12 AM, Roel van Dijk wrote:
> Hello,
>
> I have a program which I believe can benefit from concurrency. But I
> am wondering if the mechanisms I need already exist somewhere on
> Hackage.

     You can try monad-coroutine. Here is an incomplete transcription of 
your code:

import Control.Monad
import Control.Monad.Coroutine
import Control.Monad.Coroutine.SuspensionFunctors
import Control.Monad.Trans.Class

type Producer a = Coroutine (Yield a) IO ()
type Converter a b = Coroutine (EitherFunctor (Await a) (Yield b)) IO ()
type Consumer b = Coroutine (Await b) IO ()

producer :: Producer Int
producer = forM_ [1..10] yield

converter :: Converter Int Int
converter =  forever $ do a <- mapSuspension LeftF await
                           mapSuspension RightF $ yield (10 * a)

consumer :: Consumer Int
consumer = forever (await >>= lift . print)

main = seesaw parallelBinder awaitYieldResolver consumer producer

     The main function is incorrect because it doesn't use the 
converter. I left the converter out because the package has no ready-made 
function for invoking it, so it wouldn't be a one-liner, and also 
because I'm not sure whether it should be a monadic coroutine of its own 
or a pure function.

     The producer and consumer coroutines' steps should be running in 
parallel. I say "should" because I never got any decent parallel speedup 
with this scheme, but I'm not sure if the problem is in the library code 
or in the earlier versions of GHC.

     One thing that's missing here is a way for the producer to run 
ahead of the consumer and produce more results that would be buffered. 
At the moment the producer blocks until the consumer is ready. I don't 
have a ready solution for this, but I'll give it some thought.
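
     As a rough illustration of the buffering idea, here is a minimal 
sketch that uses only base and your callback-style Producer type rather 
than monad-coroutine. The name bufferedDriver and its capacity parameter 
are made up for this example; capacity must be at least 1, and there is 
no exception handling, so treat it as a starting point only.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)

-- Hypothetical helper: lets the producer run up to 'capacity' items
-- ahead of the consumer. Items are consumed in production order
-- because a single consumer thread reads from a FIFO channel.
bufferedDriver :: Int -> ((a -> IO ()) -> IO ()) -> (a -> IO ()) -> IO ()
bufferedDriver capacity produce consume = do
  slots <- newQSem capacity   -- counts free buffer slots
  queue <- newChan            -- the buffer; Nothing marks end of input
  done  <- newEmptyMVar       -- filled when the consumer has finished
  let loop = do
        item <- readChan queue
        case item of
          Nothing -> putMVar done ()
          Just x  -> signalQSem slots >> consume x >> loop
  _ <- forkIO loop
  -- The producer blocks only when all slots are taken.
  produce (\x -> waitQSem slots >> writeChan queue (Just x))
  writeChan queue Nothing
  takeMVar done
```

     With your dummy definitions this could be called as 
bufferedDriver 4 producer (consumer . converter), although that still 
runs the conversion in the single consumer thread.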


>
> Here is a sketch of my program, in literate Haskell:
>
>> module Problem where
>> import Control.Monad ( forM_ )
> The producer produces values. It blocks until there are no more
> values to produce. Each value is given to a callback function.
>
>> type Producer a = (a ->  IO ()) ->  IO ()
> The converter does some work with a value. This work is purely CPU and
> it is the bottleneck of the program. The amount of work it has to do
> is variable.
>
>> type Converter a b = a ->  b
> The consumer does something with the value calculated by the
> converter. It is very important that the consumer consumes the values
> in the same order as they are produced.
>
>> type Consumer b = b ->  IO ()
> Dummy producer, converter and consumer:
>
>> producer :: Producer Int
>> producer callback = forM_ [1..10] callback
>> converter :: Converter Int Int
>> converter = (*10)
>> consumer :: Consumer Int
>> consumer = print
> A simple driver. Does not exploit concurrency.
>
>> simpleDriver :: Producer a ->  Converter a b ->  Consumer b ->  IO ()
>> simpleDriver producer converter consumer = producer (consumer . converter)
>> current_situation :: IO ()
>> current_situation = simpleDriver producer converter consumer
> Ideally I would like a driver that spawns a worker thread for each
> core in my system. But the trick is in ensuring that the consumer is
> offered results in the same order as they are generated by the
> producer.
>
> I can envision that some kind of storage is necessary to keep track of
> results which cannot yet be offered to the consumer because it is
> still waiting for an earlier result.
>
> Is there any package on Hackage that can help me with this problem? Or
> do I have to implement it using lower level concurrency primitives?
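
     If you do end up using the lower-level primitives, one possible 
shape is sketched below. It is not taken from any package: orderedDriver 
and inFlight are names invented here, and it forks a lightweight thread 
per value instead of one worker per core. The "storage" you describe is a 
FIFO channel of MVars, one per pending result, which the consumer empties 
in production order. Assumptions: inFlight is at least 1, the converter 
does not throw, and forcing the result to weak head normal form is enough 
to do the real work. For actual parallelism the program has to be built 
with -threaded and run with +RTS -N.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (evaluate)

-- Hypothetical driver: converts up to 'inFlight' values concurrently
-- but hands results to the consumer in the order they were produced.
orderedDriver
  :: Int                      -- maximum number of unconsumed results
  -> ((a -> IO ()) -> IO ())  -- callback-style producer
  -> (a -> b)                 -- pure, CPU-bound converter
  -> (b -> IO ())             -- consumer
  -> IO ()
orderedDriver inFlight produce convert consume = do
  slots   <- newQSem inFlight
  pending <- newChan          -- result slots, in production order
  done    <- newEmptyMVar
  let consumeLoop = do
        next <- readChan pending
        case next of
          Nothing   -> putMVar done ()
          Just slot -> do
            b <- takeMVar slot   -- waits for the oldest pending result
            signalQSem slots
            consume b
            consumeLoop
  _ <- forkIO consumeLoop
  produce $ \a -> do
    waitQSem slots               -- bound the amount of buffered work
    slot <- newEmptyMVar
    -- 'evaluate' forces the result to WHNF inside the worker thread.
    _ <- forkIO (evaluate (convert a) >>= putMVar slot)
    writeChan pending (Just slot)
  writeChan pending Nothing      -- end-of-input marker
  takeMVar done
```

     With the dummy definitions from your sketch, the call would be 
something like orderedDriver 8 producer converter consumer.
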
>
> Regards,
> Roel