Patch 4 of 6 for ticket #3160

Simon Marlow marlowsd at gmail.com
Wed Apr 20 11:23:16 CEST 2011


On 19/04/2011 15:26, Chris Kuklewicz wrote:
> This is a patch against base.  This replaces the implementation of
> Control.Concurrent.QSem
>
>  From 21a539a7adceee253cd8602702dcbd3863f5bd55 Mon Sep 17 00:00:00 2001
> Subject: [PATCH 1/3] Replace QSem.hs in base

Thanks for the patches.  I'm quite surprised, and a little worried, at 
how complex the new versions are, and it's hard for me to verify their 
correctness.  I appreciate the effort you've gone to with commenting the 
changes, but I didn't get a sense for the rationale, e.g.

+{- Note [headWait]
+~~~~~~~~~~~~~~~~~~
+The head of the waiter queue blocks on headWait.  The unit () is not
+just used to block and wake, but represents a tangible unit of value of
+the semaphore.  Thus 'signalQSem' can succeed in adding a unit of
+quantity by putting a () into headWait.
+
+The real available total is the Int in qSem plus 1 if the headWait MVar
+is full.  There are two legal states of the QSem when it has positive
+quantity (and one legal state if negative or zero).  All the quantity
+may be in the Int in qSem and headWait empty, or one less than that in
+qSem and headWait full.
+-}

this doesn't tell me *why* you need to have something called headWait 
with a "tangible unit of value" (and what does that mean, incidentally?).

Experience shows that it's very easy to get things wrong in these 
implementations, so we should be very careful here.

A good way to start would be to explain why we can't just use 
modifyMVar_ (et.al.) in the usual way to make QSem safe, for example.

Cheers,
	Simon



