[commit: stm] master: Add TBQueue, bump version to 2.4 and document changes since 2.3 (b72fafc)
Simon Marlow
marlowsd at gmail.com
Wed Jul 4 13:31:42 CEST 2012
Repository : ssh://darcs.haskell.org//srv/darcs/packages/stm
On branch : master
http://hackage.haskell.org/trac/ghc/changeset/b72fafc5d58d90034d615d5426c744b849dc31b6
>---------------------------------------------------------------
commit b72fafc5d58d90034d615d5426c744b849dc31b6
Author: Simon Marlow <marlowsd at gmail.com>
Date: Wed Jul 4 11:21:10 2012 +0100
Add TBQueue, bump version to 2.4 and document changes since 2.3
>---------------------------------------------------------------
Control/Concurrent/STM/TBQueue.hs | 175 +++++++++++++++++++++++++++++++++++++
stm.cabal | 16 +++-
2 files changed, 189 insertions(+), 2 deletions(-)
diff --git a/Control/Concurrent/STM/TBQueue.hs b/Control/Concurrent/STM/TBQueue.hs
new file mode 100644
index 0000000..42b04b2
--- /dev/null
+++ b/Control/Concurrent/STM/TBQueue.hs
@@ -0,0 +1,175 @@
+{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
+{-# LANGUAGE CPP, DeriveDataTypeable #-}
+
+#if __GLASGOW_HASKELL__ >= 701
+{-# LANGUAGE Trustworthy #-}
+#endif
+
+-----------------------------------------------------------------------------
+-- |
+-- Module : Control.Concurrent.STM.TBQueue
+-- Copyright : (c) The University of Glasgow 2012
+-- License : BSD-style (see the file libraries/base/LICENSE)
+--
+-- Maintainer : libraries at haskell.org
+-- Stability : experimental
+-- Portability : non-portable (requires STM)
+--
+-- 'TBQueue' is a bounded version of 'TQueue'. The queue has a maximum
+-- capacity set when it is created. If the queue already contains the
+-- maximum number of elements, then 'writeTBQueue' blocks until an
+-- element is removed from the queue.
+--
+-- The implementation is based on the traditional purely-functional
+-- queue representation that uses two lists to obtain amortised /O(1)/
+-- enqueue and dequeue operations.
+--
+-----------------------------------------------------------------------------
+
+module Control.Concurrent.STM.TBQueue (
+ -- * TBQueue
+ TBQueue,
+ newTBQueue,
+ newTBQueueIO,
+ readTBQueue,
+ tryReadTBQueue,
+ peekTBQueue,
+ tryPeekTBQueue,
+ writeTBQueue,
+ unGetTBQueue,
+ isEmptyTBQueue,
+ ) where
+
+
+import Control.Concurrent.STM
+
+#define _UPK_(x) {-# UNPACK #-} !(x)
+
+-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
+data TBQueue a
+ = TBQueue _UPK_(TVar Int) -- CR: read capacity
+ _UPK_(TVar [a]) -- R: elements waiting to be read
+ _UPK_(TVar Int) -- CW: write capacity
+ _UPK_(TVar [a]) -- W: elements written (head is most recent)
+
+-- Total channel capacity remaining is CR + CW. Reads only need to
+-- access CR, writes usually need to access only CW but sometimes need
+-- CR. So in the common case we avoid contention between CR and CW.
+--
+-- - when removing an element from R:
+-- CR := CR + 1
+--
+-- - when adding an element to W:
+-- if CW is non-zero
+-- then CW := CW - 1
+-- then if CR is non-zero
+-- then CW := CR - 1; CR := 0
+-- else **FULL**
+
+-- |Build and returns a new instance of 'TBQueue'
+newTBQueue :: Int -- ^ maximum number of elements the queue can hold
+ -> STM (TBQueue a)
+newTBQueue size = do
+ read <- newTVar []
+ write <- newTVar []
+ rsize <- newTVar 0
+ wsize <- newTVar size
+ return (TBQueue rsize read wsize write)
+
+-- |@IO@ version of 'newTBQueue'. This is useful for creating top-level
+-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
+-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
+-- possible.
+newTBQueueIO :: Int -> IO (TBQueue a)
+newTBQueueIO size = do
+ read <- newTVarIO []
+ write <- newTVarIO []
+ rsize <- newTVarIO 0
+ wsize <- newTVarIO size
+ return (TBQueue rsize read wsize write)
+
+-- |Write a value to a 'TBQueue'; blocks if the queue is full.
+writeTBQueue :: TBQueue a -> a -> STM ()
+writeTBQueue (TBQueue rsize _read wsize write) a = do
+ w <- readTVar wsize
+ if (w /= 0)
+ then do writeTVar wsize (w - 1)
+ else do
+ r <- readTVar rsize
+ if (r /= 0)
+ then do writeTVar rsize 0
+ writeTVar wsize (r - 1)
+ else retry
+ listend <- readTVar write
+ writeTVar write (a:listend)
+
+-- |Read the next value from the 'TBQueue'.
+readTBQueue :: TBQueue a -> STM a
+readTBQueue (TBQueue rsize read _wsize write) = do
+ xs <- readTVar read
+ r <- readTVar rsize
+ writeTVar rsize (r + 1)
+ case xs of
+ (x:xs') -> do
+ writeTVar read xs'
+ return x
+ [] -> do
+ ys <- readTVar write
+ case ys of
+ [] -> retry
+ _ -> do
+ let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
+ -- short, otherwise it will conflict
+ writeTVar write []
+ writeTVar read zs
+ return z
+
+-- | A version of 'readTBQueue' which does not retry. Instead it
+-- returns @Nothing@ if no value is available.
+tryReadTBQueue :: TBQueue a -> STM (Maybe a)
+tryReadTBQueue c = fmap Just (readTBQueue c) `orElse` return Nothing
+
+-- | Get the next value from the @TBQueue@ without removing it,
+-- retrying if the channel is empty.
+peekTBQueue :: TBQueue a -> STM a
+peekTBQueue c = do
+ x <- readTBQueue c
+ unGetTBQueue c x
+ return x
+
+-- | A version of 'peekTBQueue' which does not retry. Instead it
+-- returns @Nothing@ if no value is available.
+tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
+tryPeekTBQueue c = do
+ m <- tryReadTBQueue c
+ case m of
+ Nothing -> return Nothing
+ Just x -> do
+ unGetTBQueue c x
+ return m
+
+-- |Put a data item back onto a channel, where it will be the next item read.
+-- Blocks if the queue is full.
+unGetTBQueue :: TBQueue a -> a -> STM ()
+unGetTBQueue (TBQueue rsize read wsize _write) a = do
+ r <- readTVar rsize
+ if (r > 0)
+ then do writeTVar rsize (r - 1)
+ else do
+ w <- readTVar wsize
+ if (w > 0)
+ then writeTVar wsize (w - 1)
+ else retry
+ xs <- readTVar read
+ writeTVar read (a:xs)
+
+-- |Returns 'True' if the supplied 'TBQueue' is empty.
+isEmptyTBQueue :: TBQueue a -> STM Bool
+isEmptyTBQueue (TBQueue _rsize read _wsize write) = do
+ xs <- readTVar read
+ case xs of
+ (_:_) -> return False
+ [] -> do ys <- readTVar write
+ case ys of
+ [] -> return True
+ _ -> return False
diff --git a/stm.cabal b/stm.cabal
index 7c9366b..2db764d 100644
--- a/stm.cabal
+++ b/stm.cabal
@@ -1,11 +1,22 @@
name: stm
-version: 2.3
+version: 2.4
license: BSD3
license-file: LICENSE
maintainer: libraries at haskell.org
synopsis: Software Transactional Memory
category: Concurrency
-description: A modular composable concurrency abstraction.
+description:
+ A modular composable concurrency abstraction.
+ .
+ Changes in version 2.4
+ .
+ * Added "Control.Concurrent.STM.TQueue" (a faster @TChan@)
+ * Added "Control.Concurrent.STM.TBQueue" (a bounded channel based on @TQueue@)
+ * @TChan@ has an @Eq@ instances
+ * Added @newBroadcastTChan@ and @newBroadcastTChanIO@
+ * Some performance improvements for @TChan@
+ * Added @cloneTChan@
+
build-type: Simple
cabal-version: >=1.6
@@ -23,6 +34,7 @@ library
Control.Concurrent.STM.TChan
Control.Concurrent.STM.TMVar
Control.Concurrent.STM.TQueue
+ Control.Concurrent.STM.TBQueue
Control.Monad.STM
other-modules:
Control.Sequential.STM
More information about the Cvs-libraries
mailing list