Difference between revisions of "SafeConcurrent"

From HaskellWiki
Jump to navigation Jump to search
(Add QSemN update)
(Add "fixed" SampleVar)
Line 277: Line 277:
 
(avail',blocked') <- free avail blocked
 
(avail',blocked') <- free avail blocked
 
return (avail',(req,block):blocked')
 
return (avail',(req,block):blocked')
  +
</haskell>
  +
  +
= SampleVar =
  +
  +
This keeps the documented behavior of SampleVar, but not all the detailed behavior.
  +
  +
<haskell>
  +
-- |Proposed replacement for SampleVar that keeps most of the behavior and is now exception safe.
  +
--
  +
-- By Chris Kuklewicz
  +
module Control.Concurrent.SampleVar
  +
(
  +
-- * Sample Variables
  +
SampleVar, -- :: type _ =
  +
  +
newEmptySampleVar, -- :: IO (SampleVar a)
  +
newSampleVar, -- :: a -> IO (SampleVar a)
  +
emptySampleVar, -- :: SampleVar a -> IO ()
  +
readSampleVar, -- :: SampleVar a -> IO a
  +
tryReadSampleVar, -- :: SampleVar a -> IO a
  +
writeSampleVar, -- :: SampleVar a -> a -> IO ()
  +
isEmptySampleVar, -- :: SampleVar a -> IO Bool
  +
  +
) where
  +
  +
import Prelude
  +
  +
import Control.Concurrent.MVar
  +
import Control.Exception(block)
  +
  +
-- |
  +
-- Sample variables are slightly different from a normal 'MVar':
  +
--
  +
-- * Reading an empty 'SampleVar' causes the reader to block.
  +
-- (same as 'takeMVar' on empty 'MVar')
  +
--
  +
-- * Reading a filled 'SampleVar' empties it and returns value.
  +
-- (same as 'takeMVar')
  +
--
  +
-- * Try reading a filled 'SampleVar' returns a Maybe value.
  +
-- (same as 'tryTakeMVar')
  +
--
  +
-- * Writing to an empty 'SampleVar' fills it with a value, and
  +
-- potentially, wakes up a blocked reader (same as for 'putMVar' on
  +
-- empty 'MVar').
  +
--
  +
-- * Writing to a filled 'SampleVar' overwrites the current value.
  +
-- (different from 'putMVar' on full 'MVar'.)
  +
  +
data SampleVar a = SampleVar { readQueue :: MVar ()
  +
, lockedStore :: MVar (MVar a) }
  +
  +
-- |Build a new, empty, 'SampleVar'
  +
newEmptySampleVar :: IO (SampleVar a)
  +
newEmptySampleVar = do
  +
newReadQueue <- newMVar ()
  +
newLockedStore <- newMVar =<< newEmptyMVar
  +
return (SampleVar { readQueue = newReadQueue
  +
, lockedStore = newLockedStore })
  +
  +
-- |Build a 'SampleVar' with an initial value.
  +
newSampleVar :: a -> IO (SampleVar a)
  +
newSampleVar value = do
  +
newReadQueue <- newMVar ()
  +
newLockedStore <- newMVar =<< newMVar value
  +
return (SampleVar { readQueue = newReadQueue
  +
, lockedStore = newLockedStore })
  +
  +
-- |If the SampleVar is full, leave it empty. Otherwise, do nothing. This jumps the FIFO queue of
  +
-- readers. This may momentarily block.
  +
emptySampleVar :: SampleVar a -> IO ()
  +
emptySampleVar svar = block $ withMVar (lockedStore svar) $ \ store -> do
  +
_ <- tryTakeMVar store
  +
return ()
  +
  +
-- |Wait for a value to become available, then take it and return. This may block indefinately
  +
-- waiting for a value.
  +
readSampleVar :: SampleVar a -> IO a
  +
readSampleVar svar = block $
  +
withMVar (readQueue svar) $ \ _ -> do
  +
todo <- withMVar (lockedStore svar) $ \ store -> do
  +
maybeValue <- tryTakeMVar store
  +
case maybeValue of
  +
Nothing -> return (Left store)
  +
Just value -> return (Right value)
  +
-- postcondition for the withMVar (lockedStore svar) is that the store is empty.
  +
case todo of
  +
Left store -> takeMVar store
  +
Right value -> return value -- block indefinately
  +
  +
