[PATCH 2/3] Adding new SampleVar.hs
Thu Feb 24 17:58:36 CET 2011
---
Control/Concurrent/SampleVar.hs | 114
++++++++++++++++++---------------------
1 files changed, 53 insertions(+), 61 deletions(-)
diff --git a/Control/Concurrent/SampleVar.hs
b/Control/Concurrent/SampleVar.hs
index ca68a38..2e66f00 100644
--- a/Control/Concurrent/SampleVar.hs
+++ b/Control/Concurrent/SampleVar.hs
@@ -10,8 +10,10 @@
-- Stability : experimental
-- Portability : non-portable (concurrency)
--
--- Sample variables
+-- Sample variables.
--
+-- Rewritten to make it safe when operations get interrupted while blocking
+-- (fixes ticket #3160).
-----------------------------------------------------------------------------
module Control.Concurrent.SampleVar
@@ -29,13 +31,9 @@ module Control.Concurrent.SampleVar
) where
import Prelude
-
import Control.Concurrent.MVar
-
-import Control.Exception ( mask_ )
-
-import Data.Functor ( (<$>) )
-
+import Control.Exception
+import Control.Monad
import Data.Typeable
#include "Typeable.h"
@@ -55,78 +53,72 @@ import Data.Typeable
--
-- * Writing to a filled 'SampleVar' overwrites the current value.
-- (different from 'putMVar' on full 'MVar'.)
-
-newtype SampleVar a = SampleVar ( MVar ( Int -- 1 == full
- -- 0 == empty
- -- <0 no of readers blocked
- , MVar a
- )
- )
- deriving (Eq)
+--
+-- The readers queue in FIFO order, with the leading reader joining the
+-- writers in a second FIFO queue to access the stored value. Writers can
+-- jump the queue of readers to update the value. The leading reader has to
+-- wait on all previous writes to finish before taking the value.
+--
+-- This design choice emphasises that each reader sees the most up-to-date
+-- value possible while still guaranteeing progress.
+data SampleVar a = SampleVar { readQueue :: MVar ()
+ , lockedStore :: MVar (MVar a) }
+ deriving (Eq)
INSTANCE_TYPEABLE1(SampleVar,sampleVarTc,"SampleVar")
-- |Build a new, empty, 'SampleVar'
newEmptySampleVar :: IO (SampleVar a)
newEmptySampleVar = do
- v <- newEmptyMVar
- SampleVar <$> newMVar (0,v)
+ newReadQueue <- newMVar ()
+ newLockedStore <- newMVar =<< newEmptyMVar
+ return (SampleVar { readQueue = newReadQueue
+ , lockedStore = newLockedStore })
--- |Build a 'SampleVar' with an initial value.
+-- |Build a 'SampleVar' with an initial value, stored lazily.
newSampleVar :: a -> IO (SampleVar a)
newSampleVar a = do
- v <- newMVar a
- SampleVar <$> newMVar (1,v)
+ newReadQueue <- newMVar ()
+ newLockedStore <- newMVar =<< newMVar a
+ return (SampleVar { readQueue = newReadQueue
+ , lockedStore = newLockedStore })
-- |If the SampleVar is full, leave it empty. Otherwise, do nothing.
+--
+-- 'emptySampleVar' can block and be interrupted, in which case it does
+-- nothing. If 'emptySampleVar' returns then it left the 'SampleVar' in an
+-- empty state.
emptySampleVar :: SampleVar a -> IO ()
-emptySampleVar (SampleVar v) = mask_ $ do
- s@(readers, var) <- takeMVar v
- if readers > 0 then do
- _ <- takeMVar var
- putMVar v (0,var)
- else
- putMVar v s
+emptySampleVar (SampleVar _ ls) = withMVar ls (void . tryTakeMVar)
+ -- (withMVar ls) might block, interrupting is okay
--- |Wait for a value to become available, then take it and return.
-readSampleVar :: SampleVar a -> IO a
-readSampleVar (SampleVar svar) = mask_ $ do
---
--- filled => make empty and grab sample
--- not filled => try to grab value, empty when read val.
+-- |Wait for a value to become available, then take it and return. The queue
+-- of blocked 'readSampleVar' threads is a fair FIFO queue.
--
- (readers,val) <- takeMVar svar
- let readers' = readers-1
- readers' `seq` putMVar svar (readers',val)
- takeMVar val
+-- 'readSampleVar' can block and be interrupted, in which case it takes
+-- nothing. If 'readSampleVar' returns normally then it has taken a value.
+readSampleVar :: SampleVar a -> IO a
+readSampleVar (SampleVar rq ls) = mask_ $ withMVar rq $ \ () ->
+ join $ withMVar ls (return . takeMVar)
+ -- (withMVar rq) might block, interrupting is okay
+ -- (withMVar ls) might block, interrupting is okay
+ -- join (takeMVar _) will block if empty, interrupting is okay
-- |Write a value into the 'SampleVar', overwriting any previous value that
-- was there.
-writeSampleVar :: SampleVar a -> a -> IO ()
-writeSampleVar (SampleVar svar) v = mask_ $ do
--
--- filled => overwrite
--- not filled => fill, write val
---
- s@(readers,val) <- takeMVar svar
- case readers of
- 1 ->
- swapMVar val v >>
- putMVar svar s
- _ ->
- putMVar val v >>
- let readers' = min 1 (readers+1)
- in readers' `seq` putMVar svar (readers', val)
+-- 'writeSampleVar' can block and be interrupted, in which case it does nothing.
+writeSampleVar :: SampleVar a -> a -> IO ()
+writeSampleVar (SampleVar _ ls) a = mask_ $ withMVar ls $ \ v -> do
+ void $ tryTakeMVar v
+ putMVar v a -- cannot block
+ -- (withMVar ls) might block, interrupting is okay
--- | Returns 'True' if the 'SampleVar' is currently empty.
---
--- Note that this function is only useful if you know that no other
--- threads can be modifying the state of the 'SampleVar', because
--- otherwise the state of the 'SampleVar' may have changed by the time
--- you see the result of 'isEmptySampleVar'.
+-- |Returns 'True' if the 'SampleVar' is currently empty.
--
+-- Note that this function is only useful if you know that no other threads
+-- can be modifying the state of the 'SampleVar', because otherwise the state
+-- of the 'SampleVar' may have changed by the time you see the result of
+-- 'isEmptySampleVar'.
isEmptySampleVar :: SampleVar a -> IO Bool
-isEmptySampleVar (SampleVar svar) = do
- (readers, _) <- readMVar svar
- return (readers <= 0)
-
+isEmptySampleVar (SampleVar _ ls) = withMVar ls isEmptyMVar
--
1.7.2
More information about the Cvs-ghc
mailing list