> ---
>   Control/Concurrent/QSem.hs |  145
> +++++++++++++++++++++++++++++--------------
>   1 files changed, 98 insertions(+), 47 deletions(-)
>
> diff --git a/Control/Concurrent/QSem.hs b/Control/Concurrent/QSem.hs
> index 22f6c0c..1f49452 100644
> --- a/Control/Concurrent/QSem.hs
> +++ b/Control/Concurrent/QSem.hs
> @@ -1,5 +1,3 @@
> -{-# LANGUAGE CPP #-}
> -
>   -----------------------------------------------------------------------------
>   -- |
>   -- Module      :  Control.Concurrent.QSem
> @@ -10,7 +8,12 @@
>   -- Stability   :  experimental
>   -- Portability :  non-portable (concurrency)
>   --
> --- Simple quantity semaphores.
> +-- Simple quantity semaphores.  "Control.Concurrent.QSemN" offers a
> +-- quantity semaphore.  The SafeSemaphore package offers more
> +-- complicated semaphore operations over any Integral value.
> +--
> +-- Rewritten to make it safe when 'waitQSem' gets interrupted while
> +-- blocking (fixes ticket #3160).
>   --
>   -----------------------------------------------------------------------------
>
> @@ -24,62 +27,110 @@ module Control.Concurrent.QSem
>
>   import Prelude
>   import Control.Concurrent.MVar
> -import Control.Exception ( mask_ )
> +import Control.Exception
>   import Data.Typeable
>
>   #include "Typeable.h"
>
> --- General semaphores are also implemented readily in terms of shared
> --- @MVar at s, only have to catch the case when the semaphore is tried
> --- waited on when it is empty (==0). Implement this in the same way as
> --- shared variables are implemented - maintaining a list of @MVar at s
> --- representing threads currently waiting. The counter is a shared
> --- variable, ensuring the mutual exclusion on its access.
> -
>   -- |A 'QSem' is a simple quantity semaphore, in which the available
> --- \"quantity\" is always dealt with in units of one.
> -newtype QSem = QSem (MVar (Int, [MVar ()])) deriving Eq
> +-- \"quantity\" is always dealt with in units of one.  This can start
> +-- with positive, zero, or negative value.
> +data QSem = QSem { qSem :: !(MVar Int)     -- Used to lock access to
> state of semaphore quantity.
> +                 , queueWait :: !(MVar ()) -- Used as FIFO queue for
> waiter, held by head of queue.
> +                 , headWait :: !(MVar ())  -- Note [headWait]
> +                 }
> +  deriving (Eq)
> +
> +{- Note [headWait]
> +~~~~~~~~~~~~~~~~~~
> +The head of the waiter queue blocks on headWait.  The unit () is not
> +just used to block and wake, but represents a tangible unit of value of
> +the semaphore.  Thus 'signalQSem' can succeed in adding a unit of
> +quantity by putting a () into headWait.
> +
> +The real available total is the Int in qSem plus 1 if the headWait MVar
> +is full.  There are two legal states of the QSem when it has positive
> +quantity (and one legal state if negative or zero).  All the quantity
> +may be in the Int in qSem and headWait empty, or one less than that in
> +qSem and headWait full.
> +-}
>
>   INSTANCE_TYPEABLE0(QSem,qSemTc,"QSem")
>
> --- |Build a new 'QSem' with a supplied initial quantity.
> ---  The initial quantity must be at least 0.
> +-- |Build a new 'QSem' with a supplied initial quantity.  The initial
> +-- quantity is allowed be positive or zero or negative, and is stricly
> +-- evaluated by 'newQSem'.
> +--
> +-- Note that once 'signalQSem' causes the value to cease to be negative
> +-- then it can never become negative again later.  Using a negative
> +-- initial value can be practical, e.g. making a one-shot barrier.
>   newQSem :: Int ->  IO QSem
> -newQSem initial =
> -    if initial<  0
> -    then fail "newQSem: Initial quantity must be non-negative"
> -    else do sem<- newMVar (initial, [])
> -            return (QSem sem)
> +newQSem initial = do
> +  newQS<- newMVar $! initial
> +  newHeadWait<- newEmptyMVar
> +  newQueueWait<- newMVar ()
> +  return (QSem { qSem = newQS
> +               , queueWait = newQueueWait
> +               , headWait = newHeadWait })
>
> --- |Wait for a unit to become available
> +-- |Wait for a unit to become available.  This may block and be
> +-- interrupted.  Concurrent waiters will be served in FIFO order with
> +-- wake-one behavior if blocked.
> +--
> +-- To maintain the semaphore quantity in the presence of exceptions, it
> +-- is strongly recommended to combine 'waitQSem' with 'signalQSem' using
> +-- 'Control.Exception.bracket_' like this:
> +--
> +-->  Control.Exception.bracket_ (waitQSem qSem)  (signalQSem qSem) todo
> +--
> +-- If 'waitQSem' is interrupted then no quantity is lost.  If 'waitQSem'
> +-- returns without interruption then it left the 'QSem' with a remaining
> +-- quantity that was greater than or equal to zero.
>   waitQSem :: QSem ->  IO ()
> -waitQSem (QSem sem) = mask_ $ do
> -   (avail,blocked)<- takeMVar sem  -- gain ex. access
> -   if avail>  0 then
> -     let avail' = avail-1
> -     in avail' `seq` putMVar sem (avail',[])
> -    else do
> -     b<- newEmptyMVar
> -      {-
> -        Stuff the reader at the back of the queue,
> -        so as to preserve waiting order. A signalling
> -        process then only have to pick the MVar at the
> -        front of the blocked list.
> +waitQSem q = mask_ . withMVar (queueWait q) $ \ () ->  -- Note [mask_]
> +  id =<<  (modifyMVar (qSem q) $ \ qs ->  do -- Note [id/join]
> +    -- Nothing below can block
> +    mayGrab<- tryTakeMVar (headWait q) -- guarded by holding queueWait
> &  qSem
> +    case mayGrab of
> +      Just () ->  return (qs,return ())
> +      Nothing ->  if 1<= qs
> +                   then let qs' = pred qs
> +                        in seq qs' $
> +                           return (qs', return ())
> +                   else return ( qs
> +                               , takeMVar (headWait q) ) -- guarded by
> holding queueWait but not qSem
> +         )
>
> -        The version of waitQSem given in the paper could
> -        lead to starvation.
> -      -}
> -     putMVar sem (0, blocked++[b])
> -     takeMVar b
> +{- Note [mask_]
> +~~~~~~~~~~~~~~~
> +mask_ is needed above because after tryTakeMVar or modifyMVar below we
> +must finish 'waitQSem' without being interrupted so that a 'bracket' can
> +ensure a matching 'signalQSem' will be called.
> +-}
> +{- Note [id/join]
> +~~~~~~~~~~~~~~~~~
> +id/join executing 'takeMVar' is expected to block but might not block: a
> +'signalQSem' could have already arrived, or throwTo/killThread might be
> +already pending.
> +-}
>
>   -- |Signal that a unit of the 'QSem' is available
> +--
> +-- 'signalQSem' may block but it cannot be interrupted; this allows it
> +-- to dependably restore value to the 'QSem'.  Concurrent 'signalQSem'
> +-- calls and the head 'waitQSem' waiter may momentarily block in a FIFO
> +-- queue.
> +--
> +-- Overflow warning: 'signalQSem' does NOT check if it is adding one to
> +-- a QSem holding 'maxBound :: Int'.  If overflow occurs then the
> +-- semaphore state and future operations are undefined.
>   signalQSem :: QSem ->  IO ()
> -signalQSem (QSem sem) = mask_ $ do
> -   (avail,blocked)<- takeMVar sem
> -   case blocked of
> -     [] ->  let avail' = avail+1
> -           in avail' `seq` putMVar sem (avail',blocked)
> -
> -     (b:blocked') ->  do
> -           putMVar sem (0,blocked')
> -           putMVar b ()
> +signalQSem q = uninterruptibleMask_ . modifyMVar_ (qSem q) $ \ qs ->  do
> +  -- Nothing below can block
> +  if qs<  0
> +    then return $! succ qs
> +    else do
> +      didPlace<- tryPutMVar (headWait q) () -- guarded by holding qSem
> +      if didPlace
> +        then return qs
> +        else return $! succ qs




More information about the Cvs-ghc mailing list