{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE CPP #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Chan
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  stable
-- Portability :  non-portable (concurrency)
--
-- Unbounded channels.
--
-- The channels are implemented with @MVar@s and therefore inherit all the
-- caveats that apply to @MVar@s (possibility of races, deadlocks etc). The
-- stm (software transactional memory) library has a more robust implementation
-- of channels called @TChan@s.
--
-----------------------------------------------------------------------------

module Control.Concurrent.Chan
  (
    -- * The 'Chan' type
    Chan,                   -- abstract

    -- * Operations
    newChan,
    writeChan,
    readChan,
    dupChan,

    -- * Stream interface
    getChanContents,
    writeList2Chan,
  ) where

import System.IO.Unsafe (unsafeInterleaveIO)
import Control.Concurrent.MVar
import Control.Exception (mask_)

-- Shorthand for a strict, unpacked constructor field.
#define _UPK_(x) {-# UNPACK #-} !(x)
-- A channel is represented by two @MVar@s keeping track of the two ends
-- of the channel contents, i.e., the read- and write ends. Empty @MVar@s
-- are used to handle consumers trying to read from an empty channel.

-- |'Chan' is an abstract type representing an unbounded FIFO channel.
data Chan a
  = Chan _UPK_(MVar (Stream a))  -- read end
         _UPK_(MVar (Stream a))  -- write end
         -- Invariant: the Stream a is always an empty MVar
  deriving Eq -- ^ @since 4.4.0.0

-- A stream is a (possibly still empty) cell holding the next item.
type Stream a = MVar (ChItem a)

-- One element of the channel together with the rest of the stream.
data ChItem a = ChItem a _UPK_(Stream a)
  -- benchmarks show that unboxing the MVar here is worthwhile, because
  -- although it leads to higher allocation, the channel data takes up
  -- less space and is therefore quicker to GC.

-- See the Concurrent Haskell paper for a diagram explaining
-- how the different channel operations proceed.

-- @newChan@ sets up the read and write end of a channel by initialising
-- these two @MVar@s with an empty @MVar@.

-- |Builds and returns a new instance of 'Chan'.
newChan :: IO (Chan a)
newChan = do
  -- One empty MVar serves as the initial hole; the read end and the
  -- write end both start out referring to it.
  hole     <- newEmptyMVar
  readVar  <- newMVar hole
  writeVar <- newMVar hole
  return (Chan readVar writeVar)

-- To put an element on a channel, a new hole at the write end is created.
-- What was previously the empty @MVar@ at the back of the channel is then
-- filled in with a new stream element holding the entered value and the
-- new hole.

-- |Write a value to a 'Chan'.
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
  new_hole <- newEmptyMVar
  -- The three steps below run with asynchronous exceptions masked; see
  -- the note following this definition for why modifyMVar_ is not used.
  mask_ $ do
    old_hole <- takeMVar writeVar
    putMVar old_hole (ChItem val new_hole)
    putMVar writeVar new_hole

-- The reason we don't simply do this:
--
--    modifyMVar_ writeVar $ \old_hole -> do
--      putMVar old_hole (ChItem val new_hole)
--      return new_hole
--
-- is because if an asynchronous exception is received after the 'putMVar'
-- completes and before modifyMVar_ installs the new value, it will set the
-- Chan's write end to a filled hole.

-- |Read the next value from the 'Chan'. Blocks when the channel is empty. Since
-- the read end of a channel is an 'MVar', this operation inherits fairness
-- guarantees of 'MVar's (e.g. threads blocked in this operation are woken up in
-- FIFO order).
--
-- Throws 'Control.Exception.BlockedIndefinitelyOnMVar' when the channel is
-- empty and no other thread holds a reference to the channel.
readChan :: Chan a -> IO a
readChan (Chan readVar _) =
  modifyMVar readVar $ \read_end -> do
    (ChItem val new_read_end) <- readMVar read_end
        -- Use readMVar here, not takeMVar,
        -- else dupChan doesn't work
    -- Advance the read end to the rest of the stream and hand back the item.
    return (new_read_end, val)

-- |Duplicate a 'Chan': the duplicate channel begins empty, but data written to
-- either channel from then on will be available from both. Hence this creates
-- a kind of broadcast channel, where data written by anyone is seen by
-- everyone else.
--
-- (Note that a duplicated channel is not equal to its original.
-- So: @fmap (c /=) $ dupChan c@ returns @True@ for all @c@.)
dupChan :: Chan a -> IO (Chan a)
dupChan (Chan _ writeVar) = do
  -- Peek at the current hole without taking it, so writers are not blocked.
  hole       <- readMVar writeVar
  -- The duplicate gets its own read end starting at that hole and shares
  -- the write end with the original channel.
  newReadVar <- newMVar hole
  return (Chan newReadVar writeVar)

-- Operators for interfacing with functional streams.

-- |Return a lazy list representing the contents of the supplied
-- 'Chan', much like 'System.IO.hGetContents'.
getChanContents :: Chan a -> IO [a]
getChanContents ch =
  -- unsafeInterleaveIO defers each read until the corresponding part of
  -- the result list is demanded.
  unsafeInterleaveIO (do
    x  <- readChan ch
    xs <- getChanContents ch
    return (x : xs)
  )

-- |Write an entire list of items to a 'Chan'.
writeList2Chan :: Chan a -> [a] -> IO ()
-- Writes the elements one at a time, in list order, via 'writeChan'.
writeList2Chan ch ls = mapM_ (writeChan ch) ls

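-- NOTE: a trailing web-proxy footer ("page converted by AltStyle (-> original)") followed here in the scraped copy; it is not part of this module.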