Patch 4 of 6 for ticket #3160
Simon Marlow
marlowsd at gmail.com
Thu Apr 21 09:59:38 CEST 2011
Many thanks, I'll try to work through this in due course (might not be
for a few days or so though). In the meantime, the patches you sent to
the list don't seem to apply for me; I think something has wrapped long
lines somewhere. Perhaps you could resend, including the patches as
attachments rather than in the email body?
Cheers,
Simon
On 20/04/2011 17:49, Chris Kuklewicz wrote:
> I wanted someone to review the QSem code, and now I have gotten my wish...
>
> Warning: this is a long e-mail.
>
> On 20/04/2011 10:23, Simon Marlow wrote:
>> On 19/04/2011 15:26, Chris Kuklewicz wrote:
>>> This is a patch against base. This replaces the implementation of
>>> Control.Concurrent.QSem
>>>
>>> From 21a539a7adceee253cd8602702dcbd3863f5bd55 Mon Sep 17 00:00:00 2001
>>> Subject: [PATCH 1/3] Replace QSem.hs in base
>>
>> Thanks for the patches. I'm quite surprised, and a little worried, at
>> how complex the new versions are, and it's hard for me to verify their
>> correctness. I appreciate the effort you've gone to with commenting the
>> changes, but I didn't get a sense for the rationale, e.g.
>
> Now you will get your wish for a rationale, a long rationale...
>
>>
>> +{- Note [headWait]
>> +~~~~~~~~~~~~~~~~~~
>> +The head of the waiter queue blocks on headWait. The unit () is not
>> +just used to block and wake, but represents a tangible unit of value of
>> +the semaphore. Thus 'signalQSem' can succeed in adding a unit of
>> +quantity by putting a () into headWait.
>> +
>> +The real available total is the Int in qSem plus 1 if the headWait MVar
>> +is full. There are two legal states of the QSem when it has positive
>> +quantity (and one legal state if negative or zero). All the quantity
>> +may be in the Int in qSem and headWait empty, or one less than that in
>> +qSem and headWait full.
>> +-}
>>
>> this doesn't tell me *why* you need to have something called headWait
>> with a "tangible unit of value" (and what does that mean, incidentally?).
>
> I will answer "what" here and "why" below.
>
> In MSem (perhaps not pushed to SafeSemaphore on hackage yet) I have a
> function that asks the semaphore for the available quantity, all the
> "tangible unit of value" stored in the semaphore. It works like this:
>
> peekAvail q = mask_ $ withMVar (qSem q) $ \ ms -> do
>   extraFlag <- tryTakeMVar (headWait q)
>   case extraFlag of
>     Nothing -> return ms
>     Just () -> do putMVar (headWait q) () -- cannot block
>                   return $! succ ms
>
> If it finds a () in headWait then it puts this back and adds one to the value
> that was stored in qSem. This is why I say that there are two states of the
> semaphore that could lead to peekAvail returning a given number like 5. One
> way is that headWait is empty and qSem holds 5; the second way is that
> headWait holds () and qSem holds 4. This design actually makes little
> difference to signalQSem and makes waitQSem less complicated, as explained at
> the bottom of this e-mail.
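
A minimal sketch of the two equivalent states described above. The record mirrors the three-MVar layout discussed in this thread; the constructor helper `newQSemWith` and the name `bothStates` are hypothetical, introduced only for illustration, and this is not the patch itself.

```haskell
import Control.Concurrent.MVar
import Control.Exception (mask_)

-- Stand-in for the three-MVar QSem record discussed in this thread.
data QSem = QSem { qSem      :: !(MVar Int)
                 , queueWait :: !(MVar ())
                 , headWait  :: !(MVar ())
                 }

-- Hypothetical helper: build a QSem with a given stored Int and with
-- headWait either full or empty. queueWait starts full (unlocked).
newQSemWith :: Int -> Bool -> IO QSem
newQSemWith n headFull = do
  s  <- newMVar n
  qw <- newMVar ()
  hw <- if headFull then newMVar () else newEmptyMVar
  return (QSem s qw hw)

-- peekAvail as quoted in the e-mail: the stored Int, plus 1 if headWait is full.
peekAvail :: QSem -> IO Int
peekAvail q = mask_ $ withMVar (qSem q) $ \ms -> do
  extraFlag <- tryTakeMVar (headWait q)
  case extraFlag of
    Nothing -> return ms
    Just () -> do putMVar (headWait q) () -- cannot block: we just emptied it
                  return $! succ ms

-- Both legal representations of "5 units available".
bothStates :: IO (Int, Int)
bothStates = do
  a <- newQSemWith 5 False >>= peekAvail -- all quantity in the Int
  b <- newQSemWith 4 True  >>= peekAvail -- one unit parked in headWait
  return (a, b)
```

Both components of `bothStates` report the same available quantity, which is the sense in which the () in headWait is a unit of value rather than only a wake-up flag.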
>
>> Experience shows that it's very easy to get things wrong in these
>> implementations, so we should be very careful here.
>
> I totally agree. So far I can say that the new implementation passes the old
> tests. And I have added 3 new tests for ticket #3160, one each for QSem,
> QSemN, and SampleVar, that the old implementations fail and the new
> implementations pass.
>
> We may want to add more tests to increase confidence. (With 'peekAvail' some
> unit tests would be much easier to write.)
>
> The length of each function is quite short, but their interaction is more
> complex. I will make this e-mail quite long to be as thorough as possible in
> explaining and justifying the design.
>
>> A good way to start would be to explain why we can't just use
>> modifyMVar_ (et al.) in the usual way to make QSem safe, for example.
>
> Short answer: there is an MVar that is used exactly like that.
>
> I shall compare and contrast QSemN and QSem, and leave SampleVar for another
> time.
>
> The old QSem/QSemN were newtypes around a single MVar. This serialized all
> initial access to the semaphore. Unsatisfied waiters would create a fresh
> MVar () to block on and store this in the list. The signal actions would
> first service this list before incrementing the stored Int.
>
> The new QSem/QSemN use 3 MVars. These are created when the 'new*' operation
> is executed, and after that no new MVars are created.
>
> The three MVars are (with QSemNQuant for clarity):
>
> data QSem = QSem { qSem      :: !(MVar Int)
>                  , queueWait :: !(MVar ())
>                  , headWait  :: !(MVar ())
>                  }
> data QSemN = MSemN { qSemN     :: !(MVar QSemNQuant)
>                    , queueWait :: !(MVar ())
>                    , headWait  :: !(MVar ())
>                    }
> data QSemNQuant = MS { avail     :: !Int
>                      , headWants :: !(Maybe Int)
>                      }
>
> I could have stored all of these in a 4th MVar, or perhaps stored some of the
> 3 MVars inside other ones. But these other choices would only have made the
> code longer, with no actual helpful effect. In fact, I once had 'headWait'
> stored inside 'qSem' and 'qSemN' but this only made the code less clear IMHO.
>
> There is a need to service all 'waitQSem*' in FIFO order. To do this all
> 'waitQSem' is done inside an outermost withMVar (queueWait _) lock:
>
> waitQSem q = mask_ . withMVar (queueWait q) $ \ () ->
>
> waitQSemN q wanted = mask_ . withMVar (queueWait q) $ \ () -> do
>
> If there are concurrent waiters, only the head waiter may hold the () from
> queueWait. All others must wait in FIFO order until those waiters in front
> of them are completely done. This makes the new QSem safer than the old: if
> any thread blocked on queueWait is killed then that thread has had
> zero interaction with the semaphore, and the semaphore is undisturbed.
>
> If the head waiter exits for any reason then withMVar restores () to
> (queueWait _) and the next waitQSem* will proceed automatically.
>
> Sometimes the head waiter, the one holding queueWait, needs to block and wait
> for a signal to deliver enough quantity. The head waiter will block by
> takeMVar on the third MVar called 'headWait'. Only this one thread can be
> holding the queueWait and only this thread ever tries to take from headWait.
> The signalQSem* operation may eventually wake this one thread by a put into
> 'headWait'.
>
> Before deciding what to do, the head waiter also takes what you would
> recognize as the "usual way to make QSem safe" by calling 'modifyMVar (qSem*
> q)', which is what 'signalQSem*' also does:
>
> waitQSem q = mask_ . withMVar (queueWait q) $ \ () -> -- Note [mask_]
>   id =<< (modifyMVar (qSem q) $ \ qs -> do -- Note [id/join]
> signalQSem q = uninterruptibleMask_ . modifyMVar_ (qSem q) $ \ qs -> do
>
> waitQSemN q wanted = mask_ . withMVar (queueWait q) $ \ () -> do -- Note [mask_]
>   mustWait <- modifyMVar (qSemN q) $ \ ms -> do
> signalQSemN q size = uninterruptibleMask_ . modifyMVar_ (qSemN q) $ \ ms -> do
>
> So now only the head waiter and the signal commands are trying to get
> concurrent access to qSem (or qSemN). This is what makes access to the
> semaphore quantity thread safe.
>
> == QSemN ==
>
> I wrote the new QSemN first. Then I wrote a specialized design for QSem.
> Let me explain the QSemN first:
>
> waitQSemN q wanted = mask_ . withMVar (queueWait q) $ \ () -> do -- Note [mask_]
>   mustWait <- modifyMVar (qSemN q) $ \ ms -> do
>     -- assert that headWants is Nothing (from prior 'new' or 'signal' or 'cleanup')
>     if wanted <= avail ms
>       then do
>         let avail'down = avail ms - wanted -- avail'down is never negative, barring overflow
>         ms' <- evaluate ms { avail = avail'down }
>         return (ms', False)
>       else do
>         ms' <- evaluate ms { headWants = Just wanted }
>         return (ms', True)
>   when mustWait $ do
>     let cleanup = uninterruptibleMask_ $ modifyMVar_ (qSemN q) $ \ms -> do
>           mStale <- tryTakeMVar (headWait q)
>           let avail' = avail ms + maybe 0 (const wanted) mStale
>           evaluate ms { avail = avail', headWants = Nothing }
>     takeMVar (headWait q) `onException` cleanup
>
> The modifyMVar (qSemN q) code has two outcomes from the if. Either there is
> enough value in (avail ms) and no need to block, or there is not enough value
> and the head waiter needs to block. In the blocking case 'headWants' is set
> to "Just wanted" to let signalQSemN know when it should wake the head waiter.
>
> The 'waitQSemN' must then release the modifyMVar (qSemN _) lock and check the
> False or True value of mustWait. When blocking this is True and (takeMVar
> (headWait q)) is called. Please ignore 'cleanup' for the moment.
>
> Do notice that (takeMVar headWait) is done while holding queueWait so other
> waitQSemN are still blocked. Do notice that (takeMVar headWait) is done
> while (qSemN) is no longer held and thus 'signalQSemN' can proceed. Do
> notice that (Just wanted) is put in qSemN if and only if waitQSemN will be
> taking from the headWait MVar.
>
> Now consider the operation of signalQSemN:
>
> signalQSemN q size = uninterruptibleMask_ . modifyMVar_ (qSemN q) $ \ ms -> do
>   avail' <- evaluate $ avail ms + size
>   case headWants ms of
>     Just wanted | wanted <= avail' -> do
>       putMVar (headWait q) () -- cannot block
>       let avail'down = avail' - wanted -- avail'down is never negative, barring overflow
>       evaluate ms { avail = avail'down, headWants = Nothing }
>     _ -> evaluate ms { avail = avail' }
>
> This locks via modifyMVar_ (qSemN q) as emphasized above. It strictly
> computes the new available quantity. It checks if there is a blocked
> waitQSemN that can now be serviced. If this is not true then it updates the
> available quantity and returns. If it needs to service the blocked waiter
> then it sends () to headWait to unblock it, computes a reduced available
> quantity, and both updates the available quantity and sets headWants to
> Nothing before unlocking qSemN.
>
> Let me strongly emphasize that headWants has been reset to Nothing here. The
> value of headWants is born Nothing. From Nothing it can be set to "Just
> wanted" in waitQSemN and then back to Nothing in signalQSemN. In fact it
> MUST be set from "Just wanted" back to Nothing before the next call to
> waitQSemN will be able to set it to "Just wanted" again.
>
> Let's say there is a semaphore with 0 value available and there are 100
> threads, each wanting a quantity of 1, all blocked. The lead thread has
> blocked after setting headWants to "Just 1". Now 'signalQSemN q 100' is
> called. This wakes the first thread and puts 99 into available and sets
> headWants to Nothing. The head waiter wakes up and returns, and then 99
> threads proceed one at a time to deduct 1 from available and return. This is
> the wake-one semantics of the queueWait MVar.
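
A minimal sketch of the 100-waiter scenario above, written against the public Control.Concurrent.QSemN API in base rather than against the patch internals. The name `wakeAllDemo` and the `done` MVar used to count finished waiters are hypothetical, added only for illustration.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.QSemN
import Control.Monad (replicateM, replicateM_)

-- 100 threads each want 1 unit from an empty semaphore; one signal of
-- size 100 should eventually let every one of them through.
wakeAllDemo :: IO Int
wakeAllDemo = do
  sem  <- newQSemN 0
  done <- newEmptyMVar
  replicateM_ 100 $ forkIO (waitQSemN sem 1 >> putMVar done ())
  signalQSemN sem 100
  -- Collect one () per waiter; this blocks until all 100 have returned.
  length <$> replicateM 100 (takeMVar done)
```

The sketch does not observe the internal hand-off order; it only checks the externally visible outcome, that a single large signal is enough for all waiters to proceed.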
>
> The 'cleanup' in waitQSemN detects when the blocked thread waiting on
> (takeMVar (headWait q)) has been interrupted. In this case it must set
> headWants back to Nothing and see if a concurrent signalQSemN has signaled
> (headWait q) in the meantime to clean up this possible race.
>
> I ought to emphasize this more: This new design for QSemN is slightly
> different from the old QSemN, in that all requests from waitQSemN will now be
> served in FIFO order regardless of requested quantity. This difference does
> NOT apply to QSem.hs. In the old QSemN a blocked request for a very large
> quantity will be skipped by signalQSemN and may end up starving forever.
> This old behavior was not documented either way in the old QSemN; the new
> QSemN documents the new behavior.
>
> Let me present an example that wants the new non-starving behavior: I
> allocate a semaphore with 10 units and various threads take a single unit when
> executing some critical section code. Now a master thread calls 'waitQSemN q
> 10'; when this succeeds I know that none of the other threads are in the
> critical section. With the old QSemN a busy system would probably never
> unblock the master thread due to the starvation. In this example the old
> design still has progress guarantees but allows starvation (like STM), while
> the new design has a fair FIFO guarantee (like MVar).
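
A minimal sketch of the master-thread pattern above, using the public Control.Concurrent.QSemN API in base. The names `exclusiveDemo`, `inside` and `done` are hypothetical. The workers here all terminate, so the master is served under either the old or the new semantics; the sketch shows the pattern, not the starvation case.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.QSemN
import Control.Exception (bracket_)
import Control.Monad (forM_, replicateM_)
import Data.IORef

-- Workers take 1 of 10 units around a critical section; the master takes
-- all 10, so while it holds them no worker can be inside.
exclusiveDemo :: IO Int
exclusiveDemo = do
  sem    <- newQSemN 10
  inside <- newIORef (0 :: Int) -- number of workers currently in the section
  done   <- newEmptyMVar
  forM_ [1 .. 20 :: Int] $ \_ -> forkIO $ do
    bracket_ (waitQSemN sem 1) (signalQSemN sem 1) $ do
      atomicModifyIORef' inside (\n -> (n + 1, ()))
      atomicModifyIORef' inside (\n -> (n - 1, ()))
    putMVar done ()
  -- Master: holding all 10 units means the section is empty.
  seen <- bracket_ (waitQSemN sem 10) (signalQSemN sem 10) (readIORef inside)
  replicateM_ 20 (takeMVar done)
  return seen
```

The value returned is the number of workers the master observed inside the critical section while it held all 10 units, which should be 0.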
>
> I think I could finish my attempted version of QSemN that has the old
> behavior of starving large requests and still fixes ticket #3160, but the
> code to do this is very complicated as I have to maintain a list of MVars
> (like the old code) in the face of interrupted waiting threads.
>
> == QSem ==
>
> One way to derive QSem is by just using QSemN with the quantity passed
> set to 1. I could even specialize this QSem, e.g. headWants of "Just 1" or
> Nothing can be replaced by True or False.
>
> But I realized this headWants boolean flag was not actually needed for QSem.
> The 'signalQSem' does not need to know if there is a blocked waiter since all
> waiters want a single unit.
>
> The 'signalQSem' is supposed to increment the quantity. It first tries to
> put () into the headWait MVar. If this succeeds then signalQSem considers
> itself to be done. If this fails then signalQSem increments the stored Int:
>
> signalQSem q = uninterruptibleMask_ . modifyMVar_ (qSem q) $ \ qs -> do
>   -- Nothing below can block
>   if qs < 0
>     then return $! succ qs
>     else do
>       didPlace <- tryPutMVar (headWait q) () -- guarded by holding qSem
>       if didPlace
>         then return qs
>         else return $! succ qs
>
> waitQSem q = mask_ . withMVar (queueWait q) $ \ () -> -- Note [mask_]
>   id =<< (modifyMVar (qSem q) $ \ qs -> do -- Note [id/join]
>     -- Nothing below can block
>     mayGrab <- tryTakeMVar (headWait q) -- guarded by holding queueWait & qSem
>     case mayGrab of
>       Just () -> return (qs, return ())
>       Nothing -> if 1 <= qs
>         then let qs' = pred qs
>              in seq qs' $
>                 return (qs', return ())
>         else return ( qs
>                     , takeMVar (headWait q) ) -- guarded by holding queueWait but not qSem
>     )
>
> In waitQSem the first thing to check is tryTakeMVar (headWait q). If this
> succeeds then waitQSem considers itself to be done. If this fails then
> it checks the value it got from (qSem q), decrementing this if it is positive
> and returning. If this is not positive then waitQSem must drop the
> (modifyMVar (qSem q)) lock and then block with (takeMVar (headWait q)).
>
> This design for QSem.hs means that waitQSem does not need a 'cleanup'
> exception handler if (takeMVar (headWait q)) is interrupted. I also do not
> bother with the 'mustWait' boolean that waitQSemN used, and just use "join"
> (id =<<).
>
> More clearly: If takeMVar is interrupted and signalQSem put a () into
> headWait anyway then this () will sit there until a future waitQSem
> picks it up. This is harmless, so no 'cleanup' handler is needed.
>
> Let me be clear in my claim: The behavior of the old and new QSem modules
> should be identical except when waiting threads are killed, in which case the
> new QSem works better by not losing quantity.
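
A minimal sketch of the kind of check that ticket #3160 calls for, written against the public Control.Concurrent.QSem API in base. The name `killedWaiterDemo` and the delay and timeout values are assumptions chosen for illustration; the result depends on the QSem implementation in the base library being used.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.QSem
import System.Timeout (timeout)

-- Kill a thread that is blocked in waitQSem, then signal once. If no
-- quantity is lost, a later waitQSem picks up that unit promptly.
killedWaiterDemo :: IO Bool
killedWaiterDemo = do
  sem <- newQSem 0
  tid <- forkIO (waitQSem sem)
  threadDelay 100000          -- give the waiter time to block (assumed sufficient)
  killThread tid
  signalQSem sem              -- this unit must not go to the dead waiter
  r <- timeout 1000000 (waitQSem sem)
  return (r == Just ())
```

With an implementation that loses the unit to the killed waiter, the final waitQSem would block and the timeout would make the function return False; with one that preserves quantity it returns True.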
>