From 8f70a935b1f6c364d44d0149177229b7d14d20aa Mon Sep 17 00:00:00 2001 From: Chris Kuklewicz Date: Mon, 18 Apr 2011 23:51:29 +0100 Subject: [PATCH 3/3] Adding new QSemN.hs --- Control/Concurrent/QSemN.hs | 141 +++++++++++++++++++++++++++++++----------- 1 files changed, 104 insertions(+), 37 deletions(-) diff --git a/Control/Concurrent/QSemN.hs b/Control/Concurrent/QSemN.hs index cfcff7f..5ea3473 100644 --- a/Control/Concurrent/QSemN.hs +++ b/Control/Concurrent/QSemN.hs @@ -11,8 +11,16 @@ -- Portability : non-portable (concurrency) -- -- Quantity semaphores in which each thread may wait for an arbitrary --- \"amount\". +-- \"amount\". The SafeSemaphore package offers more +-- complicated semaphore operations over any Integral value. -- +-- Rewritten to make it safe when 'waitQSemN' gets interrupted while +-- blocking (fixes ticket #3160). +-- +-- Overflow warning: These operations do not check for overflow errors. If +-- the Int type is too small to accept the new total then the behavior of +-- these operations is undefined. Using (MSemN Integer) prevents the +-- possibility of an overflow error. ----------------------------------------------------------------------------- module Control.Concurrent.QSemN @@ -24,54 +32,113 @@ module Control.Concurrent.QSemN ) where import Prelude - import Control.Concurrent.MVar -import Control.Exception ( mask_ ) +import Control.Exception +import Control.Monad import Data.Typeable #include "Typeable.h" +data QSemNQuant = MS { avail :: !Int -- This is the quantity available to be taken from the semaphore. + , headWants :: !(Maybe Int) -- If there is waiter then this is Just the amount being waited for. + } + deriving (Eq) + -- |A 'QSemN' is a quantity semaphore, in which the available -- \"quantity\" may be signalled or waited for in arbitrary amounts. -newtype QSemN = QSemN (MVar (Int,[(Int,MVar ())])) deriving Eq +data QSemN = MSemN { qSemN :: !(MVar QSemNQuant) -- Used to lock access to state of semaphore quantity. + , queueWait :: !(MVar ()) -- Used as FIFO queue for waiter, held by head of queue. + , headWait :: !(MVar ()) -- Note [headWait] + } + deriving (Eq) +INSTANCE_TYPEABLE0(QSemNQuant,qSemNQuantTc,"QSemNQuant") INSTANCE_TYPEABLE0(QSemN,qSemNTc,"QSemN") --- |Build a new 'QSemN' with a supplied initial quantity. --- The initial quantity must be at least 0. +{- Note [headWait] +~~~~~~~~~~~~~~~~~~ +The head of the waiter queue blocks on headWait. The unit () is not just +used to block and wake, but represents a tangible amount of value of the +semaphore. If headWait is full then it is because a waiting thread had to +block, and the () in headWait represents the amount passed. + +This amount placed in headWait is considered owned by the waiting thread and +is not considered available. The meaning of headWait in this module is +slightly different than the one of headWait in the QSem.hs module. +-} + +-- |Build a new 'QSemN' with a supplied initial quantity. 'newQSemN' allows +-- positive, zero, and negative initial values. The initial value is forced +-- here to better localize errors. newQSemN :: Int -> IO QSemN -newQSemN initial = - if initial < 0 - then fail "newQSemN: Initial quantity must be non-negative" - else do sem <- newMVar (initial, []) - return (QSemN sem) +newQSemN initial = do + newMS <- newMVar $! (MS { avail = initial + , headWants = Nothing }) + newQueueWait <- newMVar () + newHeadWait <- newEmptyMVar + return (MSemN { qSemN = newMS + , queueWait = newQueueWait + , headWait = newHeadWait }) --- |Wait for the specified quantity to become available +-- |Wait for the specified quantity to become available. The requested +-- quantity maybe positive or zero or negative. Concurrent waiters will be +-- served in FIFO order with wake-one behavior if blocked. +-- +-- To maintain the semaphore quantity in the presence of exceptions, it is +-- strongly recommended to combine 'waitQSemN' with 'signalQSemN' using +-- 'Control.Exception.bracket_' like this: +-- +-- > Control.Exception.bracket_ (waitQSemN qSemN x) (signalQSem qSemN x) todo +-- +-- If 'waitQSemN' returns without interruption then it left the 'QSemN' with +-- a remaining quantity that was greater than or equal to zero. If this is +-- interrupted then no quantity is lost. waitQSemN :: QSemN -> Int -> IO () -waitQSemN (QSemN sem) sz = mask_ $ do - (avail,blocked) <- takeMVar sem -- gain ex. access - let remaining = avail - sz - if remaining >= 0 then - -- discharging 'sz' still leaves the semaphore - -- in an 'unblocked' state. - putMVar sem (remaining,blocked) - else do - b <- newEmptyMVar - putMVar sem (avail, blocked++[(sz,b)]) - takeMVar b +waitQSemN _ 0 = return () +waitQSemN q wanted = mask_ . withMVar (queueWait q) $ \ () -> do -- Note [mask_] + mustWait <- modifyMVar (qSemN q) $ \ ms -> do + -- assert that headDown is Nothing (from prior 'new' or 'signal' or 'cleanup') + if wanted <= avail ms + then do + let avail'down = avail ms - wanted -- avail'down is never negative, barring overflow + ms' <- evaluate ms { avail = avail'down } + return (ms', False) + else do + ms' <- evaluate ms { headWants = Just wanted } + return (ms', True) + when mustWait $ do + let cleanup = uninterruptibleMask_ $ modifyMVar_ (qSemN q) $ \ms -> do + mStale <- tryTakeMVar (headWait q) + let avail' = avail ms + maybe 0 (const wanted) mStale + evaluate ms { avail = avail', headWants = Nothing } + takeMVar (headWait q) `onException` cleanup + +{- Note [mask_] +~~~~~~~~~~~~~~~ +mask_ is needed above because either (Just wantedVal) may be set and this +means we need to get the `onException` setup without being interrupted, or +avail'down was set and we must finish waitQSemN without being interrupted so +that a bracket can ensure a matching signalQSemN can protect the acquired +quantity. +-} --- |Signal that a given quantity is now available from the 'QSemN'. +-- |Signal that a given additional quantity is now available from the +-- 'QSemN'. The additional quantity may be positive or zero or negative. +-- +-- If there is a blocked 'waitQSemN' and new total quantity is greater than +-- or equal to that wanted by the leading waiter then the wait queue +-- unblocks. +-- +-- 'signalQSemN' may block but it cannot be interrupted; this allows it to +-- dependably restore value to the 'QSemN'. Concurrent 'signalQSemN' calls +-- and the head 'waitQSemN' waiter may momentarily block in a FIFO queue. signalQSemN :: QSemN -> Int -> IO () -signalQSemN (QSemN sem) n = mask_ $ do - (avail,blocked) <- takeMVar sem - (avail',blocked') <- free (avail+n) blocked - avail' `seq` putMVar sem (avail',blocked') - where - free avail [] = return (avail,[]) - free avail ((req,b):blocked) - | avail >= req = do - putMVar b () - free (avail-req) blocked - | otherwise = do - (avail',blocked') <- free avail blocked - return (avail',(req,b):blocked') +signalQSemN _ 0 = return () +signalQSemN q size = uninterruptibleMask_ . modifyMVar_ (qSemN q) $ \ ms -> do + avail' <- evaluate $ avail ms + size + case headWants ms of + Just wanted | wanted <= avail' -> do + putMVar (headWait q) () -- cannot block + let avail'down = avail' - wanted -- avail'down is never negative, barring overflow + evaluate ms { avail = avail'down, headWants = Nothing } + _ -> evaluate ms { avail = avail' } -- 1.7.2