Similar to the code review I posted last week for an agent-based immutable replacement for ConcurrentDictionary
, I have also created an agent-based immutable replacement for ConcurrentQueue
. This uses a MailboxProcessor
and an immutable queue based on Okasaki's implementation in Purely Functional Data Structures with a few extra operations. I am particularly interested in understanding if there's any way I can combine the QueueAgent
and the InternalQueueAgent
into one type (without the mutual-recursion), and if there's any way to do the asynchronous Peek
and Dequeue
operations without the internal ImmutableQueue
s for the PeekListeners
and DequeueListeners
. The idea behind those operations is to support a "yield until a message is available" behavior similar to an asynchronous Peek
or Receive
operation on MSMQ or RabbitMQ. I also welcome any general feedback on the implementation.
My code for the immutable queue is as follows:
open System.Collections.Generic
/// An F# Immutable Queue, based on Okasaki's implementation in Purely-Functional Data Structures
type ImmutableQueue<'message> private (front: 'message list, rear: 'message list) =
let enqueue message =
match front, message::rear with
| [], newRear -> ImmutableQueue(newRear |> List.rev, [])
| _, newRear -> ImmutableQueue(front, newRear)
let enqueueAll messages =
let orderedMessages = messages |> List.rev
match front, orderedMessages@rear with
| [], newRear -> ImmutableQueue(newRear |> List.rev, [])
| _, newRear -> ImmutableQueue(front, newRear)
let dequeue () =
match front with
| message::tail ->
message, (match tail with
| [] -> ImmutableQueue(rear |> List.rev, [])
| _ -> ImmutableQueue(tail, rear))
| _ -> failwith "Cannot dequeue from empty queue!"
let dequeueAll () =
(front @ (rear |> List.rev), ImmutableQueue<'message>([], []) )
let tryDequeue () =
match front with
| message::tail ->
(message, (match tail with
| [] -> ImmutableQueue(rear |> List.rev, [])
| _ -> ImmutableQueue(tail, rear)))
|> Some
| _ -> None
let tryPeek () =
match front with
| message::tail -> Some message
| _ -> None
let reverse () =
match front with
| [] -> ImmutableQueue(rear |> List.rev, [])
| _ -> ImmutableQueue(front, rear)
let getEnumerator () =
(seq {
yield! front
yield! rear |> List.rev
}).GetEnumerator()
static member Empty = ImmutableQueue<'message>([], [])
static member From messages = ImmutableQueue<'message>(messages, [])
member __.IsEmpty = front.IsEmpty && rear.IsEmpty
member __.Length = front.Length + rear.Length
member __.HasMessages = front.IsEmpty |> not
member __.Enqueue message = enqueue message
member __.EnqueueAll messages = enqueueAll messages
member __.Dequeue () = dequeue ()
member __.DequeueAll () = dequeueAll ()
member __.TryDequeue () = tryDequeue()
member __.TryPeek () = tryPeek()
member __.Reverse () = reverse()
member __.GetEnumerator () = getEnumerator()
interface IEnumerable<'message> with
member this.GetEnumerator () = this.GetEnumerator()
interface System.Collections.IEnumerable with
member this.GetEnumerator () = this.GetEnumerator() :> System.Collections.IEnumerator
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Queue =
/// Create an empty queue of the given message type
let empty<'message> = ImmutableQueue<'message>.Empty
/// Enqueue a message in the given queue
let inline enqueue message (queue: ImmutableQueue<'message>) = queue.Enqueue message
/// Enqueue all of the provided messages in the given queue
let inline enqueueAll messages (queue: ImmutableQueue<'message>) = queue.EnqueueAll messages
/// Check if the given queue is empty
let inline isEmpty (queue: ImmutableQueue<'message>) = queue.IsEmpty
/// Compute the length (number of messages) of the given queue
let inline length (queue: ImmutableQueue<'message>) = queue.Length
/// Check if the given queue contains any messages
let inline hasMessages (queue: ImmutableQueue<'message>) = queue.HasMessages
/// Create a queue from an F# list
let inline ofList messages = messages |> ImmutableQueue.From
/// Create a queue fron an F# sequence
let inline ofSeq messages = messages |> Seq.toList |> ofList
/// Dequeue the message at the front of the given queue
let inline dequeue (queue: ImmutableQueue<'message>) = queue.Dequeue()
/// Dequeue all the messages from the given queue
let inline dequeueAll (queue: ImmutableQueue<'message>) = queue.DequeueAll()
/// Try to dequeue the message at the front of the given queue
let inline tryDequeue (queue: ImmutableQueue<'message>) = queue.TryDequeue()
/// Try to peek the message at the front of the given queue
let inline tryPeek (queue: ImmutableQueue<'message>) = queue.TryPeek()
/// Reverse the order of all messages in the given queue
let inline rev (queue: ImmutableQueue<'message>) = queue.Reverse()
And here's my implementation of QueueAgent
:
open System.Collections.Concurrent
open System.Collections.Generic
type private QueueMessage<'a> =
| Enqueue of 'a
| EnqueueAll of 'a list
| TryDequeue of AsyncReplyChannel<'a option>
| TryPeek of AsyncReplyChannel<'a option>
| Dequeue of AsyncReplyChannel<'a>
| DequeueAll of AsyncReplyChannel<'a seq>
| Peek of AsyncReplyChannel<'a>
| Count of AsyncReplyChannel<int>
| GetAll of AsyncReplyChannel<'a seq>
type private InternalQueueMessage<'a> =
| AddDequeueListener of AsyncReplyChannel<'a>
| AddPeekListener of AsyncReplyChannel<'a>
| ItemEnqueued of QueueAgent<'a>
and private Listeners<'a> =
{
PeekListeners: ImmutableQueue<AsyncReplyChannel<'a>>
DequeueListeners: ImmutableQueue<AsyncReplyChannel<'a>>
}
static member Empty() = {PeekListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty; DequeueListeners = ImmutableQueue<AsyncReplyChannel<'a>>.Empty}
and QueueAgent<'a> () as this =
let internalQueue = InternalQueueAgent<'a>()
let agent =
MailboxProcessor<QueueMessage<'a>>.Start
<| fun inbox ->
let rec loop state =
async {
let! message = inbox.Receive()
match message with
| Enqueue item ->
let newState = state |> Queue.enqueue item
internalQueue.Post <| ItemEnqueued this
return! loop newState
| EnqueueAll items ->
let newState = state |> Queue.enqueueAll items
items |> List.iter (fun item -> internalQueue.Post <| ItemEnqueued this)
return! loop newState
| TryDequeue channel ->
match state |> Queue.tryDequeue with
| Some (item, newState) ->
channel.Reply <| Some item
return! loop newState
| None ->
channel.Reply None
return! loop state
| TryPeek channel ->
channel.Reply (state |> Queue.tryPeek)
return! loop state
| Dequeue channel ->
match state |> Queue.tryDequeue with
| Some (item, newState) ->
channel.Reply item
return! loop newState
| None ->
internalQueue.Post <| AddDequeueListener channel
return! loop state
| DequeueAll channel ->
let (items, newState) = state |> Queue.dequeueAll
channel.Reply items
return! loop newState
| Peek channel ->
match state |> Queue.tryPeek with
| Some item ->
channel.Reply item
return! loop state
| None ->
internalQueue.Post <| AddPeekListener channel
return! loop state
| Count channel ->
channel.Reply (state |> Queue.length)
return! loop state
| GetAll channel ->
channel.Reply state
return! loop state
}
loop ImmutableQueue<'a>.Empty
let enqueue item =
agent.Post <| Enqueue item
let enqueueAll items =
agent.Post <| EnqueueAll items
let tryDequeue () =
agent.PostAndReply TryDequeue
let asyncTryDequeue () =
agent.PostAndAsyncReply TryDequeue
let tryPeek () =
agent.PostAndReply TryPeek
let asyncTryPeek () =
agent.PostAndAsyncReply TryPeek
let dequeue () =
agent.PostAndReply Dequeue
let asyncDequeue () =
agent.PostAndAsyncReply Dequeue
let dequeueAll () =
agent.PostAndReply DequeueAll
let asyncDequeueAll () =
agent.PostAndAsyncReply DequeueAll
let peek () =
agent.PostAndReply Peek
let asyncPeek () =
agent.PostAndAsyncReply Peek
let count () =
agent.PostAndReply Count
let asyncCount () =
agent.PostAndAsyncReply Count
let getAll () =
agent.PostAndReply GetAll
let asyncGetAll () =
agent.PostAndAsyncReply GetAll
member __.Enqueue item = enqueue item
member __.EnqueueAll items = enqueueAll items
member __.TryDequeue () = tryDequeue ()
member __.AsyncTryDequeue () = asyncTryDequeue ()
member __.TryPeek () = tryPeek ()
member __.AsyncTryPeek () = asyncTryPeek ()
member __.Dequeue () = dequeue ()
member __.AsyncDequeue () = asyncDequeue ()
member __.DequeueAll () = dequeueAll ()
member __.AsyncDequeueAll () = asyncDequeueAll ()
member __.Peek () = peek ()
member __.AsyncPeek () = asyncPeek ()
member __.Count = count()
member __.AsyncCount () = asyncCount ()
member __.GetAll () = getAll ()
member __.AsyncGetAll () = asyncGetAll ()
interface IEnumerable<'a> with
member __.GetEnumerator () = (getAll () :> IEnumerable<'a>).GetEnumerator()
interface System.Collections.IEnumerable with
member __.GetEnumerator () = (getAll () :> System.Collections.IEnumerable).GetEnumerator()
interface IProducerConsumerCollection<'a> with
member __.CopyTo (array: 'a array, index) = getAll () |> Seq.iteri (fun i item -> array.[index + i] <- item)
member __.CopyTo (array: System.Array, index) = getAll () |> Seq.iteri (fun i item -> array.SetValue(item, index + i))
member __.TryAdd item = enqueue item;true
member __.TryTake item =
match tryDequeue () with
| Some element -> item <- element;true
| None -> false
member __.ToArray () = getAll () |> Seq.toArray
member __.Count = count ()
member __.SyncRoot = this |> box
member __.IsSynchronized = true
and private InternalQueueAgent<'a> () =
let agent =
MailboxProcessor<InternalQueueMessage<'a>>.Start
<| fun inbox ->
let rec loop (state: Listeners<'a>) =
async {
let! message = inbox.Receive()
match message with
| AddDequeueListener channel ->
return! loop {state with DequeueListeners = state.DequeueListeners |> Queue.enqueue channel}
| AddPeekListener channel ->
return! loop {state with PeekListeners = state.PeekListeners |> Queue.enqueue channel}
| ItemEnqueued queue ->
let! newState =
async {
match state.PeekListeners |> Queue.tryDequeue with
| None -> return state
| Some (channel, newQueue) ->
let! item = queue.AsyncTryPeek()
match item with
| Some a -> channel.Reply a
| None -> ()
return {state with PeekListeners = newQueue}
}
match newState.DequeueListeners |> Queue.tryDequeue with
| Some (channel, newQueue) ->
let! item = queue.AsyncTryDequeue()
match item with
| Some a ->
channel.Reply a
return! loop {newState with DequeueListeners = newQueue}
| None ->
return! loop state
| None -> return! loop state
}
loop <| Listeners<'a>.Empty()
member __.Post message = agent.Post message
And finally, here are my unit tests for QueueAgent
:
open Microsoft.VisualStudio.TestTools.UnitTesting
[<AutoOpen>]
module Common =
let inline equal expected actual = Assert.AreEqual(expected, actual)
let inline notEqual expected actual = Assert.AreNotEqual(expected, actual)
let inline isTrue value = Assert.IsTrue(value)
let inline isFalse value = Assert.IsFalse(value)
[<TestClass>]
type QueueAgentTests () =
[<TestMethod>]
member __.``Enqueueing should add a message to the queue`` () =
async {
let queue = QueueAgent<string>()
queue.Enqueue "test"
queue.Enqueue "test2"
queue.Enqueue "test3"
let! count = queue.AsyncCount()
count |> equal 3
}
[<TestMethod>]
member __.``Dequeueing should wait for message to be enqueued, then remove the message from the queue`` () =
async {
let queue = QueueAgent<string>()
let dequeueResult = queue.AsyncDequeue()
queue.Enqueue "test"
let! message = dequeueResult
message |> equal "test"
let! count = queue.AsyncCount()
count |> equal 0
} |> Async.RunSynchronously
[<TestMethod>]
member __.``Peeking should return the first message in the queue without removing it`` () =
async {
let queue = QueueAgent<string>()
queue.Enqueue "test"
queue.Enqueue "test2"
let! peekedMessage = queue.AsyncPeek()
peekedMessage |> equal "test"
let! count = queue.AsyncCount()
count |> equal 2
} |> Async.RunSynchronously
[<TestMethod>]
member __.``TryDequeue should return None if there are no messages in the queue`` () =
async {
let queue = QueueAgent<string>()
let! dequeueResult = queue.AsyncTryDequeue()
dequeueResult |> equal None
} |> Async.RunSynchronously
[<TestMethod>]
member __.``TryDequeue should return Some if there are messages in the queue, and remove one message from the queue`` () =
async {
let queue = QueueAgent<string>()
queue.Enqueue "test"
queue.Enqueue "test2"
queue.Enqueue "test3"
let! dequeueResult = queue.AsyncTryDequeue()
dequeueResult |> equal <| Some "test"
let! count = queue.AsyncCount()
count |> equal 2
} |> Async.RunSynchronously
[<TestMethod>]
member __.``TryPeek should return None if there are no messages in the queue`` () =
async {
let queue = QueueAgent<string>()
let! peekResult = queue.AsyncTryPeek()
peekResult |> equal None
} |> Async.RunSynchronously
[<TestMethod>]
member __.``TryPeek should return Some if there are messages in the queue, and not modify the queue`` () =
async {
let queue = QueueAgent<string>()
queue.Enqueue "test"
queue.Enqueue "test2"
queue.Enqueue "test3"
let! peekResult = queue.AsyncTryPeek()
peekResult |> equal <| Some "test"
let! count = queue.AsyncCount()
count |> equal 3
} |> Async.RunSynchronously
[<TestMethod>]
member __.``EnqueueAll should enqueue all elements of a list in the queue`` () =
async {
let queue = QueueAgent<string>()
let list = [1..10] |> List.map (fun i -> sprintf "Test %d" i)
queue.EnqueueAll list
let! count = queue.AsyncCount()
count |> equal 10
} |> Async.RunSynchronously
[<TestMethod>]
member __.``DequeueAll should remove all elements from the queue`` () =
async {
let queue = QueueAgent<string>()
let list = [1..10] |> List.map (fun i -> sprintf "Test %d" i)
queue.EnqueueAll list
let! elements = queue.AsyncDequeueAll()
elements |> Seq.length |> equal 10
list |> List.forall (fun item -> elements |> Seq.contains item) |> isTrue
let! count = queue.AsyncCount()
count |> equal 0
} |> Async.RunSynchronously
[<TestMethod>]
member __.``QueueAgent should be thread-safe`` () =
let queue = QueueAgent<string>()
[1..10]
|> List.map (fun i ->
async {
queue.Enqueue <| sprintf "Test %d" i
})
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
queue.Count |> equal 10
[1..5]
|> List.map (fun _ -> queue.AsyncDequeue())
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
queue.Count |> equal 5
1 Answer 1
Not a thorough review, but two things jump out at me as problematic with ImmutableQueue
:
Your
Reverse
method looks wrong: it doesn't do anything iffront
isn't empty, and doesn't reverse if it isn't. I would have expected:let reverse () = ImmutableQueue(rear, front)
There are no tests for this method.
Some of the methods of
ImmutableQueue
seem to assume that the queue is empty iffront
is empty, while others do not. Without looking too carefully, I thinkfront
can never be empty if the whole thing isn't, but this isn't documented, and it's all rather confusing.Enqueue
, for example, will happily handlefront = []
, whiletryPeek
returnsNone
iffront = []
, andtryDequeue
is careful to make sure it doesn't produce anImmutableQueue
wherefront = []
.
Explore related questions
See similar questions with these tags.