<font face="verdana,sans-serif">Finally, I&#39;ve uploaded a new version of stm-conduit [1] with these combinators included. You should &quot;cabal update&quot; and then &quot;cabal install stm-conduit&quot; to get the latest version, and now you can vertically compose your sources!<br>

<br>Regards,<br>  - clark<br><br>[1] <a href="http://hackage.haskell.org/package/stm-conduit-0.2.3.0">http://hackage.haskell.org/package/stm-conduit-0.2.3.0</a><br></font><br><div class="gmail_quote">On Tue, Feb 28, 2012 at 2:58 PM, Clark Gaebel <span dir="ltr">&lt;<a href="mailto:cgaebel@csclub.uwaterloo.ca">cgaebel@csclub.uwaterloo.ca</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">First of all, I&#39;d probably name that operator &gt;=&lt;, since &gt;=&gt; is Kleisli composition in Control.Monad.<br>

<br>Second, you&#39;re going to need new threads for this, since you&#39;ll be reading from two sources concurrently. This isn&#39;t as big a problem as you might think, because Haskell threads are dirt cheap, orders of magnitude cheaper than pthread threads. If you&#39;re using multiple threads with conduits, I just wrote a library to help you out with that! As Michael already mentioned, stm-conduit could do this synchronization for you. This turns your &gt;=&lt; function into:<br>


<br>infixl 5 &gt;=&lt;<br>(&gt;=&lt;) :: ResourceIO m<br>      =&gt; Source m a<br>      -&gt; Source m a<br>      -&gt; ResourceT m (Source m a)<br>sa &gt;=&lt; sb = do c &lt;- liftIO . atomically $ newTMChan<br>               _ &lt;- resourceForkIO $ sa $$ sinkTMChan c<br>


               _ &lt;- resourceForkIO $ sb $$ sinkTMChan c<br>               return $ sourceTMChan c<br><br>which returns a new source, combining two sources.<br><br>This can further be generalized to combining any number of sources:<br>


<br>mergeSources :: ResourceIO m<br>             =&gt; [Source m a]<br>             -&gt; ResourceT m (Source m a)<br>mergeSources sx = do c &lt;- liftIO . atomically $ newTMChan<br>                       mapM_ (\s -&gt; resourceForkIO $ s $$ sinkTMChan c) sx<br>


                       return $ sourceTMChan c<br><br>Hope this helps somewhat,<br>  - clark<div class="HOEnZb"><div class="h5"><br><br>On Tue, Feb 28, 2012 at 11:04 AM, Alexander V Vershilov &lt;<a href="mailto:alexander.vershilov@gmail.com" target="_blank">alexander.vershilov@gmail.com</a>&gt; wrote:<br>


&gt;<br>&gt; Hello, cafe.<br>&gt;<br>&gt; Is it possible to read data from different concurrent sources,<br>&gt; i.e. read data from source as soon as it become avaliable, e.g.<br>&gt;<br>&gt;  runResourceT $ (source1 stdin $= CL.map Left)<br>


&gt;                   &gt;=&gt; (source2 handle $= CL.map Right)<br>&gt;              $= application<br>&gt;              $$ sink<br>&gt;    where &gt;=&gt; - stands for concurrent combining of sources<br>&gt;<br>&gt; It would be good if it can be sources of different types (handle or<br>


&gt; STM channel, etc..).<br>&gt;<br>&gt; Currently I&#39;ve found no good way to handle with this situation,<br>&gt; except of using STM Channels for collecting data<br>&gt;<br>&gt;   source1 ---+            |<br>&gt;              |   sink     |                       output sink<br>


&gt;              +---] Channel [-------&gt; application-----&gt;]<br>&gt;              |          source<br>&gt;   source2 ---+            |<br>&gt;<br>&gt; From this point of view application takes concurent data, but this<br>


&gt; implementation requires additional thread per data processing. Also<br>&gt; in many cases it will require run additional runResourceT (see later<br>&gt; example).<br>&gt;<br>&gt; So if there any possible simplifications? Or ideas how to make (&gt;=&gt;)<br>


&gt; operator.<br>&gt;<br>&gt; Example:<br>&gt;<br>&gt; So I&#39;ve got next code in my network-conduit based application:<br>&gt;<br>&gt;   main :: IO ()<br>&gt;   main = do<br>&gt;     pool &lt;- createDBPool &quot;...&quot; 10<br>


&gt;     let r = ServerInit pool<br>&gt;     forkIO $ forever clientConsole --read channel list and send &quot;Left&quot;<br>&gt;     flip runReaderT r $<br>&gt;       runTCPServer (ServerSettings 3500 Nothing) (protoServer)<br>


&gt;<br>&gt;   myServer src sink = do<br>&gt;    ch &lt;- liftIO $ atomically $ newTBMChan 16<br>&gt;    initState &lt;- lift $ ask<br>&gt;    _  &lt;- liftIO $ fork . (flip runReaderT initState) $<br>&gt;                   runResourceT $ src $= C.sequence decode<br>


&gt;                                      $= CL.map Right $$ sinkTBMChan ch<br>&gt;    sourceTBMChan ch<br>&gt;                 $= process $= C.sequence encode $$ sinkHandle stdout<br>&gt;<br>&gt; But in this situation I don&#39;t know if freeing of all resources are guaranteed,<br>


&gt; because I&#39;m running additional resourceT in main resourceT scope.<br>&gt;<br>&gt; So can you advice is it possible to make concurrent sources now with currenly<br>&gt; implemented library?<br>&gt; If it&#39;s not possible but worth of implementing, so I can make that functions?<br>


&gt; Is it correct to runResourceT inside another resourceT?<br>&gt;<br>&gt; --<br>&gt; Best regards,<br>&gt;   Alexander V Vershilov<br>&gt;<br></div></div><div class="HOEnZb"><div class="h5">&gt; _______________________________________________<br>

&gt; Haskell-Cafe mailing list<br>
&gt; <a href="mailto:Haskell-Cafe@haskell.org" target="_blank">Haskell-Cafe@haskell.org</a><br>&gt; <a href="http://www.haskell.org/mailman/listinfo/haskell-cafe" target="_blank">http://www.haskell.org/mailman/listinfo/haskell-cafe</a><br>

&gt;<br>
</div></div></blockquote></div><br>