{-# LANGUAGE Trustworthy #-}{-# LANGUAGE CPP #-}------------------------------------------------------------------------------- |-- Module : Control.Concurrent.Chan-- Copyright : (c) The University of Glasgow 2001-- License : BSD-style (see the file libraries/base/LICENSE)---- Maintainer : libraries@haskell.org-- Stability : stable-- Portability : non-portable (concurrency)---- Unbounded channels.---- The channels are implemented with 'Control.Concurrent.MVar's and therefore inherit all the-- caveats that apply to @MVar@s (possibility of races, deadlocks etc). The-- @stm@ (software transactional memory) library has a more robust implementation-- of channels called @TChan@s.-------------------------------------------------------------------------------moduleControl.Concurrent.Chan (-- * The 'Chan' typeChan ,-- abstract-- * OperationsnewChan ,writeChan ,readChan ,dupChan ,-- * Stream interfacegetChanContents ,writeList2Chan ,)whereimportPrelude importSystem.IO.Unsafe (unsafeInterleaveIO )importGHC.Internal.Control.Concurrent.MVar importGHC.Internal.Control.Exception (mask_ )
#define _UPK_(x) {-# UNPACK #-} !(x)
-- A channel is represented by two @MVar@s keeping track of the two ends-- of the channel contents, i.e., the read- and write ends. Empty @MVar@s-- are used to handle consumers trying to read from an empty channel.-- |'Chan' is an abstract type representing an unbounded FIFO channel.dataChan a =Chan _UPK_(MVar(Streama))_UPK_(MVar(Streama))-- Invariant: the Stream a is always an empty MVarderivingChan a -> Chan a -> Bool
(Chan a -> Chan a -> Bool)
-> (Chan a -> Chan a -> Bool) -> Eq (Chan a)
forall a. Chan a -> Chan a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: forall a. Chan a -> Chan a -> Bool
== :: Chan a -> Chan a -> Bool
$c/= :: forall a. Chan a -> Chan a -> Bool
/= :: Chan a -> Chan a -> Bool
Eq -- ^ @since 4.4.0.0typeStream a =MVar (ChItem a )dataChItem a =ChItem a _UPK_(Streama)-- benchmarks show that unboxing the MVar here is worthwhile, because-- although it leads to higher allocation, the channel data takes up-- less space and is therefore quicker to GC.-- See the Concurrent Haskell paper for a diagram explaining-- how the different channel operations proceed.-- @newChan@ sets up the read and write end of a channel by initialising-- these two @MVar@s with an empty @MVar@.-- |Build and return a new instance of 'Chan'.newChan ::IO (Chan a )newChan :: forall a. IO (Chan a)
newChan =dohole <-IO (MVar (ChItem a))
forall a. IO (MVar a)
newEmptyMVar readVar <-newMVar hole writeVar <-newMVar hole return (Chan readVar writeVar )-- To put an element on a channel, a new hole at the write end is created.-- What was previously the empty @MVar@ at the back of the channel is then-- filled in with a new stream element holding the entered value and the-- new hole.-- |Write a value to a 'Chan'.writeChan ::Chan a ->a ->IO ()writeChan :: forall a. Chan a -> a -> IO ()
writeChan (Chan MVar (Stream a)
_MVar (Stream a)
writeVar )a
val =donew_hole <-IO (Stream a)
forall a. IO (MVar a)
newEmptyMVar mask_ $ doold_hole <-takeMVar writeVar putMVar old_hole (ChItem val new_hole )putMVar writeVar new_hole -- The reason we don't simply do this:---- modifyMVar_ writeVar $ \old_hole -> do-- putMVar old_hole (ChItem val new_hole)-- return new_hole---- is because if an asynchronous exception is received after the 'putMVar'-- completes and before modifyMVar_ installs the new value, it will set the-- Chan's write end to a filled hole.-- |Read the next value from the 'Chan'. Blocks when the channel is empty. Since-- the read end of a channel is an 'MVar', this operation inherits fairness-- guarantees of 'MVar's (e.g. threads blocked in this operation are woken up in-- FIFO order).---- Throws 'Control.Exception.BlockedIndefinitelyOnMVar' when the channel is-- empty and no other thread holds a reference to the channel.readChan ::Chan a ->IO a readChan :: forall a. Chan a -> IO a
readChan (Chan MVar (Stream a)
readVar MVar (Stream a)
_)=MVar (Stream a) -> (Stream a -> IO (Stream a, a)) -> IO a
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar (Stream a)
readVar ((Stream a -> IO (Stream a, a)) -> IO a)
-> (Stream a -> IO (Stream a, a)) -> IO a
forall a b. (a -> b) -> a -> b
$ \Stream a
read_end ->do(ChItem val new_read_end )<-Stream a -> IO (ChItem a)
forall a. MVar a -> IO a
readMVar Stream a
read_end -- Use readMVar here, not takeMVar,-- else dupChan doesn't workreturn (new_read_end ,val )-- |Duplicate a 'Chan': the duplicate channel begins empty, but data written to-- either channel from then on will be available from both. Hence this creates-- a kind of broadcast channel, where data written by anyone is seen by-- everyone else.---- (Note that a duplicated channel is not equal to its original.-- So: @fmap (c /=) $ dupChan c@ returns @True@ for all @c@.)dupChan ::Chan a ->IO (Chan a )dupChan :: forall a. Chan a -> IO (Chan a)
dupChan (Chan MVar (Stream a)
_MVar (Stream a)
writeVar )=dohole <-MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
readMVar MVar (Stream a)
writeVar newReadVar <-newMVar hole return (Chan newReadVar writeVar )-- Operators for interfacing with functional streams.-- |Return a lazy list representing the contents of the supplied-- 'Chan', much like 'GHC.Internal.System.IO.hGetContents'.getChanContents ::Chan a ->IO [a ]getChanContents :: forall a. Chan a -> IO [a]
getChanContents Chan a
ch =IO [a] -> IO [a]
forall a. IO a -> IO a
unsafeInterleaveIO (dox <-Chan a -> IO a
forall a. Chan a -> IO a
readChan Chan a
ch xs <-getChanContents ch return (x : xs ))-- |Write an entire list of items to a 'Chan'.writeList2Chan ::Chan a ->[a ]->IO ()writeList2Chan :: forall a. Chan a -> [a] -> IO ()
writeList2Chan Chan a
ch [a]
ls =[IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ ((a -> IO ()) -> [a] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Chan a -> a -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan a
ch )[a]
ls )

AltStyle によって変換されたページ (->オリジナル) /