-- |See if a value is immediately available, then take it and return. This may momentarily block.
  +
-- This does not jump the FIFO reading queue.
  +
tryReadSampleVar :: SampleVar a -> IO (Maybe a)
  +
tryReadSampleVar svar = block $ do
  +
maybeH <- tryTakeMVar (readQueue svar)
  +
case maybeH of
  +
Nothing -> return Nothing
  +
Just h -> do
  +
maybeVal <- withMVar (lockedStore svar) tryTakeMVar
  +
putMVar (readQueue svar) h
  +
return maybeVal
  +
  +
-- |Write a value into the 'SampleVar', overwriting any previous value that was there. A currently
  +
-- blocked reader will find the new value, not the old value.
  +
writeSampleVar :: SampleVar a -> a -> IO ()
  +
writeSampleVar svar value = do
  +
withMVar (lockedStore svar) $ \ store -> do
  +
_ <- tryTakeMVar store
  +
putMVar store value
  +
-- postcondition for the withMVar (lockedStore svar) is that the store is full.
  +
  +
-- | Returns 'True' if the 'SampleVar' is currently empty.
  +
--
  +
-- Note that this function is only useful if you know that no other
  +
-- threads can be modifying the state of the 'SampleVar', because
  +
-- otherwise the state of the 'SampleVar' may have changed by the time
  +
-- you see the result of 'isEmptySampleVar'.
  +
--
  +
-- This may momentarily block
  +
isEmptySampleVar :: SampleVar a -> IO Bool
  +
isEmptySampleVar svar = withMVar (lockedStore svar) isEmptyMVar
 
</haskell>
 
</haskell>

Revision as of 14:28, 12 April 2009


Motivation

The base package (version 3.0.3.1) code for Control.Concurrent.QSem and QSemN and SamepleVar is not exception safe. This page is for holding proposed replacement code.

Specifically, both the wait and signal operations on a semaphore may block. These may then be interrupted by a killThread or other asynchronous exception. Exception safety means that this will never leave the semaphore in a broken state. Exception correctness means that the semaphore does not lose any of its quantity if the waiter is interrupted before the wait operation finished.

MSem is the proposed replacements for QSem.

A replacement for QSemN is also below. It is merely a slightly improved version of the QSemN code in base.

The SampleVar code is also not exception safe. The replacement has not yet been written.

MSem

This code should be exception safe and exception correct. (And was derived from MSenN below).

Note that it does not allocate any MVars to manage the waiting queue. Only newMSem allocates them. This should be more efficient than QSem.

