Patch 4 of 6 for ticket #3160
Simon Marlow
marlowsd at gmail.com
Wed Apr 20 11:23:16 CEST 2011
On 19/04/2011 15:26, Chris Kuklewicz wrote:
> This is a patch against base. This replaces the implementation of
> Control.Concurrent.QSem
>
> From 21a539a7adceee253cd8602702dcbd3863f5bd55 Mon Sep 17 00:00:00 2001
> Subject: [PATCH 1/3] Replace QSem.hs in base
Thanks for the patches. I'm quite surprised, and a little worried, at
how complex the new versions are, and it's hard for me to verify their
correctness. I appreciate the effort you've gone to with commenting the
changes, but I didn't get a sense for the rationale, e.g.
+{- Note [headWait]
+~~~~~~~~~~~~~~~~~~
+The head of the waiter queue blocks on headWait. The unit () is not
+just used to block and wake, but represents a tangible unit of value of
+the semaphore. Thus 'signalQSem' can succeed in adding a unit of
+quantity by putting a () into headWait.
+
+The real available total is the Int in qSem plus 1 if the headWait MVar
+is full. There are two legal states of the QSem when it has positive
+quantity (and one legal state if negative or zero). All the quantity
+may be in the Int in qSem and headWait empty, or one less than that in
+qSem and headWait full.
+-}
this doesn't tell me *why* you need to have something called headWait
with a "tangible unit of value" (and what does that mean, incidentally?).
Experience shows that it's very easy to get things wrong in these
implementations, so we should be very careful here.
A good way to start would be to explain why we can't just use
modifyMVar_ (et.al.) in the usual way to make QSem safe, for example.
Cheers,
Simon
> ---
> Control/Concurrent/QSem.hs | 145
> +++++++++++++++++++++++++++++--------------
> 1 files changed, 98 insertions(+), 47 deletions(-)
>
> diff --git a/Control/Concurrent/QSem.hs b/Control/Concurrent/QSem.hs
> index 22f6c0c..1f49452 100644
> --- a/Control/Concurrent/QSem.hs
> +++ b/Control/Concurrent/QSem.hs
> @@ -1,5 +1,3 @@
> -{-# LANGUAGE CPP #-}
> -
> -----------------------------------------------------------------------------
> -- |
> -- Module : Control.Concurrent.QSem
> @@ -10,7 +8,12 @@
> -- Stability : experimental
> -- Portability : non-portable (concurrency)
> --
> --- Simple quantity semaphores.
> +-- Simple quantity semaphores. "Control.Concurrent.QSemN" offers a
> +-- quantity semaphore. The SafeSemaphore package offers more
> +-- complicated semaphore operations over any Integral value.
> +--
> +-- Rewritten to make it safe when 'waitQSem' gets interrupted while
> +-- blocking (fixes ticket #3160).
> --
> -----------------------------------------------------------------------------
>
> @@ -24,62 +27,110 @@ module Control.Concurrent.QSem
>
> import Prelude
> import Control.Concurrent.MVar
> -import Control.Exception ( mask_ )
> +import Control.Exception
> import Data.Typeable
>
> #include "Typeable.h"
>
> --- General semaphores are also implemented readily in terms of shared
> --- @MVar at s, only have to catch the case when the semaphore is tried
> --- waited on when it is empty (==0). Implement this in the same way as
> --- shared variables are implemented - maintaining a list of @MVar at s
> --- representing threads currently waiting. The counter is a shared
> --- variable, ensuring the mutual exclusion on its access.
> -
> -- |A 'QSem' is a simple quantity semaphore, in which the available
> --- \"quantity\" is always dealt with in units of one.
> -newtype QSem = QSem (MVar (Int, [MVar ()])) deriving Eq
> +-- \"quantity\" is always dealt with in units of one. This can start
> +-- with positive, zero, or negative value.
> +data QSem = QSem { qSem :: !(MVar Int) -- Used to lock access to
> state of semaphore quantity.
> + , queueWait :: !(MVar ()) -- Used as FIFO queue for
> waiter, held by head of queue.
> + , headWait :: !(MVar ()) -- Note [headWait]
> + }
> + deriving (Eq)
> +
> +{- Note [headWait]
> +~~~~~~~~~~~~~~~~~~
> +The head of the waiter queue blocks on headWait. The unit () is not
> +just used to block and wake, but represents a tangible unit of value of
> +the semaphore. Thus 'signalQSem' can succeed in adding a unit of
> +quantity by putting a () into headWait.
> +
> +The real available total is the Int in qSem plus 1 if the headWait MVar
> +is full. There are two legal states of the QSem when it has positive
> +quantity (and one legal state if negative or zero). All the quantity
> +may be in the Int in qSem and headWait empty, or one less than that in
> +qSem and headWait full.
> +-}
>
> INSTANCE_TYPEABLE0(QSem,qSemTc,"QSem")
>
> --- |Build a new 'QSem' with a supplied initial quantity.
> --- The initial quantity must be at least 0.
> +-- |Build a new 'QSem' with a supplied initial quantity. The initial
> +-- quantity is allowed be positive or zero or negative, and is stricly
> +-- evaluated by 'newQSem'.
> +--
> +-- Note that once 'signalQSem' causes the value to cease to be negative
> +-- then it can never become negative again later. Using a negative
> +-- initial value can be practical, e.g. making a one-shot barrier.
> newQSem :: Int -> IO QSem
> -newQSem initial =
> - if initial< 0
> - then fail "newQSem: Initial quantity must be non-negative"
> - else do sem<- newMVar (initial, [])
> - return (QSem sem)
> +newQSem initial = do
> + newQS<- newMVar $! initial
> + newHeadWait<- newEmptyMVar
> + newQueueWait<- newMVar ()
> + return (QSem { qSem = newQS
> + , queueWait = newQueueWait
> + , headWait = newHeadWait })
>
> --- |Wait for a unit to become available
> +-- |Wait for a unit to become available. This may block and be
> +-- interrupted. Concurrent waiters will be served in FIFO order with
> +-- wake-one behavior if blocked.
> +--
> +-- To maintain the semaphore quantity in the presence of exceptions, it
> +-- is strongly recommended to combine 'waitQSem' with 'signalQSem' using
> +-- 'Control.Exception.bracket_' like this:
> +--
> +--> Control.Exception.bracket_ (waitQSem qSem) (signalQSem qSem) todo
> +--
> +-- If 'waitQSem' is interrupted then no quantity is lost. If 'waitQSem'
> +-- returns without interruption then it left the 'QSem' with a remaining
> +-- quantity that was greater than or equal to zero.
> waitQSem :: QSem -> IO ()
> -waitQSem (QSem sem) = mask_ $ do
> - (avail,blocked)<- takeMVar sem -- gain ex. access
> - if avail> 0 then
> - let avail' = avail-1
> - in avail' `seq` putMVar sem (avail',[])
> - else do
> - b<- newEmptyMVar
> - {-
> - Stuff the reader at the back of the queue,
> - so as to preserve waiting order. A signalling
> - process then only have to pick the MVar at the
> - front of the blocked list.
> +waitQSem q = mask_ . withMVar (queueWait q) $ \ () -> -- Note [mask_]
> + id =<< (modifyMVar (qSem q) $ \ qs -> do -- Note [id/join]
> + -- Nothing below can block
> + mayGrab<- tryTakeMVar (headWait q) -- guarded by holding queueWait
> & qSem
> + case mayGrab of
> + Just () -> return (qs,return ())
> + Nothing -> if 1<= qs
> + then let qs' = pred qs
> + in seq qs' $
> + return (qs', return ())
> + else return ( qs
> + , takeMVar (headWait q) ) -- guarded by
> holding queueWait but not qSem
> + )
>
> - The version of waitQSem given in the paper could
> - lead to starvation.
> - -}
> - putMVar sem (0, blocked++[b])
> - takeMVar b
> +{- Note [mask_]
> +~~~~~~~~~~~~~~~
> +mask_ is needed above because after tryTakeMVar or modifyMVar below we
> +must finish 'waitQSem' without being interrupted so that a 'bracket' can
> +ensure a matching 'signalQSem' will be called.
> +-}
> +{- Note [id/join]
> +~~~~~~~~~~~~~~~~~
> +id/join executing 'takeMVar' is expected to block but might not block: a
> +'signalQSem' could have already arrived, or throwTo/killThread might be
> +already pending.
> +-}
>
> -- |Signal that a unit of the 'QSem' is available
> +--
> +-- 'signalQSem' may block but it cannot be interrupted; this allows it
> +-- to dependably restore value to the 'QSem'. Concurrent 'signalQSem'
> +-- calls and the head 'waitQSem' waiter may momentarily block in a FIFO
> +-- queue.
> +--
> +-- Overflow warning: 'signalQSem' does NOT check if it is adding one to
> +-- a QSem holding 'maxBound :: Int'. If overflow occurs then the
> +-- semaphore state and future operations are undefined.
> signalQSem :: QSem -> IO ()
> -signalQSem (QSem sem) = mask_ $ do
> - (avail,blocked)<- takeMVar sem
> - case blocked of
> - [] -> let avail' = avail+1
> - in avail' `seq` putMVar sem (avail',blocked)
> -
> - (b:blocked') -> do
> - putMVar sem (0,blocked')
> - putMVar b ()
> +signalQSem q = uninterruptibleMask_ . modifyMVar_ (qSem q) $ \ qs -> do
> + -- Nothing below can block
> + if qs< 0
> + then return $! succ qs
> + else do
> + didPlace<- tryPutMVar (headWait q) () -- guarded by holding qSem
> + if didPlace
> + then return qs
> + else return $! succ qs
More information about the Cvs-ghc
mailing list