From 3de325e86e1451418879d0450971a22f092360c7 Mon Sep 17 00:00:00 2001 From: Chris Kuklewicz Date: Mon, 18 Apr 2011 23:40:22 +0100 Subject: [PATCH 2/3] Adding new SampleVar.hs --- Control/Concurrent/SampleVar.hs | 114 ++++++++++++++++++--------------------- 1 files changed, 53 insertions(+), 61 deletions(-) diff --git a/Control/Concurrent/SampleVar.hs b/Control/Concurrent/SampleVar.hs index ca68a38..2e66f00 100644 --- a/Control/Concurrent/SampleVar.hs +++ b/Control/Concurrent/SampleVar.hs @@ -10,8 +10,10 @@ -- Stability : experimental -- Portability : non-portable (concurrency) -- --- Sample variables +-- Sample variables. -- +-- Rewritten to make it safe when operations gets interrupted while blocking +-- (fixes ticket #3160). ----------------------------------------------------------------------------- module Control.Concurrent.SampleVar @@ -29,13 +31,9 @@ module Control.Concurrent.SampleVar ) where import Prelude - import Control.Concurrent.MVar - -import Control.Exception ( mask_ ) - -import Data.Functor ( (<$>) ) - +import Control.Exception +import Control.Monad import Data.Typeable #include "Typeable.h" @@ -55,78 +53,72 @@ import Data.Typeable -- -- * Writing to a filled 'SampleVar' overwrites the current value. -- (different from 'putMVar' on full 'MVar'.) - -newtype SampleVar a = SampleVar ( MVar ( Int -- 1 == full - -- 0 == empty - -- <0 no of readers blocked - , MVar a - ) - ) - deriving (Eq) +-- +-- The readers queue in FIFO order, with the leading reader joining the +-- writers in a second FIFO queue to access the stored value. Writers can +-- jump the queue of readers to update the value. The leading reader has to +-- wait on all previous writes to finish before taking the value. +-- +-- This design choice emphasises that each reader sees the most up-to-date +-- value possible while still guaranteeing progress. +data SampleVar a = SampleVar { readQueue :: MVar () + , lockedStore :: MVar (MVar a) } + deriving (Eq) INSTANCE_TYPEABLE1(SampleVar,sampleVarTc,"SampleVar") -- |Build a new, empty, 'SampleVar' newEmptySampleVar :: IO (SampleVar a) newEmptySampleVar = do - v <- newEmptyMVar - SampleVar <$> newMVar (0,v) + newReadQueue <- newMVar () + newLockedStore <- newMVar =<< newEmptyMVar + return (SampleVar { readQueue = newReadQueue + , lockedStore = newLockedStore }) --- |Build a 'SampleVar' with an initial value. +-- |Build a 'SampleVar' with an initial value, stored lazily. newSampleVar :: a -> IO (SampleVar a) newSampleVar a = do - v <- newMVar a - SampleVar <$> newMVar (1,v) + newReadQueue <- newMVar () + newLockedStore <- newMVar =<< newMVar a + return (SampleVar { readQueue = newReadQueue + , lockedStore = newLockedStore }) -- |If the SampleVar is full, leave it empty. Otherwise, do nothing. +-- +-- 'emptySampleVar' can block and be interrupted, in which case it does +-- nothing. If 'emptySampleVar' returns then it left the 'SampleVar' in an +-- empty state. emptySampleVar :: SampleVar a -> IO () -emptySampleVar (SampleVar v) = mask_ $ do - s@(readers, var) <- takeMVar v - if readers > 0 then do - _ <- takeMVar var - putMVar v (0,var) - else - putMVar v s +emptySampleVar (SampleVar _ ls) = withMVar ls (void . tryTakeMVar) + -- (withMVar ls) might block, interrupting is okay --- |Wait for a value to become available, then take it and return. -readSampleVar :: SampleVar a -> IO a -readSampleVar (SampleVar svar) = mask_ $ do --- --- filled => make empty and grab sample --- not filled => try to grab value, empty when read val. +-- |Wait for a value to become available, then take it and return. The queue +-- of blocked 'readSampleVar' threads is a fair FIFO queue. -- - (readers,val) <- takeMVar svar - let readers' = readers-1 - readers' `seq` putMVar svar (readers',val) - takeMVar val +-- 'readSampleVar' can block and be interrupted, in which case it takes +-- nothing. If 'readSampleVar returns normally then it has taken a value. +readSampleVar :: SampleVar a -> IO a +readSampleVar (SampleVar rq ls) = mask_ $ withMVar rq $ \ () -> + join $ withMVar ls (return . takeMVar) + -- (withMVar rq) might block, interrupting is okay + -- (withMVar ls) might block, interrupting is okay + -- join (takeMVar _) will block if empty, interrupting is okay -- |Write a value into the 'SampleVar', overwriting any previous value that -- was there. -writeSampleVar :: SampleVar a -> a -> IO () -writeSampleVar (SampleVar svar) v = mask_ $ do -- --- filled => overwrite --- not filled => fill, write val --- - s@(readers,val) <- takeMVar svar - case readers of - 1 -> - swapMVar val v >> - putMVar svar s - _ -> - putMVar val v >> - let readers' = min 1 (readers+1) - in readers' `seq` putMVar svar (readers', val) +-- 'writeSampleVar' can block and be interrupted, in which case it does nothing. +writeSampleVar :: SampleVar a -> a -> IO () +writeSampleVar (SampleVar _ ls) a = mask_ $ withMVar ls $ \ v -> do + void $ tryTakeMVar v + putMVar v a -- cannot block + -- (withMVar ls) might block, interrupting is okay --- | Returns 'True' if the 'SampleVar' is currently empty. --- --- Note that this function is only useful if you know that no other --- threads can be modifying the state of the 'SampleVar', because --- otherwise the state of the 'SampleVar' may have changed by the time --- you see the result of 'isEmptySampleVar'. +-- |Returns 'True' if the 'SampleVar' is currently empty. -- +-- Note that this function is only useful if you know that no other threads +-- can be modifying the state of the 'SampleVar', because otherwise the state +-- of the 'SampleVar' may have changed by the time you see the result of +-- 'isEmptySampleVar'. isEmptySampleVar :: SampleVar a -> IO Bool -isEmptySampleVar (SampleVar svar) = do - (readers, _) <- readMVar svar - return (readers <= 0) - +isEmptySampleVar (SampleVar _ ls) = withMVar ls isEmptyMVar -- 1.7.2