[PATCH 3/3] Adding new QSemN.hs
Thu Feb 24 17:58:36 CET 2011
---
Control/Concurrent/QSemN.hs | 141
+++++++++++++++++++++++++++++++-----------
1 files changed, 104 insertions(+), 37 deletions(-)
diff --git a/Control/Concurrent/QSemN.hs b/Control/Concurrent/QSemN.hs
index cfcff7f..5ea3473 100644
--- a/Control/Concurrent/QSemN.hs
+++ b/Control/Concurrent/QSemN.hs
@@ -11,8 +11,16 @@
-- Portability : non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary
--- \"amount\".
+-- \"amount\". The SafeSemaphore package offers more
+-- complicated semaphore operations over any Integral value.
--
+-- Rewritten to make it safe when 'waitQSemN' gets interrupted while
+-- blocking (fixes ticket #3160).
+--
+-- Overflow warning: These operations do not check for overflow errors. If
+-- the Int type is too small to accept the new total then the behavior of
+-- these operations is undefined. Using (MSemN Integer) from the
+-- SafeSemaphore package prevents the possibility of an overflow error.
-----------------------------------------------------------------------------
module Control.Concurrent.QSemN
@@ -24,54 +32,113 @@ module Control.Concurrent.QSemN
) where
import Prelude
-
import Control.Concurrent.MVar
-import Control.Exception ( mask_ )
+import Control.Exception
+import Control.Monad
import Data.Typeable
#include "Typeable.h"
+data QSemNQuant = MS { avail :: !Int -- This is the
quantity available to be taken from the semaphore.
+ , headWants :: !(Maybe Int) -- If there is a waiter
then this is Just the amount being waited for.
+ }
+ deriving (Eq)
+
-- |A 'QSemN' is a quantity semaphore, in which the available
-- \"quantity\" may be signalled or waited for in arbitrary amounts.
-newtype QSemN = QSemN (MVar (Int,[(Int,MVar ())])) deriving Eq
+data QSemN = MSemN { qSemN :: !(MVar QSemNQuant) -- Used to lock access
to state of semaphore quantity.
+ , queueWait :: !(MVar ()) -- Used as FIFO queue
for waiter, held by head of queue.
+ , headWait :: !(MVar ()) -- Note [headWait]
+ }
+ deriving (Eq)
+INSTANCE_TYPEABLE0(QSemNQuant,qSemNQuantTc,"QSemNQuant")
INSTANCE_TYPEABLE0(QSemN,qSemNTc,"QSemN")
--- |Build a new 'QSemN' with a supplied initial quantity.
--- The initial quantity must be at least 0.
+{- Note [headWait]
+~~~~~~~~~~~~~~~~~~
+The head of the waiter queue blocks on headWait. The unit () is not just
+used to block and wake, but represents a tangible amount of value of the
+semaphore. If headWait is full then it is because a waiting thread had to
+block and 'signalQSemN' has since satisfied it; the () in headWait
+represents the wanted amount that was passed to that waiter.
+
+This amount placed in headWait is considered owned by the waiting thread and
+is not considered available. The meaning of headWait in this module is
+slightly different from that of headWait in the QSem.hs module.
+-}
+
+-- |Build a new 'QSemN' with a supplied initial quantity. 'newQSemN'
allows
+-- positive, zero, and negative initial values. The initial value is
forced
+-- here to better localize errors.
newQSemN :: Int -> IO QSemN
-newQSemN initial =
- if initial < 0
- then fail "newQSemN: Initial quantity must be non-negative"
- else do sem <- newMVar (initial, [])
- return (QSemN sem)
+newQSemN initial = do
+ newMS <- newMVar $! (MS { avail = initial
+ , headWants = Nothing })
+ newQueueWait <- newMVar ()
+ newHeadWait <- newEmptyMVar
+ return (MSemN { qSemN = newMS
+ , queueWait = newQueueWait
+ , headWait = newHeadWait })
--- |Wait for the specified quantity to become available
+-- |Wait for the specified quantity to become available. The requested
+-- quantity may be positive or zero or negative. Concurrent waiters will be
+-- served in FIFO order with wake-one behavior if blocked.
+--
+-- To maintain the semaphore quantity in the presence of exceptions, it is
+-- strongly recommended to combine 'waitQSemN' with 'signalQSemN' using
+-- 'Control.Exception.bracket_' like this:
+--
+-- > Control.Exception.bracket_ (waitQSemN qSemN x) (signalQSemN qSemN
x) todo
+--
+-- If 'waitQSemN' returns without interruption then it left the 'QSemN'
with
+-- a remaining quantity that was greater than or equal to zero. If 'waitQSemN'
+-- is interrupted then no quantity is lost.
waitQSemN :: QSemN -> Int -> IO ()
-waitQSemN (QSemN sem) sz = mask_ $ do
- (avail,blocked) <- takeMVar sem -- gain ex. access
- let remaining = avail - sz
- if remaining >= 0 then
- -- discharging 'sz' still leaves the semaphore
- -- in an 'unblocked' state.
- putMVar sem (remaining,blocked)
- else do
- b <- newEmptyMVar
- putMVar sem (avail, blocked++[(sz,b)])
- takeMVar b
+waitQSemN _ 0 = return ()
+waitQSemN q wanted = mask_ . withMVar (queueWait q) $ \ () -> do --
Note [mask_]
+ mustWait <- modifyMVar (qSemN q) $ \ ms -> do
+ -- assert that headWants is Nothing (from prior 'new' or 'signal' or
'cleanup')
+ if wanted <= avail ms
+ then do
+ let avail'down = avail ms - wanted -- avail'down is never
negative, barring overflow
+ ms' <- evaluate ms { avail = avail'down }
+ return (ms', False)
+ else do
+ ms' <- evaluate ms { headWants = Just wanted }
+ return (ms', True)
+ when mustWait $ do
+ let cleanup = uninterruptibleMask_ $ modifyMVar_ (qSemN q) $ \ms -> do
+ mStale <- tryTakeMVar (headWait q)
+ let avail' = avail ms + maybe 0 (const wanted) mStale
+ evaluate ms { avail = avail', headWants = Nothing }
+ takeMVar (headWait q) `onException` cleanup
+
+{- Note [mask_]
+~~~~~~~~~~~~~~~
+mask_ is needed above for two reasons. Either headWants was set to
+(Just wanted), so the `onException` handler must be set up without being
+interrupted; or avail'down was set, so we must finish waitQSemN without
+being interrupted so that a bracket can ensure a matching signalQSemN can
+protect the acquired quantity.
+-}
--- |Signal that a given quantity is now available from the 'QSemN'.
+-- |Signal that a given additional quantity is now available from the
+-- 'QSemN'. The additional quantity may be positive or zero or negative.
+--
+-- If there is a blocked 'waitQSemN' and the new total quantity is greater
+-- than or equal to that wanted by the leading waiter then the wait queue
+-- unblocks.
+--
+-- 'signalQSemN' may block but it cannot be interrupted; this allows it to
+-- dependably restore value to the 'QSemN'. Concurrent 'signalQSemN' calls
+-- and the head 'waitQSemN' waiter may momentarily block in a FIFO queue.
signalQSemN :: QSemN -> Int -> IO ()
-signalQSemN (QSemN sem) n = mask_ $ do
- (avail,blocked) <- takeMVar sem
- (avail',blocked') <- free (avail+n) blocked
- avail' `seq` putMVar sem (avail',blocked')
- where
- free avail [] = return (avail,[])
- free avail ((req,b):blocked)
- | avail >= req = do
- putMVar b ()
- free (avail-req) blocked
- | otherwise = do
- (avail',blocked') <- free avail blocked
- return (avail',(req,b):blocked')
+signalQSemN _ 0 = return ()
+signalQSemN q size = uninterruptibleMask_ . modifyMVar_ (qSemN q) $ \
ms -> do
+ avail' <- evaluate $ avail ms + size
+ case headWants ms of
+ Just wanted | wanted <= avail' -> do
+ putMVar (headWait q) () -- cannot block
+ let avail'down = avail' - wanted -- avail'down is never negative,
barring overflow
+ evaluate ms { avail = avail'down, headWants = Nothing }
+ _ -> evaluate ms { avail = avail' }
--
1.7.2
More information about the Cvs-ghc
mailing list