[Haskell-cafe] Ideas for better network IO in Combinatorrent?

Jesper Louis Andersen jesper.louis.andersen at gmail.com
Sat May 8 05:59:24 EDT 2010


Hi, Cafe!

(This post got rather long, and in writing it, I probably answered
some of my own questions. I'll still post it because it might be
interesting reading for somebody.)

This post concerns the currently central problem in Combinatorrent.
But the problem is interesting from another angle. If you want fast
servers in Haskell, I am pretty sure you will need to crack this nut
as well. The status is that my Erlang client, which handles all IO by
hand in Erlang, uses far less CPU than the Combinatorrent client in
Haskell, especially under load. Productivity is usually around
70-75%, so we still spend considerable time in the GC. It is up
from some 30%, so we are faring much better than earlier. Still, we
use about 4 times as many CPU seconds per downloaded megabyte compared
to Erlang. And it is not like the Erlang code is optimized: the parser
loop is written in Erlang and does not use the C driver's packeting
facility.

(Aside: For non-Erlangers -- In Erlang you can set options on a TCP
socket that control how data is delivered to you. There are several
(including hand-optimized C protocol parsers for ASN.1, FastCGI, or
even HTTP), but one of them designates that the TCP stream carries
messages in the format of a 4 byte big endian integer header, the
length, followed by the payload of that length. By setting the option
{packet, 4} on the TCP link, you can "outsource" this packeting to the
Erlang VM kernel in C and receive a message (in a process channel)
whenever such a message arrives.)
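
For Haskellers, the framing that {packet, 4} performs can be sketched
as a pure function over a strict ByteString. This is only an
illustration of the wire format described above, not what the Erlang
VM or Combinatorrent actually does; the names be32, splitFrame and
frames are made up for this example.

```haskell
import qualified Data.ByteString as B
import Data.Bits (shiftL, (.|.))

-- | Decode a big-endian length header (the caller passes exactly 4 bytes).
be32 :: B.ByteString -> Int
be32 = B.foldl' (\acc w -> (acc `shiftL` 8) .|. fromIntegral w) 0

-- | Try to split one complete frame off the front of the buffer.
-- Returns the payload and the remaining bytes, or Nothing when the
-- buffer does not yet hold a whole frame.
splitFrame :: B.ByteString -> Maybe (B.ByteString, B.ByteString)
splitFrame buf
    | B.length buf < 4    = Nothing
    | B.length rest < len = Nothing
    | otherwise           = Just (B.splitAt len rest)
  where
    (hdr, rest) = B.splitAt 4 buf
    len         = be32 hdr

-- | Split off every complete frame; the second component is the
-- leftover that must wait for more input.
frames :: B.ByteString -> ([B.ByteString], B.ByteString)
frames buf = case splitFrame buf of
    Nothing          -> ([], buf)
    Just (msg, rest) -> let (msgs, left) = frames rest
                        in  (msg : msgs, left)
```

A caller would keep the leftover and prepend it to the next bytes read
from the socket before calling frames again.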

In Erlang I don't use it because it gives too erratic speed
measurements. I need to know how fast I am communicating because it is
used in the BitTorrent protocol to gauge which peers it is interesting
to talk to. Rather, I run a handrolled parser loop, doing exactly the
thing from above. Essentially the loop collects data from the socket
in what is equivalent to a Lazy ByteString. When the LBS is full, it
is converted to a Strict ByteString and handed on to the parser. While
doing this, the size of the incoming data from the operating system
is measured. The parser is fairly naive, but it uses the
Erlang-bit-syntax pattern matches to essentially decode the binary
chunk with a pointer walking over it.

Note that the largest packet type is 16 kilobytes + a_small_constant.
Rather than reallocate the binary, the Erlang system just stores a
Slice-triple (u, o, l), of the underlying byte array, u, the offset
into that array, o, and the length from the offset, l. When the slice
disappears, the garbage collector can evict the binary. Binaries are
ref-counted since they cannot contain pointers and they are stored
outside the process heap. This has to do with the Erlang VM's garbage
collection strategy: Each Erlang process has its own GC and all
message passing happens by copying. Since copying around large chunks
of binary data is expensive, they are stored immutably and shared by
all processes.
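
Strict ByteStrings in Haskell give a comparable slice view: take, drop
and splitAt adjust the view on a shared buffer without copying. A
minimal sketch, where the helper name slice is made up to mirror the
(u, o, l) triple above:

```haskell
import qualified Data.ByteString as B

-- | View l bytes starting at offset o of the underlying buffer u.
-- B.drop and B.take are both O(1) on strict ByteStrings: they share
-- the buffer of u and copy no bytes.
slice :: B.ByteString -> Int -> Int -> B.ByteString
slice u o l = B.take l (B.drop o u)
```

The flip side is that a small slice keeps the whole underlying buffer
alive. If that matters, B.copy makes an independent copy of just the
slice.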

It is fairly efficient. There is only one major copy going on when
assembling the LBS into a Strict BS and that copy knows the exact size
of the target allocation area, so it can just allocate the right size
and then copy data into that area.

Enough Erlang, what does Combinatorrent do?

Combinatorrent's receiver processes (there is one for each peer we are
communicating with) currently account for 46% of all allocation on
their own. Why? Here is why. Our main loop in the receiver process
looks like this (the explanation is below the source code; read the
code first):

    bs <- liftIO $ recv s 2048
    when (B.length bs == 0) stopP
    loop s c (A.parse getMsg bs)

  where loop s c (A.Done r msg) = do
            liftIO . atomically $
                writeTChan c (FromPeer (msg, fromIntegral $ msgSize msg))
            loop s c (A.parse getMsg r)
        loop s c (prt@(A.Partial _)) = do
            bs <- liftIO $ recv s 4096
            when (B.length bs == 0) stopP
            loop s c (A.feed prt bs)
        loop _ _ (A.Fail _ ctx err) =
                    do warningP $ "Incorrect parse in receiver, context: "
                                        ++ show ctx ++ ", " ++ show err
                       stopP

[full code at: http://github.com/jlouis/combinatorrent/blob/master/src/Process/Peer/Receiver.hs#L36 ]

The 'A' module you see is Bryan O'Sullivan's attoparsec package. This
parser is incremental. Either it returns A.Done r msg, where msg is a
decoded message and r is the rest of the input not parsed yet. Or, it
returns a Partial parse where it demands more input to successfully
parse. If you feed that continuation with more data, it happily
continues on. The 'recv' you see is from Johan Tibell's
network-bytestring package. It will block on the Socket s until data
arrives. Then it will fill up to 4096 bytes into the strict bytestring
bs. We use this to feed more data to attoparsec.

One problem is that usually, we are so fast that the packet is only
some 1448 bytes, which makes sense: the MTU of the ethernet wire is
1500 bytes and sizeable chunks are taken by TCP/IP and also the
internet router's PPPoE/A. This means the bytestring is trimmed down
most of the time, incurring a copy. BitTorrent connections are either
dead-slow, only transferring a few bytes each second on average, or
dead-fast, easily filling up the 4k window. I've toyed with the idea
of an adaptive receiver window size, but it seems to be peanuts
compared to the next problem.

The next problem is what attoparsec ends up doing. The relevant parser code is:

getAPMsg :: Int -> Parser Message
getAPMsg l = do
    c <- A.anyWord8
    case c of
        0  -> return Choke
        1  -> return Unchoke
...
        6  -> (Request <$> apW32be <*> (Block <$> apW32be <*> apW32be))
        7  -> (Piece <$> apW32be <*> apW32be <*> A.take (l - 9))
...
        k  -> fail $ "Illegal parse, code: " ++ show k

[full code at: http://github.com/jlouis/combinatorrent/blob/master/src/Protocol/Wire.hs#L178 ]

Here the attoparsec parser first reads off a byte, c, designating
the message type of the parse. Then it handles the message type by
returning an ADT value for the parse. The killer is A.take (l - 9). If
we look into the attoparsec source code, we see a call to its 'ensure'
function. This function ensures we have enough data available for
parsing. If not, it demands more input through A.Partial results. When
it gets input, it concatenates with '+++', which is really the 'append'
function of Strict ByteStrings. That function is O(n) and destroys the
allocation rate of the application :)
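
To put a rough number on this, here is a small model of the copying
involved. It assumes every append copies the whole accumulated prefix
plus the new chunk, and that a 16384 byte block trickles in as 1448
byte chunks; both are simplifications and the function names are made
up for the example.

```haskell
-- | Sizes of the chunks when `total` bytes arrive `chunk` bytes at a time.
chunkSizes :: Int -> Int -> [Int]
chunkSizes total chunk = replicate q chunk ++ [r | r > 0]
  where
    (q, r) = total `divMod` chunk

-- | Bytes copied when chunks are appended one at a time: each append
-- copies everything gathered so far plus the new chunk. The first
-- chunk is used as-is, so its prefix sum is dropped.
copiedByAppend :: [Int] -> Int
copiedByAppend sizes = sum (drop 1 (scanl1 (+) sizes))

-- | Bytes copied when all chunks are concatenated once at the end.
copiedByConcat :: [Int] -> Int
copiedByConcat = sum
```

Under this model, copiedByAppend (chunkSizes 16384 1448) is 110504
bytes, against 16384 bytes for a single concatenation, so the
appending strategy copies almost 7 times as much per block.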

My current plan for a solution is this: Read the socket and gather up
chunks for a lazy bytestring. We know, due to the 32-bit BE header, how
many more bytes to expect. When we have all of them, do the Strict BS
conversion and hand that to attoparsec. It means no +++ call inside
attoparsec. Furthermore I get a SBS.splitAt, yielding the
slice-construction I am longing for. But are there any ideas out there
for doing it even better than this? Conjure by David Himmelstrup
removes the Lazy -> Strict conversion as well by preallocating the
Strict ByteString and filling it directly, but I think I can live with
the simpler solution for now. The only important thing currently is to
get attoparsec happy, without feeding it sangria or beer :)
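
A minimal sketch of that plan, written as a pure accumulator so it can
be tried without a socket. The type Acc and the functions emptyAcc,
addChunk and takeMessage are made up for this example; the `need`
argument stands for the payload length decoded from the 4 byte header.
It uses a plain list of strict chunks, which is what a lazy bytestring
amounts to here.

```haskell
import qualified Data.ByteString as B

-- | Chunks received so far (newest first) and how many bytes they hold.
data Acc = Acc [B.ByteString] !Int

emptyAcc :: Acc
emptyAcc = Acc [] 0

-- | Record a freshly received chunk. O(1): nothing is copied.
addChunk :: B.ByteString -> Acc -> Acc
addChunk bs (Acc cs n) = Acc (bs : cs) (n + B.length bs)

-- | Once at least `need` bytes are gathered, build one strict
-- ByteString with a single copy, then split it (a slice, no copy)
-- into the message and the bytes belonging to the next message.
takeMessage :: Int -> Acc -> Maybe (B.ByteString, B.ByteString)
takeMessage need (Acc cs n)
    | n < need  = Nothing
    | otherwise = Just (B.splitAt need whole)
  where
    whole = B.concat (reverse cs)
```

In the receiver loop, each result of recv would go through addChunk
until takeMessage succeeds; the message part then goes to attoparsec
in one piece and the leftover seeds the next accumulator.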

In any event, I think there is food for thought here on some possible
extensions of network-bytestring. Another possibility is going back to
Handles, as they have a buffer construction, but Handles had even
worse CPU resource usage if I remember correctly. I have not analyzed
that problem to this depth, however.

I have also considered John Lato's iteratees, but we are sorely
lacking an iteratee 101 introduction. Can iteratees parse infinite
streams, for instance?

-- 
J.