{-# LANGUAGE DeriveDataTypeable #-}
-- |This modules is intended to replace "Control.Concurrent.QSem".  Unlike QSem, this MSem module
-- should be exception safe and correct.  This means that when signalMSem and waitQSem operations
-- receive an asynchronous exception such as killThread they will leave the MSem in a non-broken
-- state, and will not lose any quantity of the semaphore's value.
--
-- TODO : drop the MSem suffix from the operations.
--
-- Author : Chris Kuklewicz < haskell @at@ list .dot. mightyreason .dot. com >
-- Copyright : BSD3 2009
module MSem(MSem,newMSem,signalMSem,waitMSem,MSem'Exception) where

import Control.Concurrent.MVar
import Control.Exception(Exception,throwIO,block)
import Data.Maybe(fromMaybe)
import Data.Typeable(Typeable)

newtype MSem = MSem (MVar M)

data M = M { avail :: Int
           , headWants :: Bool
           , headWait :: MVar ()
           , tailWait :: MVar () }

newtype MSem'Exception = MSem'Exception String deriving (Show,Typeable)
instance Exception MSem'Exception

-- |'newSem' allows positive, zero, and negative initial values.
newMSem initial = do
  newHeadWait <- newEmptyMVar
  newTailWait <- newMVar ()
  let m = M { avail = initial
            , headWants = False
            , headWait = newHeadWait
            , tailWait = newTailWait }
  sem <- newMVar m
  return (MSem sem)

-- |Waiters block in FIFO order.  This returns when it is the front waiter and the available value
-- is positive.  If this throws an exception then no quantity of semaphore will be lost.
waitMSem :: MSem -> IO ()
waitMSem (MSem sem) = block $ do
  -- sem throw?
  advance <- withMVar sem $ \ m -> return (tailWait m)
  -- advance throw?
  withMVar advance $ \ _ -> do
    -- sem throw? withMVar cleans advance
    todo <- modifyMVar sem $ \ m -> do
              -- clean up if previous waiter died
              mStale <- tryTakeMVar (headWait m)
              let avail' = avail m + maybe 0 (const 1) mStale
              -- ensure the sem is in a sane state
              if avail' >= 1
                then do return (m { avail = avail' - 1, headWants = False }, Nothing)
                else do return (m { avail = avail', headWants = True }, Just (headWait m))
    case todo of
      Nothing -> return ()
      Just wait -> do
        -- takeMVar throw? the headWants is still set to True, withMVar cleans advance
        takeMVar wait

-- |Add one to the semaphore, if the new value is greater than 0 then the first waiter is woken.
-- This may momentarily block, and thus may throw an exception and leave then MSem unchanged.
signalMSem :: MSem -> IO ()
signalMSem msem@(MSem sem) = block $ modifyMVar_ sem $ \ m -> do
  case headWants m of
    False -> return (m { avail = avail m + 1 })
    True ->
      if avail m >= 0
        then do
          ok <- tryPutMVar (headWait m) ()
          if ok then return (m { headWants = False })
            else throwIO . MSem'Exception $ 
                   "MSem.signalMSem: impossible happened, the headWait MVar was full"
        else return (m { avail = avail m + 1 })

MSemN

The MSemN has different semantics than QSemN. The first waiter in line is the only one being considered for waking.

{-# LANGUAGE DeriveDataTypeable #-}
-- |This modules is intended to replace "Control.Concurrent.QSemN".  Unlike QSemN, this MSemN module
-- should be exception safe and correct.  This means that when signalMSemN and waitQSemN operations
-- receive an asynchronous exception such as killThread they will leave the MSemN in a non-broken
-- state, and will not lose any quantity of the semaphore's value.
--
-- TODO : drop the MSem suffix from the operations.
--
-- Author : Chris Kuklewicz < haskell @at@ list .dot. mightyreason .dot. com >
-- Copyright : BSD3 2009
module MSemN(MSemN,newMSemN,signalMSemN,waitMSemN,MSemN'Exception) where

import Control.Concurrent.MVar
import Control.Exception(Exception,throwIO,block)
import Data.Maybe(fromMaybe)
import Data.Typeable(Typeable)

newtype MSemN = MSemN (MVar M)

data M = M { avail :: Int
           , headWants :: Maybe Int
           , headWait :: MVar Int
           , tailWait :: MVar () }

newtype MSemN'Exception = MSemN'Exception String deriving (Show,Typeable)
instance Exception MSemN'Exception

-- |'newSemN' allows positive, zero, and negative initial values.
newMSemN initial = do
  newHeadWait <- newEmptyMVar
  newTailWait <- newMVar ()
  let m = M { avail = initial
            , headWants = Nothing
            , headWait = newHeadWait
            , tailWait = newTailWait }
  sem <- newMVar m
  return (MSemN sem)

-- |'waitMSemN' allow positive, zero, and negative wanted values.  Waiters block in FIFO order.
-- This returns when it is the front waiter and the available value is not less than the wanted
-- value.  If this throws an exception then no quantity of semaphore will be lost.
waitMSemN :: MSemN -> Int -> IO ()
waitMSemN (MSemN sem) wanted = block $ do
  -- sem throw?
  advance <- withMVar sem $ \ m -> return (tailWait m)
  -- advance throw?
  withMVar advance $ \ _ -> do
    -- sem throw? withMVar cleans advance
    todo <- modifyMVar sem $ \ m -> do
              -- clean up if previous waiter died
              mStale <- tryTakeMVar (headWait m)
              let avail' = avail m + fromMaybe 0 mStale
              -- ensure the sem is in a sane state
              if avail' >= wanted
                then do return (m { avail = avail' - wanted, headWants = Nothing }, Nothing)
                else do return (m { avail = avail', headWants = Just wanted }, Just (headWait m))
    case todo of
      Nothing -> return ()
      Just wait -> getWanted wait
 where
  getWanted wait = do
    -- takeMVar throw? clean up with next waiter
    given <- takeMVar wait
    if given == wanted
      then return ()
      else throwIO . MSemN'Exception $
             "MSemN.waitMSemN: impossible happened, (wanted,given) == "++ show (wanted,given)

-- |'signalMSemN' allows positive, zero, and negative size values.  If the new total is greater than
-- the value waited for then the first waiter is woken.  This may momentarily block, and thus may
-- throw an exception and leave then MSemN unchanged.
signalMSemN :: MSemN -> Int -> IO ()
signalMSemN _ 0 = return ()
signalMSemN msem@(MSemN sem) size = block $ modifyMVar_ sem $ \ m -> do
  case headWants m of
    Nothing -> return (m { avail = avail m + size })
    Just wanted -> do
      let avail' = avail m + size
      if avail' >= wanted
        then do
          ok <- tryPutMVar (headWait m) wanted
          if ok then return (m { avail = avail' - wanted, headWants = Nothing })
            else throwIO . MSemN'Exception $
                  "MSemN.signalMSemN: impossible happened, the headWait MVar was full"
        else return (m { avail = avail' })

{-

-- |'queryMSemN' returns two value, the first is the available value in the semaphore.  The second
-- value, if not Nothing, is Just the value wanted by the first blocked waiter.  If the second value
-- is Nothing that does not imply there are no blocked waiters.
-- 
-- Warning: the first value may be momentarily wrong (and the second Nothing) if the previous waiter
-- died between being signaled and receiving its wanted value.
queryMSemN :: MSemN -> IO (Int,Maybe Int)
queryMSemN (MSemN sem) = withMVar sem $ \ m -> return (avail m, headWants m)

-}

QSemN

This is a slightly improved version of QSemN that should be exception safe. It is also nearly, but not quite exception correct. There is still a race between the dying waitQSemN and the signalQSemN.

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.QSemN
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
-- 
-- Maintainer  :  libraries@haskell.org
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary
-- \"amount\".  Modified by Chris Kuklewicz to make it exception safe.
--
-----------------------------------------------------------------------------

module Control.Concurrent.QSemN
        (  -- * General Quantity Semaphores
          QSemN,        -- abstract
          newQSemN,     -- :: Int   -> IO QSemN
          waitQSemN,    -- :: QSemN -> Int -> IO ()
          signalQSemN   -- :: QSemN -> Int -> IO ()
      ) where

import Prelude

import Control.Concurrent.MVar
import Control.Exception(block,onException)
import Data.Typeable

#include "Typeable.h"

-- |A 'QSemN' is a quantity semaphore, in which the available
-- \"quantity\" may be signalled or waited for in arbitrary amounts.
newtype QSemN = QSemN (MVar (Int,[(Int,MVar ())]))

INSTANCE_TYPEABLE0(QSemN,qSemNTc,"QSemN")

-- |Build a new 'QSemN' with a supplied initial quantity.
newQSemN :: Int -> IO QSemN 
newQSemN initial = do
   sem <- newMVar (initial, [])
   return (QSemN sem)

-- |Wait for the specified quantity to become available
waitQSemN :: QSemN -> Int -> IO ()
waitQSemN (QSemN sem) sz = block $ do
  todo <- modifyMVar sem $ \ (avail,blocked) -> do
    if (avail - sz) >= 0
      then return ((avail-sz,blocked),Nothing)
      else do
        block <- newEmptyMVar
        return ((avail, blocked++[(sz,block)]),Just block)
  case todo of
    Nothing -> return ()
    Just block -> onException (takeMVar block) (tryPutMVar block ())

-- |Signal that a given quantity is now available from the 'QSemN'.
signalQSemN :: QSemN -> Int  -> IO ()
signalQSemN (QSemN sem) n = block $ modifyMVar_ sem $
  \ (avail,blocked) -> free (avail+n) blocked
 where
   free avail []    = return (avail,[])
   free avail ((req,block):blocked)
     | avail >= req = do
        ok <- tryPutMVar block ()
        if ok then free (avail-req) blocked
          else free avail blocked
     | otherwise    = do
        (avail',blocked') <- free avail blocked
        return (avail',(req,block):blocked')

SampleVar

This keeps the documented behavior of SampleVar, but not all the detailed behavior.

-- |Proposed replacement for SampleVar that keeps most of the behavior and is now exception safe.
--
-- By Chris Kuklewicz
module Control.Concurrent.SampleVar
       (
         -- * Sample Variables
         SampleVar,         -- :: type _ =
 
         newEmptySampleVar, -- :: IO (SampleVar a)
         newSampleVar,      -- :: a -> IO (SampleVar a)
         emptySampleVar,    -- :: SampleVar a -> IO ()
         readSampleVar,     -- :: SampleVar a -> IO a
         tryReadSampleVar,  -- :: SampleVar a -> IO a
         writeSampleVar,    -- :: SampleVar a -> a -> IO ()
         isEmptySampleVar,  -- :: SampleVar a -> IO Bool

       ) where

import Prelude

import Control.Concurrent.MVar
import Control.Exception(block)

-- |
-- Sample variables are slightly different from a normal 'MVar':
-- 
--  * Reading an empty 'SampleVar' causes the reader to block.
--    (same as 'takeMVar' on empty 'MVar')
-- 
--  * Reading a filled 'SampleVar' empties it and returns value.
--    (same as 'takeMVar')
--
--  * Try reading a filled 'SampleVar' returns a Maybe value.
--    (same as 'tryTakeMVar')
-- 
--  * Writing to an empty 'SampleVar' fills it with a value, and
--    potentially, wakes up a blocked reader (same as for 'putMVar' on
--    empty 'MVar').
--
--  * Writing to a filled 'SampleVar' overwrites the current value.
--    (different from 'putMVar' on full 'MVar'.)

data SampleVar a = SampleVar { readQueue :: MVar ()
                             , lockedStore :: MVar (MVar a) }

-- |Build a new, empty, 'SampleVar'
newEmptySampleVar :: IO (SampleVar a)
newEmptySampleVar = do
  newReadQueue <- newMVar ()
  newLockedStore <- newMVar =<< newEmptyMVar
  return (SampleVar { readQueue = newReadQueue
                    , lockedStore = newLockedStore })

-- |Build a 'SampleVar' with an initial value.
newSampleVar :: a -> IO (SampleVar a)
newSampleVar value = do
  newReadQueue <- newMVar ()
  newLockedStore <- newMVar =<< newMVar value
  return (SampleVar { readQueue = newReadQueue
                    , lockedStore = newLockedStore })

-- |If the SampleVar is full, leave it empty.  Otherwise, do nothing.  This jumps the FIFO queue of
-- readers.  This may momentarily block.
emptySampleVar :: SampleVar a -> IO ()
emptySampleVar svar = block $ withMVar (lockedStore svar) $ \ store -> do
  _ <- tryTakeMVar store
  return ()

-- |Wait for a value to become available, then take it and return.  This may block indefinately
-- waiting for a value.
readSampleVar :: SampleVar a -> IO a
readSampleVar svar = block $
  withMVar (readQueue svar) $ \ _ -> do
    todo <- withMVar (lockedStore svar) $ \ store -> do
              maybeValue <- tryTakeMVar store
              case maybeValue of
                Nothing -> return (Left store)
                Just value -> return (Right value)
    -- postcondition for the withMVar (lockedStore svar) is that the store is empty.
    case todo of
      Left store -> takeMVar store
      Right value -> return value  -- block indefinately

-- |See if a value is immediately available, then take it and return.  This may momentarily block.
-- This does not jump the FIFO reading queue.
tryReadSampleVar :: SampleVar a -> IO (Maybe a)
tryReadSampleVar svar = block $ do
  maybeH <- tryTakeMVar (readQueue svar)
  case maybeH of
    Nothing -> return Nothing
    Just h -> do
      maybeVal <- withMVar (lockedStore svar) tryTakeMVar
      putMVar (readQueue svar) h
      return maybeVal

-- |Write a value into the 'SampleVar', overwriting any previous value that was there.  A currently
-- blocked reader will find the new value, not the old value.
writeSampleVar :: SampleVar a -> a -> IO ()
writeSampleVar svar value = do
  withMVar (lockedStore svar) $ \ store -> do
    _ <- tryTakeMVar store
    putMVar store value
  -- postcondition for the withMVar (lockedStore svar) is that the store is full.

-- | Returns 'True' if the 'SampleVar' is currently empty.
--
-- Note that this function is only useful if you know that no other
-- threads can be modifying the state of the 'SampleVar', because
-- otherwise the state of the 'SampleVar' may have changed by the time
-- you see the result of 'isEmptySampleVar'.
--
-- This may momentarily block
isEmptySampleVar :: SampleVar a -> IO Bool
isEmptySampleVar svar = withMVar (lockedStore svar) isEmptyMVar