\$\begingroup\$

I'm modelling two processes which have been put into a cyclic pipeline - the output of each feeds into the input of the other - so as to work out when they've deadlocked (i.e., neither can progress since they're each waiting on output from the other).

I've used the conduit package for this, as I couldn't see an easy way to do it using pipes, and streaming doesn't really look suited to this sort of task. It looks like monad-coroutine would be another possibility for this, but I didn't investigate it further.

Define the problem as follows:

We have 2 processes executing a program made up of two sorts of instruction: "send" and "receive". Each process has an incoming mailbox and can send to an outgoing mailbox, each of which is the end of a FIFO queue of unbounded length. Let's assume the items of data they're sending and receiving are Ints.

The instruction send n sends the int n to the process's outgoing mailbox (from which it can be retrieved by whatever program is monitoring the other end of the queue). receive tries to retrieve an int from the process's incoming mailbox; if the mailbox is empty, the process blocks until an item arrives.

Processes execute each instruction in turn, and if there are no more instructions, they exit.

Now, assume we join 2 processes A and B "head to toe" in a cycle: process A receives data from process B's outbox, and sends data to process B's inbox, and vice versa.

Given a particular program P, we wish to simulate their execution to find out (a) whether the 2 processes deadlock, when given program P, and (b) how many items of data each process sends before the 2 of them either deadlock or come to the end of their instructions.
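To make the definition above concrete, here is a minimal sketch of a direct simulation that uses plain lists as the FIFO mailboxes. It is independent of any streaming library, and every name in it (Proc, Outcome, step, simulate) is hypothetical and chosen for illustration only. It also assumes that "deadlocked" covers the case where one process has exited while the other waits forever on a receive.

```haskell
-- Illustrative sketch only; all names here are hypothetical.
data Instr = Send Int | Recv
  deriving (Eq, Show)

-- | One process: remaining instructions, unread inbox (oldest item first),
-- and the number of sends performed so far.
data Proc = Proc
  { procInstrs :: [Instr]
  , procInbox  :: [Int]
  , procSent   :: Int
  } deriving (Eq, Show)

-- | Assumption: 'Deadlocked' also covers "one process exited, the other
-- is still blocked on a receive".
data Outcome = Finished | Deadlocked
  deriving (Eq, Show)

mkProc :: [Instr] -> Proc
mkProc instrs = Proc instrs [] 0

-- | Try to run one instruction of the first process, delivering any sent
-- value to the second process's inbox. 'Nothing' means the first process
-- cannot move: it has exited, or it is blocked on an empty inbox.
step :: Proc -> Proc -> Maybe (Proc, Proc)
step p q = case procInstrs p of
  Send n : rest ->
    Just ( p { procInstrs = rest, procSent = procSent p + 1 }
         , q { procInbox = procInbox q ++ [n] } )
  Recv : rest -> case procInbox p of
    _ : inbox' -> Just (p { procInstrs = rest, procInbox = inbox' }, q)
    []         -> Nothing
  [] -> Nothing

-- | Run both processes until neither can move, then report the outcome
-- and how many items A and B each sent.
simulate :: [Instr] -> [Instr] -> (Outcome, Int, Int)
simulate progA progB = go (mkProc progA) (mkProc progB)
  where
    go a b = case step a b of
      Just (a', b') -> go a' b'
      Nothing -> case step b a of
        Just (b', a') -> go a' b'
        Nothing       -> (outcome a b, procSent a, procSent b)
    outcome a b
      | null (procInstrs a) && null (procInstrs b) = Finished
      | otherwise                                  = Deadlocked
```

Each successful step consumes exactly one instruction, so the simulation always terminates for finite programs. For example, simulate [Send 1, Recv] [Recv, Send 2] evaluates to (Finished, 1, 1).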

I think this problem is of interest because pre-emptive concurrency doesn't seem like a good way of solving it -- most concurrency libraries do their best to help you avoid deadlock, not model processes that have got into deadlock. (But perhaps I'm wrong, and this is easily modelled with a standard concurrency library - I'd be keen to hear.) Also it gave me a good reason to look at some of the streaming data packages (conduit, pipes and streaming), all of which I believe are modelled around the idea of processes that can "yield" data "downstream", or "await" it from "upstream", which is exactly what this problem requires.
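For illustration only (this is not the conduit code that follows), the "yield"/"await" idea can be sketched with a small hand-rolled coroutine type, without committing to any of those packages. All names here (Proc, compile, runUntilBlocked, runCycle) are hypothetical, and received values are simply discarded, as in the stubbed-out problem description.

```haskell
-- Illustrative sketch only; all names here are hypothetical.
data Instr = Send Int | Recv

-- | A process is finished, ready to emit a value, or waiting for one.
data Proc
  = Done
  | Yield Int Proc       -- emit a value "downstream", then continue
  | Await (Int -> Proc)  -- block until a value arrives from "upstream"

-- | Turn an instruction list into a coroutine. Received values are ignored.
compile :: [Instr] -> Proc
compile = foldr op Done
  where
    op (Send n) k = Yield n k
    op Recv     k = Await (const k)

isDone :: Proc -> Bool
isDone Done = True
isDone _    = False

-- | Run a process against its inbox until it finishes or blocks.
-- Returns the suspended process, the unread inbox, and the emitted values.
runUntilBlocked :: Proc -> [Int] -> (Proc, [Int], [Int])
runUntilBlocked (Yield n k) inbox =
  let (p, inbox', out) = runUntilBlocked k inbox
  in  (p, inbox', n : out)
runUntilBlocked (Await f) (x : inbox) = runUntilBlocked (f x) inbox
runUntilBlocked p inbox = (p, inbox, [])  -- Done, or Await on an empty inbox

-- | Alternate the two processes, feeding each one the other's output,
-- until a whole round produces no output. Returns whether both processes
-- finished, and how many items A and B each emitted.
runCycle :: Proc -> Proc -> (Bool, Int, Int)
runCycle = go 0 0 [] []
  where
    go nA nB inA inB a b
      | null outA && null outB = (isDone a' && isDone b', nA, nB)
      | otherwise =
          go (nA + length outA) (nB + length outB) (restA ++ outB) restB a' b'
      where
        (a', restA, outA) = runUntilBlocked a inA
        (b', restB, outB) = runUntilBlocked b (inB ++ outA)
```

A round with no output means neither process can make further progress, so the loop stops there. With prog = [Send 1, Send 2, Recv, Recv, Recv], runCycle (compile prog) (compile prog) evaluates to (False, 2, 2).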

Here's my code:

(NB: contains possible spoilers for the Advent of Code 2017, day 18 problem, Part 2 - but this is not relevant to my question, which is about modelling deadlock with coroutines.)

-- requires packages:
--   microlens-platform
--   mtl
--   conduit
--   conduit-combinators
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE FlexibleContexts #-}

import Conduit ( (.|), ConduitM, yield, await, fuseBoth,
                 yieldMany, runConduitPure )
import Data.Conduit.Lift (execStateC)
import Data.Conduit.List (consume)
import Lens.Micro.Platform
import Control.Monad.State (MonadState)
import Control.Monad (when, unless)

data Instr =
    Send Int
  | Recv
  -- ... stub: imagine further operations here, e.g.
  -- acting on a store, conditional jumps to other instructions, etc.
  deriving Show

-- | state of a program
data ProgState = ProgState {
    _program   :: [Instr] -- ^ remaining instructions to run
  , _outputCnt :: Int     -- ^ how many times we've done a "Send"
  }
  deriving Show

-- programs initially haven't sent anything
mkProgState :: [Instr] -> ProgState
mkProgState instrs = ProgState instrs 0

makeLenses ''ProgState

-- | perform one operation, using 'yield' and 'await'
-- to "send" and "receive" values.
-- return a Bool - whether we can continue, or are
-- blocked on a receive and should abort.
applyOp
  :: MonadState ProgState m =>
     Instr -> ConduitM Int Int m Bool
applyOp instr = case instr of
  Send n -> do yield n
               outputCnt += 1
               return True
  Recv   -> do valM <- await
               case valM of
                 Nothing   -> return False
                 Just _val -> -- stub: ..do something with received vals
                              return True

-- Given initial state:
-- Execute instructions in sequence until either
-- (a) there are no more left, or
-- (b) we're blocked while receiving
-- and return the new state.
runLoop :: Monad m => ProgState -> ConduitM Int Int m ProgState
runLoop state =
  execStateC state loop
  where
    loop :: MonadState ProgState m => ConduitM Int Int m ()
    loop = do
      prog <- use program
      unless (null prog) $ do -- still more instructions
        let instr = head prog
        canContinue <- applyOp instr
        when canContinue $ do
          program %= tail -- step forward 1 instruction
          loop

-- | put 2 program processes in sequence, one feeding the other.
-- In addition to program states, takes input to program A,
-- and returns output from program B.
pipe
  :: [Int]
  -> (ProgState, ProgState) -> ((ProgState, ProgState), [Int])
pipe input (stateA, stateB) =
  let
    (=.|=) = fuseBoth  -- join 2 conduits in sequence,
                       -- and return results from both as a tuple
    -- get the side effect result of both programs A and B,
    -- also what B emits, as a list (using 'consume')
    conduit =
      yieldMany input .|
      runLoop stateA =.|=
      runLoop stateB =.|=
      consume
  in runConduitPure conduit

-- simulate the effect of joining our pipeline to its own
-- start, creating a cycle - and keep running until
-- the processes finish or are deadlocked (i.e.,
-- produce no output because neither can continue)
runCycle :: ProgState -> ProgState -> (ProgState, ProgState)
runCycle =
  loop []
  where
    loop input stateA stateB = do
      let ((stateA', stateB'), output) = pipe input (stateA, stateB)
      if null output
        then (stateA', stateB')
        else loop output stateA' stateB'

-- Give 2 processes a program to run that is guaranteed
-- to result in them deadlocking, when joined in a cyclic
-- pipeline.
-- count how many items each outputs before deadlock happens.
test :: (Int, Int)
test =
  let instrs = [
          Send 1
        , Send 2
        , Recv
        , Recv
        , Recv
        ]
      (stateA, stateB) = runCycle (mkProgState instrs) (mkProgState instrs)
  in (stateA ^. outputCnt, stateB ^. outputCnt)

main :: IO ()
main = do
  let (aCount, bCount) = test
  putStrLn $ "program A emitted " ++ show aCount ++ " items"
  putStrLn $ "program B emitted " ++ show bCount ++ " items"

My questions are:

  • Can you see any opportunities for improvement here, especially simpler ways of modelling the problem?
  • Could pipes or streaming be used instead? I couldn't see an obvious way to do so. Or would some other package be better - should I have tried monad-coroutine? Or machines, perhaps? (I saw from its description that it might be relevant, but haven't investigated further -- ...there's only so many hours in the day.)
  • I assume modelling this with any sort of pre-emptive multitasking library, like Control.Concurrent, would be tricky and pointless. (Since concurrency libraries tend to be about avoiding deadlock, rather than letting it happen and letting you inspect threads' current state.)
asked Jan 11, 2018 at 6:18
\$\endgroup\$
  • \$\begingroup\$ Honestly I see nothing in your code resembling the problem in the link you provided. Keep in mind that we can only see part 1. Could you spell the problem statement directly? \$\endgroup\$ Commented Jan 11, 2018 at 6:54
  • \$\begingroup\$ @vnp, I've edited the description so as to (hopefully) clearly define the problem. The note about a spoiler isn't relevant to the problem, so I've shifted it further down. My code is about the more general problem of modelling processes which can block each other and get into deadlock. I hope that helps. \$\endgroup\$ Commented Jan 11, 2018 at 8:32

1 Answer

\$\begingroup\$

Here's a simpler way of modelling the problem: each Send eliminates the other program's next Recv, and execution stops once neither program can send.

import Data.List (delete)

data Instr = Send Int | Recv deriving Eq

-- | Returns (success, A's outputs, B's outputs).
-- success is False if either program is left waiting on a Recv.
execute :: [Instr] -> [Instr] -> (Bool, [Int], [Int])
execute (Send i:x) y = (\(a,b,c) -> (a,i:b,c)) $ execute x (delete Recv y)
execute x (Send i:y) = (\(a,b,c) -> (a,b,i:c)) $ execute (delete Recv x) y
execute [] [] = (True, [], [])
execute _ _ = (False, [], [])
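As a usage sketch, the execute function above can be applied to the test program from the question. The definitions are repeated so that the block stands alone; the names deadlockProg and result are hypothetical additions, not part of the original answer.

```haskell
import Data.List (delete)

data Instr = Send Int | Recv deriving Eq

-- Same definition as above, repeated so this block compiles on its own.
execute :: [Instr] -> [Instr] -> (Bool, [Int], [Int])
execute (Send i:x) y = (\(a,b,c) -> (a,i:b,c)) $ execute x (delete Recv y)
execute x (Send i:y) = (\(a,b,c) -> (a,b,i:c)) $ execute (delete Recv x) y
execute [] [] = (True, [], [])
execute _ _ = (False, [], [])

-- The test program from the question (hypothetical name).
deadlockProg :: [Instr]
deadlockProg = [Send 1, Send 2, Recv, Recv, Recv]

-- Both processes run the same program.
result :: (Bool, [Int], [Int])
result = execute deadlockProg deadlockProg
-- result == (False, [1,2], [1,2])
```

The number of items each process sent is the length of the corresponding output list, here 2 and 2, and the False flag reports that the processes did not both run to completion. A pair that does finish, such as execute [Send 1, Recv] [Recv, Send 2], yields (True, [1], [2]).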
answered Jan 18, 2018 at 20:34
\$\endgroup\$