{-# LANGUAGE Trustworthy #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.QSemN
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  stable
-- Portability :  non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary
-- \"amount\".
--
-----------------------------------------------------------------------------

module Control.Concurrent.QSemN
        (  -- * General Quantity Semaphores
          QSemN,        -- abstract
          newQSemN,     -- :: Int   -> IO QSemN
          waitQSemN,    -- :: QSemN -> Int -> IO ()
          signalQSemN   -- :: QSemN -> Int -> IO ()
      ) where

import Prelude

import GHC.Internal.Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar
                                            , tryPutMVar, isEmptyMVar)
import GHC.Internal.Control.Exception
import GHC.Internal.Control.Monad (when)
import GHC.Internal.Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO.Unsafe (unsafePerformIO)

-- | 'QSemN' is a quantity semaphore in which the resource is acquired
-- and released in arbitrary amounts.  It provides guaranteed FIFO ordering
-- for satisfying blocked `waitQSemN` calls.
--
-- The pattern
--
-- >   bracket_ (waitQSemN n) (signalQSemN n) (...)
--
-- is safe; it never loses any of the resource.
--
data QSemN = QSemN !(IORef (Int, [(Int, MVar ())], [(Int, MVar ())]))

-- The semaphore state (i, xs, ys):
--
--   i is the current resource value
--
--   (xs,ys) is the queue of blocked threads, where the queue is
--           given by xs ++ reverse ys.  We can enqueue new blocked threads
--           by consing onto ys, and dequeue by removing from the head of xs.
--
-- A blocked thread is represented by an empty (MVar ()).  To unblock
-- the thread, we put () into the MVar.
--
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSemN.
-- This means that when unblocking a thread in signalQSemN we must
-- first check whether the MVar is already full.

-- |Build a new 'QSemN' with a supplied initial quantity.
--  The initial quantity must be at least 0.
--
--  A negative initial quantity is rejected via 'fail'; otherwise the
--  semaphore starts with the given quantity and an empty wait queue.
newQSemN :: Int -> IO QSemN
newQSemN initial
  | initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
  | otherwise   = do
      sem <- newIORef (initial, [], [])
      return (QSemN sem)

-- An unboxed version of Maybe (MVar a)
data MaybeMV a = JustMV !(MVar a) | NothingMV

-- |Wait for the specified quantity to become available.
waitQSemN :: QSemN -> Int -> IO ()
-- We need to mask here. Once we've enqueued our MVar, we need
-- to be sure to wait for it. Otherwise, we could lose our
-- allocated resource.
waitQSemN qs@(QSemN m) sz = mask_ $ do
    -- unsafePerformIO and not unsafeDupablePerformIO. We must
    -- be sure to wait on the same MVar that gets enqueued.
  mmvar <- atomicModifyIORef m $ \ (i,b1,b2) -> unsafePerformIO $ do
    let z = i-sz
    if z < 0
      then do
        -- Not enough resource: leave the quantity untouched and enqueue
        -- ourselves (with the amount we want) at the back of the queue.
        b <- newEmptyMVar
        return ((i, b1, (sz,b):b2), JustMV b)
      else
        -- Enough resource: take it immediately, nothing to wait on.
        return ((z, b1, b2), NothingMV)

  -- Note: this case match actually allocates the MVar if necessary.
  case mmvar of
    NothingMV -> return ()
    JustMV b -> wait b
  where
    -- Block on our MVar.  If an exception arrives while blocked, fill the
    -- MVar ourselves so that signalQSemN will skip us.  If the MVar turns
    -- out to be full already, a signaller had handed us the quantity just
    -- before the exception, so give it back.
    wait :: MVar () -> IO ()
    wait b =
      takeMVar b `onException` do
        already_filled <- not <$> tryPutMVar b ()
        when already_filled $ signalQSemN qs sz

-- |Signal that a given quantity is now available from the 'QSemN'.
signalQSemN :: QSemN -> Int -> IO ()
-- We don't need to mask here because we should *already* be masked
-- here (e.g., by bracket). Indeed, if we're not already masked,
-- it's too late to do so.
--
-- What if the unsafePerformIO thunk is forced in another thread,
-- and receives an asynchronous exception? That shouldn't be a
-- problem: when we force it ourselves, presumably masked, we
-- will resume its execution.
signalQSemN (QSemN m) sz0 = do
    -- unsafePerformIO and not unsafeDupablePerformIO. We must not
    -- wake up more threads than we're supposed to.
  unit <- atomicModifyIORef m $ \(i,a1,a2) ->
            unsafePerformIO (loop (sz0 + i) a1 a2)

  -- Forcing this will actually wake the necessary threads.
  evaluate unit
 where
   -- loop sz front back: hand out the available quantity sz to waiters in
   -- queue order (front ++ reverse back) and return the new state.

   -- Nothing left to hand out: keep the queue as it is.
   loop 0  bs b2 = return ((0,  bs, b2), ())
   -- Nobody is waiting: store the remaining quantity.
   loop sz [] [] = return ((sz, [], []), ())
   -- Front is exhausted: continue with the reversed back list.
   loop sz [] b2 = loop sz (reverse b2) []
   loop sz ((j,b):bs) b2
     -- The head waiter wants more than is available.
     | j > sz = do
       r <- isEmptyMVar b
       if r then
              -- It is still blocked: stop here and keep it at the head.
              return ((sz, (j,b):bs, b2), ())
            else
              -- Its MVar is already full, so it dequeued itself: drop it.
              loop sz bs b2
     -- The head waiter can be satisfied.
     | otherwise = do
       r <- tryPutMVar b ()
       if r then
              -- We filled its MVar: deduct what it asked for.
              loop (sz-j) bs b2
            else
              -- Its MVar was already full, so it dequeued itself: drop it
              -- without deducting anything.
              loop sz bs b2