<div dir="ltr"><br><div class="gmail_extra"><br><br><div class="gmail_quote">On Thu, Jan 31, 2013 at 11:48 AM, Simon Marechal <span dir="ltr">&lt;<a href="mailto:simon@banquise.net" target="_blank">simon@banquise.net</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left-width:1px;border-left-color:rgb(204,204,204);border-left-style:solid;padding-left:1ex">Hello,<br>
<br>
        I have found the Conduit abstraction to be very well suited to a set of<br>
problems I am facing. I am however wondering how to implement<br>
&quot;branching&quot; conduits, and even conduit pools.<br>
<br>
        I am currently in the process of rewriting parts (the simple parts) of<br>
the Logstash tool. There is a sample program that I use here:<br>
<br>
<a href="https://github.com/bartavelle/hslogstash/blob/deprecateUtils/examples/RedisToElasticsearch.hs" target="_blank">https://github.com/bartavelle/hslogstash/blob/deprecateUtils/examples/RedisToElasticsearch.hs</a><br>


<br>
        As it can be seen, it uses a &quot;Redis&quot; source, a conduit that decodes the<br>
JSON ByteString into a LogstashMessage, a conduit that stores it into<br>
Elasticsearch and outputs the result of that action as an Either, and<br>
finally a sink that prints the errors.<br>
<br>
        My problem is that I would like more complex behaviour. For example, I<br>
would like to route messages to another server instead of putting them<br>
into Elasticsearch when the LogstashMessage has some tag set. But this<br>
is just an example, and it is probable I will want much more complex<br>
behavior soon.<br>
<br>
        I am not sure how to proceed from here, but have the following ideas:<br>
<br>
 * investigate how the Conduits are made internally to see if I can<br>
create a operator similar to $$, but that would have a signature like:<br>
        Source m (Either a b) -&gt; Sink a m r -&gt; Sink b m r<br>
and would do the branching in a binary fashion. I am not sure this is<br>
even possible.<br>
<br>
 * create a &quot;mvars&quot; connectors constructor, which might have a signature<br>
like this:<br>
<br>
 Int -- ^ branch count<br>
 (LogstashMessage -&gt; Int) -- ^ branching function<br>
 (Sink LogstashMessage m (), [Source m LogstashMessage])<br>
 -- ^ a suitable sink, several sources for the other conduits<br>
<br>
 it would internally create a MVar (Maybe LogstashMessage) for each<br>
branch, and put putMVar accordingly to the branching function. When the<br>
Conduit is destroyed, it will putMVar Nothing in all MVars.<br>
 the sources would takeMVar, check if it is Nothing, or just proceed as<br>
expected.<br>
<br>
 The MVar should guarantee the constant space property, but there is the<br>
risk of inter branch blocking when one of the branches is significantly<br>
slower than the others. It doesn&#39;t really matter to me anyway. And all<br>
the branch Sinks would have to have some synchronization mechanism so<br>
that the main thread waits for them (as they are going to be launched by<br>
a forkIO).<br>
<br>
<br>
<br>
  This is the simplest scheme I have thought of, and it is probably not<br>
a very good one. I am very interested in suggestions here.<br>
<br>
_______________________________________________<br>
Haskell-Cafe mailing list<br>
<a href="mailto:Haskell-Cafe@haskell.org">Haskell-Cafe@haskell.org</a><br>
<a href="http://www.haskell.org/mailman/listinfo/haskell-cafe" target="_blank">http://www.haskell.org/mailman/listinfo/haskell-cafe</a><br>
</blockquote></div><br></div><div class="gmail_extra"><br></div><div class="gmail_extra" style>Hi Simon,</div><div class="gmail_extra" style><br></div><div class="gmail_extra" style>For your first approach, I think what you&#39;re looking to do is combine two Sinks together, something like:</div>

<div class="gmail_extra" style><br></div><div class="gmail_extra" style><div class="gmail_extra">combine :: Monad m</div><div class="gmail_extra">        =&gt; Sink i1 m r1</div><div class="gmail_extra">        -&gt; Sink i2 m r2</div>

<div class="gmail_extra">        -&gt; Sink (Either i1 i2) m (r1, r2)</div><div><br></div><div style>Then you&#39;d be able to use the standard $$ and =$ operators on it. I&#39;ve put up an example implementation here[1]. The majority of the code is simple pattern matching on the different possible combination, but some things to point out:</div>

<div style><br></div><div style>* To simplify, we start off with a call to injectLeftovers. This means that we can entirely ignore the Leftover constructor in the main function.</div><div style>* Since a Sink will never yield values, we can also ignore the HaveOutput constructor.</div>

<div style>* As soon as either of the Sinks terminates, we terminate the other one as well and return the results.</div><div style><br></div><div style>You can also consider going the mutable container route if you like. Instead of creating a lot of stuff from scratch with MVars, you could use stm-conduit[2]. In fact, that package already contains some kind of merging behavior for sources, it might make sense to ask the author about including unmerging behavior for Sinks.</div>

<div style><br></div><div style>Michael</div><div style><br></div><div style>[1] <a href="https://gist.github.com/4682609">https://gist.github.com/4682609</a></div><div style>[2] <a href="http://hackage.haskell.org/package/stm-conduit">http://hackage.haskell.org/package/stm-conduit</a></div>

</div></div>