I'd like to get input on a F# actor that coordinates receives around a blocking message buffer. The actor is a piece of code that continuously tries to fetch messages from Azure Service Bus.
(*
Copyright 2012 Henrik Feldt
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
*)
namespace MassTransit.Async
open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging
open FSharp.Control
open System
open System.Threading
open System.Runtime.CompilerServices
open System.Collections.Concurrent
open MassTransit.AzureServiceBus
open MassTransit.Async.Queue
open MassTransit.Async.AsyncRetry
type internal Agent<'T> = AutoCancelAgent<'T>
/// communication with the worker agent
type RecvMsg =
Start
| Pause
| Halt of AsyncReplyChannel<unit>
| SubscribeQueue of QueueDescription * Concurrency
| UnsubscribeQueue of QueueDescription
| SubscribeTopic of TopicDescription * Concurrency
| UnsubscribeTopic of TopicDescription
/// concurrently outstanding asynchronous requests (workers)
and Concurrency = uint32
/// State-keeping structure, mapping a description to a pair of cancellation token source and
/// receiver set list. The CancellationTokenSource can be used to stop the subscription that
/// it corresponds to.
type WorkerState =
{ QSubs : Map<QueueDescription, CancellationTokenSource * ReceiverSet list>;
TSubs : Map<TopicDescription,
// unsubscribe action
(unit -> Async<unit>) * CancellationTokenSource * ReceiverSet list> }
/// A pair of a messaging factory and a list of message receivers that
/// were created from that messaging factory.
and ReceiverSet = Pair of MessagingFactory * MessageReceiver list
type ReceiverDefaults() =
interface ReceiverSettings with
member x.Concurrency = 1u
member x.BufferSize = 1000u
member x.NThAsync = 5u
member x.ReceiveTimeout = TimeSpan.FromMilliseconds 50.0
/// Create a new receiver, with a queue description,
/// a factory for messaging factories and some control flow data
type Receiver(desc : QueueDescription,
newMf : (unit -> MessagingFactory),
nm : NamespaceManager,
receiverName : string,
?settings : ReceiverSettings) =
let sett = defaultArg settings (ReceiverDefaults() :> ReceiverSettings)
let logger = MassTransit.Logging.Logger.Get(typeof<Receiver>)
/// The 'scratch' buffer that tunnels messages from the ASB receivers
/// to the consumers of the Receiver class.
let messages = new BlockingCollection<_>(int <| sett.BufferSize)
/// Creates a new child token from the parent cancellation token source
let childTokenFrom ( cts : CancellationTokenSource ) =
let childCTS = new CancellationTokenSource()
let reg = cts.Token.Register(fun () -> childCTS.Dispose()) // what to do with the IDisposable...?
childCTS
let getToken ( cts : CancellationTokenSource ) = cts.Token
/// Starts stop/nthAsync new clients and messaging factories, so for stop=500, nthAsync=100
/// it loops 500 times and starts 5 new clients
let initReceiverSet newMf stop newReceiver pathDesc =
let rec inner curr pairs =
async {
match curr with
| _ when stop = curr ->
// we're stopping
return pairs
| _ when curr % sett.NThAsync = 0u ->
// we're at the first item, create a new pair
logger.DebugFormat("creating new mf & recv '{0}'", (pathDesc : PathBasedEntity).Path)
let mf = newMf ()
let! r = pathDesc |> newReceiver mf
let p = Pair(mf, r :: [])
return! inner (curr + 1u) (p :: pairs)
| _ ->
// if we're not at an even location, just create a new receiver for
// the same messaging factory
match pairs with // of mf<-> receiver list
| [] -> return failwith "curr != 1, but pairs empty. curr > 1 -> pairs.Length > 0"
| (Pair(mf, rs) :: rest) ->
logger.Debug(sprintf "creating new recv '%s'" (desc.ToString()))
let! r = desc |> newReceiver mf // the new receiver
let p = Pair(mf, r :: rs) // add the receiver to the list of receivers for this mf
return! inner (curr + 1u) (p :: rest) }
inner 0u []
let initReceiverSet1 : ((MessagingFactory -> PathBasedEntity -> Async<MessageReceiver>) -> PathBasedEntity -> _) =
initReceiverSet newMf (sett.Concurrency)
/// creates an async workflow worker, given a message receiver client
let worker client =
//logger.Debug "worker called"
async {
while true do
//logger.Debug "worker loop"
let! bmsg = sett.ReceiveTimeout |> recv client
if bmsg <> null then
logger.Debug(sprintf "received message on '%s'" (desc.ToString()))
messages.Add bmsg
else
//logger.Debug("got null msg due to timeout receiving")
() }
let startPairsAsync pairs token =
async {
for Pair(mf, rs) in pairs do
for r in rs do
Async.Start(r |> worker, token) }
/// cleans out the message buffer and disposes all messages therein
let clearLocks () =
async {
while messages.Count > 0 do
let m = ref null
if messages.TryTake(m, TimeSpan.FromMilliseconds(4.0)) then
try do! Async.FromBeginEnd((!m).BeginAbandon, (!m).EndAbandon)
(!m).Dispose()
with | x -> let entry = sprintf "could not abandon message#%s" <| (!m).MessageId
logger.Error(entry, x) }
/// Closes the pair of a messaging factory and a list of receivers
let closePair pair =
let (Pair(mf, rs)) = pair
logger.InfoFormat("closing {0} receivers and their single messaging factory", rs.Length)
if not(mf.IsClosed) then
try mf.Close() // this statement has cost a LOT of money in developer time, waiting for a timeout
with | x -> logger.Error("could not close messaging factory", x)
for r in rs do
if not(r.IsClosed) then
try r.Close()
with | x -> logger.Error("could not close receiver", x)
/// An agent that implements the reactor pattern, reacting to messages.
/// The mutually recursive function 'initial' uses the explicit functional
/// state pattern as described here:
/// http://www.cl.cam.ac.uk/~tp322/papers/async.pdf
///
/// A receiver can have these states:
/// --------------------------------
/// * Initial (waiting for a Start or Halt(chan) message).
/// * Starting (getting all messaging factories and receivers up and running)
/// * Started (main message loop)
/// * Paused (cancelling all async workflows)
/// * Halted (closing things)
/// * Final (implicit; reference is GCed)
///
/// with transitions:
/// -----------------
/// Initial -> Starting
/// Initial -> Halted
///
/// Starting -> Started
///
/// Started -> Paused
/// Started -> Halted
///
/// Paused -> Starting
/// Paused -> Halted
///
/// Halted -> Final (GC-ed here)
///
let a = Agent<RecvMsg>.Start(fun inbox ->
let rec initial () =
async {
logger.Debug "initial"
let! msg = inbox.Receive ()
match msg with
| Start ->
// create WorkerState for initial subscription (that of the queue)
// and move to the started state
let! rSet = initReceiverSet1 newReceiver desc
let mappedRSet = Map.empty |> Map.add desc (new CancellationTokenSource(), rSet)
return! starting { QSubs = mappedRSet ; TSubs = Map.empty }
| Halt(chan) -> return! halted { QSubs = Map.empty; TSubs = Map.empty } chan
| _ ->
// because we only care about the Start message in the initial state,
// we will ignore all other messages.
return! initial () }
and starting state =
async {
logger.Debug "starting"
do! desc |> create nm
use ct = new CancellationTokenSource ()
// start all subscriptions
state.QSubs
|> Seq.map (fun x -> x.Value)
|> Seq.append (state.TSubs |> Seq.map(fun x -> let (sd,cts,rs) = x.Value in cts,rs))
|> Seq.collect (fun ctsAndRs -> snd ctsAndRs)
|> Seq.collect (fun (Pair(_, rs)) -> rs)
|> Seq.iter (fun r -> Async.Start(r |> worker, ct.Token))
return! started state ct }
and started state cts =
async {
logger.DebugFormat("started '{0}'", desc.Path)
let! msg = inbox.Receive()
match msg with
| Pause ->
cts.Cancel()
return! paused state
| Halt(chan) ->
logger.DebugFormat("halt '{0}'", desc.Path)
cts.Cancel()
logger.DebugFormat("moving to halted state '{0}'", desc.Path)
return! halted state chan
| SubscribeQueue(qd, cc) ->
logger.DebugFormat("SubscribeQueue '{0}'", qd.Path)
do! qd |> create nm
// create new receiver sets for the queue description and kick them off as workflows
let childAsyncCts = childTokenFrom cts // get a new child token to control the computation with
let! recvSet = initReceiverSet1 newReceiver qd // initialize the receivers and potentially new messaging factories
do! childAsyncCts |> getToken |> startPairsAsync recvSet // start the actual async workflow
let qsubs' = state.QSubs.Add(qd, (childAsyncCts, recvSet)) // update the subscriptions mapping, from description to cts*ReceiverSet list.
return! started { QSubs = qsubs' ; TSubs = state.TSubs } cts
| UnsubscribeQueue qd ->
logger.Warn "SKIP:UnsubscribeQueue (TODO - do we even need this?)"
return! started state cts
| SubscribeTopic(td, cc) ->
logger.DebugFormat("SubscribeTopic '{0}'", td.Path)
let childAsyncCts = childTokenFrom cts
let! sub = td |> Topic.subscribe nm receiverName
let! pairs = initReceiverSet1 (Topic.newReceiver sub) td
do! childAsyncCts |> getToken |> startPairsAsync pairs
let tsubs' = state.TSubs.Add(td, ((fun () -> sub |> Topic.unsubscribe nm td), childAsyncCts, pairs))
return! started { QSubs = state.QSubs ; TSubs = tsubs' } cts
| UnsubscribeTopic td ->
logger.DebugFormat("UnsubscribeTopic '{0}'", td.Path)
match state.TSubs.TryFind td with
| None ->
logger.WarnFormat("Called UnsubscribeTopic('{0}') on non-subscribed topic!", td.Path)
return! started state cts
| Some( unsubscribe, childCts, recvSet ) ->
childCts.Cancel()
recvSet |> List.iter (fun set -> closePair set)
let tsubs' = state.TSubs.Remove(td)
do! unsubscribe ()
return! started { QSubs = state.QSubs ; TSubs = tsubs' } cts
| Start -> return! started state cts }
and paused state =
async {
logger.DebugFormat("paused '{0}'", desc.Path)
let! msg = inbox.Receive()
match msg with
| Start -> return! starting state
| Halt(chan) -> return! halted state chan
| _ as x -> logger.Warn(sprintf "got %A, despite being paused" x) }
and halted state chan =
async {
logger.DebugFormat("halted '{0}'", desc.Path)
let subs =
asyncSeq {
for x in (state.QSubs |> Seq.collect (fun x -> snd x.Value))
do yield x
for x in (state.TSubs |> Seq.collect (fun x -> let (s,cts,rs) = x.Value in rs)) do
yield x }
for rs in subs do
closePair rs
for unsub in state.TSubs |> Seq.map (fun x -> let (s, _, _) = x.Value in s) do
do! unsub ()
do! clearLocks ()
// then exit
chan.Reply() }
initial ())
/// Starts the receiver which starts the consuming from the service bus
/// and creates the queue if it doesn't exist
member x.Start () =
logger.InfoFormat("start called for queue '{0}'", desc)
a.Post Start
/// Stops the receiver; allowing it to start once again.
/// (Stopping is done by disposing the receiver.)
member x.Pause () =
logger.InfoFormat("stop called for queue '{0}'", desc)
a.Post Pause
member x.Subscribe ( td : TopicDescription ) =
logger.InfoFormat("subscribe called for topic description '{0}'", td)
a.Post <| SubscribeTopic( td, sett.Concurrency )
member x.Unsubscribe ( td : TopicDescription ) =
logger.InfoFormat("unsubscribe called for topic description '{0}'", td)
a.Post <| UnsubscribeTopic( td )
/// Returns a message if one was added to the buffer within the timeout specified,
/// or otherwise returns null.
member x.Get(timeout : TimeSpan) =
let mutable item = null
let _ = messages.TryTake(&item, timeout.Milliseconds)
item
member x.Consume() = asyncSeq { while true do yield messages.Take() }
interface System.IDisposable with
/// Cleans out all receivers and messaging factories.
member x.Dispose () =
logger.DebugFormat("dispose called for receiver on '{0}'", desc.Path)
a.PostAndReply(fun chan -> Halt(chan))
type ReceiverModule =
/// <code>address</code> is required. <code>settings</code> is optional.
static member StartReceiver(address : AzureServiceBusEndpointAddress,
settings : ReceiverSettings) =
match settings with
| null -> let r = new Receiver(address.QueueDescription, (fun () -> address.MessagingFactoryFactory.Invoke()),
address.NamespaceManager, NameHelper.GenerateRandomName())
r.Start ()
r
| _ -> let r = new Receiver(address.QueueDescription, (fun () -> address.MessagingFactoryFactory.Invoke()),
address.NamespaceManager, NameHelper.GenerateRandomName(),
settings)
r.Start ()
r
Focus on this code, but I would very much like code reviewers to review the larger body of code as well.
Questions; have I made beginner's mistakes when writing this code? It's one of my first production-level actors.
Can things be optimized in ways that allure me? E.g. am I leaking performance somewhere or memory somewhere else?
-
\$\begingroup\$ So let's say something to try to get this question up into the active set. \$\endgroup\$Henrik– Henrik2012年03月05日 13:18:22 +00:00Commented Mar 5, 2012 at 13:18
-
1\$\begingroup\$ long code + F# + concurrency + azure = difficult to review. \$\endgroup\$Quentin Pradet– Quentin Pradet2012年03月13日 13:37:13 +00:00Commented Mar 13, 2012 at 13:37
-
1\$\begingroup\$ also = difficult to get right \$\endgroup\$Henrik– Henrik2012年03月14日 06:28:13 +00:00Commented Mar 14, 2012 at 6:28
1 Answer 1
Disclaimer: I know some OCaml and a bit of Erlang, but never implemented an F# actor before. I also read the async paper you mentioned in the comments (apart from "Semantics" and "Implementation").
Style
Indentation
type RecvMsg =
Start
| Pause
You should indent Start like this:
type RecvMsg =
| Start
| Pause
The syntax allows you to omit the initial pipe, but it's because of one-liners like type RecvMsg = Start | Pause | ...
Declaring types
/// concurrently outstanding asynchronous requests (workers)
and Concurrency = uint32
You should use and
only for mutually recursive types. Concurrency could as well be defined just above RecvMsg
.
Returning unit else ()
is implicit and not needed.
Pattern matching
let (Pair(mf, rs)) = pair
Why don't you match pair with Pair(mf, rs) -> ...
?
Comments Your comments are good! I especially liked the comment of the main agent, which helps to understand the state machine.
Code
Tail Recursion Section 2.2 of the async paper mentions a way to define count
that is elegant, concise and efficient. You should consider replacing the ugly while True
loops with nice tail recursive functions. :)
States
I don't like all those "in-between" states, they make your actor look super complicated and don't seem natural. I have little experience with actors, so I can't tell if this is recommended or not.
-
\$\begingroup\$ "Long code": interesting that you say so. I don't want to have long code; this is as compact as I've been able to make it. Do you think I should be moving some of the utility functions out of the Receiver type? An argument against this is that it lessens cohesion and understandability. \$\endgroup\$Henrik– Henrik2012年03月14日 09:42:31 +00:00Commented Mar 14, 2012 at 9:42
-
\$\begingroup\$ "match pair with Pair": well said; changed to your suggestion! \$\endgroup\$Henrik– Henrik2012年03月14日 09:42:57 +00:00Commented Mar 14, 2012 at 9:42
-
\$\begingroup\$ "Recursive types": I changed it into separate types; but in this case I used the
and
notation to show that it goes together with the type just above; what do you think about that? \$\endgroup\$Henrik– Henrik2012年03月14日 09:43:57 +00:00Commented Mar 14, 2012 at 9:43 -
\$\begingroup\$ "In-between states": I've been thinking a lot about that. I'd really like to construct some type of formal proof on the actor to have my back clear, and then small states are usually easier. That said,
Initial
is because I want to initialize the ReceiverSet before the actor needs to start consuming messages.starting
might be possible to collapse intoSubscribeQueue
message handler - but then I would have to pass a flag that it's the top-most queue and hence the 'major' cancellation token?paused
,halted
are in my eyes pretty nice. \$\endgroup\$Henrik– Henrik2012年03月14日 09:47:11 +00:00Commented Mar 14, 2012 at 9:47 -
\$\begingroup\$ About "recursive types", I think it's enough to just put
Concurrency
beforeRecvMsg
. \$\endgroup\$Quentin Pradet– Quentin Pradet2012年03月14日 09:59:22 +00:00Commented Mar 14, 2012 at 9:59