This code is intended to process commands in a CQRS+Messaging system. It parallelizes message processing to the degree you specify through the `msgToIdFn` parameter. That function extracts the target ID from the command message, and the ID is used to route all messages with that ID to the same agent. Agents are created automatically for each ID as needed and destroyed when their inbox empties.
Right now `runFn` is expected to execute any message. There will likely be some `match ... with`-based routing in there to exercise the appropriate module with the appropriate parameters and services.
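A minimal sketch of what the three plug-in functions might look like with `match`-based routing. The `Command` and `CommandResult` types and the account example are hypothetical stand-ins; the post does not show the real commands or modules.

```fsharp
// Hypothetical command type; the real system's commands are not shown in the post.
type Command =
    | CreateAccount of accountId: string * owner: string
    | Deposit of accountId: string * amount: decimal

// Hypothetical result type that both normal and failed runs map into.
type CommandResult =
    | Succeeded of string
    | Rejected of string

// msgToIdFn: every command for the same account is routed to the same agent.
let commandToId command =
    match command with
    | CreateAccount (accountId, _) -> accountId
    | Deposit (accountId, _) -> accountId

// runFn: match-based routing to the code that handles each command.
let runCommand command =
    match command with
    | CreateAccount (accountId, owner) -> Succeeded (sprintf "created %s for %s" accountId owner)
    | Deposit (_, amount) when amount <= 0m -> Rejected "amount must be positive"
    | Deposit (accountId, amount) -> Succeeded (sprintf "deposited %M into %s" amount accountId)

// sysFailFn: wrap an unhandled exception in the same result type.
let commandFailed (ex: exn) = Rejected ex.Message
```

These would then be passed in as `Start runCommand commandToId commandFailed`.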
The comments are verbose because I am learning and also explaining how it works to my team. I can trim them down if they are too distracting.
I'm looking for possible improvements to code style, missed edge cases, and efficiency. For example, I am returning `Some agents` to continue the main dispatcher loop or `None` to stop it, which doesn't seem quite right to me. See the other Known Issues at the bottom.
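For the `Some`/`None` question, one common alternative is to let each branch of the receive loop decide whether to recurse, so the state never has to be wrapped in an option. The counter agent below is a hypothetical, self-contained illustration of that shape, not the dispatcher itself. In the dispatcher, the `Shutdown` branch would stop the agents and simply not call `loop` again.

```fsharp
// Hypothetical messages for a counter agent.
type CounterMessage =
    | Increment of int
    | Stop of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start(fun inbox ->
        // Each branch decides whether to keep going; no Option wrapper is needed.
        let rec loop total = async {
            let! message = inbox.Receive()
            match message with
            | Increment n -> return! loop (total + n)
            | Stop reply ->
                // Reply with the final state and do not recurse, which ends the loop.
                reply.Reply total }
        loop 0)

counter.Post(Increment 2)
counter.Post(Increment 3)
// Messages are processed in order, so Stop sees both increments.
let finalTotal = counter.PostAndReply(fun reply -> Stop reply)
```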
```fsharp
module MessageDispatcher

// the functionality exposed by starting a dispatcher
type MessageDispatcher<'message, 'result> = {
    Dispatch: 'message -> Async<'result>;
    Shutdown: unit -> unit
}

// the things that the agents can be instructed to do
type private AgentMessage<'message> =
    | Run of 'message
    | Stop

// the things that the dispatcher can be instructed to do
type private DispatcherMessage<'key, 'message when 'key : comparison> =
    | Dispatch of 'message
    | CompleteRun of 'key
    | Shutdown

// it made sense to make MessageCount mutable
// otherwise, code would have to create a new record, remove old from agentMap, add new to agentMap... every time
type private AgentCounter<'message, 'result> = {
    mutable MessageCount: int;
    Agent: MailboxProcessor<AgentMessage<'message> * ('result -> unit)>
}

/// Start the dispatcher
/// runFn is how you plug in execution code
/// signature: 'message -> 'result
/// msgToIdFn extracts the id from the message, so it can be linked to a specific agent
/// signature: 'message -> 'key
/// sysFailFn wraps an unhandled exception in the caller's desired result type
/// signature: exn -> 'result
let Start<'key, 'message, 'result when 'key : comparison> (runFn: 'message -> 'result) (msgToIdFn: 'message -> 'key) (sysFailFn: exn -> 'result) =
    // function to create a new agent
    let createAgent runCompleted =
        // create and start a new mailbox processor
        MailboxProcessor.Start
        <| fun inbox -> // inbox is the same as the agent itself
            let rec loop () = async { // define the async message processing loop
                let! (message, reply) = inbox.Receive() // wait on next message
                match message with
                | Stop -> return () // we are done, exit the loop
                | Run msg ->
                    try // in case there are any unhandled exceptions
                        let result = runFn msg // run message
                        reply(result) // no crash returns normal result
                    with
                    | ex -> reply(sysFailFn ex) // system failure of some kind
                    runCompleted() // notify completion of this message
                    return! loop () // continue to run
            }
            loop () // start the message processing loop

    // this is the dispatcher through which all messages flow
    let dispatchAgent =
        // create and start a new mailbox
        MailboxProcessor.Start
        <| fun inbox -> // inbox is the same as dispatchAgent
            // this function stops and removes an agent from the map
            // returns a copy of agentMap with the agent removed
            let decommission staleId (staleAgent: MailboxProcessor<_>) agentMap =
                // stop agent
                staleAgent.Post(Stop, ignore)
                // remove from map
                Map.remove staleId agentMap

            // this function queues a message to the agent
            // returns a copy of the (possibly) modified agentMap
            let queueMessage msg reply agentMap =
                // run the msgToKey function to get the id out of the message
                let id = msgToIdFn msg
                // return the agent, and the agent map in case it was changed
                let (agent, agents) =
                    // see if the agent already exists
                    match Map.tryFind id agentMap with
                    | Some agentCounter -> // existing agent
                        agentCounter.MessageCount <- agentCounter.MessageCount + 1 // increase count
                        (agentCounter.Agent, agentMap) // return the agent and original map
                    | None -> // new agent
                        // create agent
                        let runCompletedFn () = inbox.Post(CompleteRun id, ignore) // create func to run when messages complete
                        let agent = createAgent runCompletedFn // create the agent
                        let agentCounter = { MessageCount = 1; Agent = agent } // create the agentCounter record
                        // return the agent and the agentMap with the agent added
                        (agentCounter.Agent, Map.add id agentCounter agentMap)
                agent.Post(Run msg, reply) // send the message to the agent
                Some agents // Some is roughly equivalent to not null with this value

            // this function updates counters when an agent completes a message
            // it will also decommission an agent that has no messages
            // returns agentMap, it will be changed if an agent was decommissioned
            let completeMessage id agentMap =
                let agentCounter = Map.find id agentMap // get the agentCounter
                agentCounter.MessageCount <- agentCounter.MessageCount - 1 // update the counter, <- is mutable style
                let agents =
                    match agentCounter.MessageCount with
                    | 0 -> decommission id agentCounter.Agent agentMap // immediately decommission
                    | _ -> agentMap // return the same agent map -- nothing changed
                Some agents

            // this function will stop all current agents and then exit
            // return None to tell the dispatcher to stop running
            let shutdown agentMap =
                agentMap
                |> Map.iter (fun id agentCounter -> agentCounter.Agent.Post(Stop, ignore))
                None

            // this is the core loop to dispatch messages
            let rec loop agentMap = async { // run async
                // let! and return! here is kinda like C# await
                let! (message, reply) = inbox.Receive() // receive the next message and reply channel
                let optionalAgents =
                    match message with // run the appropriate function
                    | Dispatch msg -> queueMessage msg reply agentMap
                    | CompleteRun id -> completeMessage id agentMap
                    | Shutdown -> shutdown agentMap
                match optionalAgents with
                | Some agents -> return! loop agents // agents provided means continue to run
                | None -> return () // no agents provided means exit the loop
            }
            loop Map.empty<'key, AgentCounter<'message, 'result>> // start the loop with an empty map

    // this sends a dispatch message to the dispatcher agent
    let dispatchFn (msg: 'message) =
        let message = Dispatch msg // create a dispatch message from the given message
        // send the message and reply channel
        dispatchAgent.PostAndAsyncReply(fun replyChannel -> (message, replyChannel.Reply))
        // the reply channel allows callers to get a response back

    // this sends the shutdown message to the dispatcher agent
    let shutdownFn () = dispatchAgent.Post(Shutdown, ignore)

    // this exposes the public functions that this dispatcher supports
    let dispatcher = { Dispatch = dispatchFn; Shutdown = shutdownFn }
    // return the dispatcher
    dispatcher
```
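The comment on `AgentCounter` relies on the fact that a `Map` holds a reference to the record, so mutating `MessageCount` through a value read out of the map is visible to every later lookup without re-adding the record. A small standalone check of that behaviour (the `Counter` type is hypothetical):

```fsharp
// Hypothetical record mirroring AgentCounter's mutable field.
type Counter = { mutable MessageCount: int }

let counters = Map.ofList [ "a", { MessageCount = 1 } ]

// Read the record out of the map and mutate it in place.
let counter = Map.find "a" counters
counter.MessageCount <- counter.MessageCount + 1

// The map was never rebuilt, yet the lookup sees the new count,
// because records are reference types and the map holds the same instance.
let seen = (Map.find "a" counters).MessageCount
```

This is only safe because the dispatcher loop is the single place that reads and writes the counters.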
Known Issues

- Immutable collection performance
  - Already tested vs. mutable with various usage patterns.
  - Worst pattern was around 90k msgs/s vs. mutable's 140k.
  - Best pattern was around 280k msgs/s vs. mutable's 290k.
  - Considering that a .NET Web API will do well to reach around 10k HTTP requests/s, and our likely system throughput will be far less, I'm not worried about it being a bottleneck. If it becomes one, I can always switch to a mutable collection later.
- Caching agents for reuse instead of immediately decommissioning
  - I tested this. Surprisingly, it didn't make much difference and created several more edge cases.
  - In my tests it's only faster when using mutable collections, and then only marginally (vs. mutable non-cached), except for limited (and possibly unlikely) usage patterns.
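If the immutable `Map` does become the bottleneck, the switch should be fairly local: only the dispatcher loop touches the collection, so a plain `Dictionary` needs no locking. A sketch of the counting logic under that assumption. `Slot` is a hypothetical stand-in for `AgentCounter`, with a string in place of the agent.

```fsharp
open System.Collections.Generic

// Hypothetical stand-in for AgentCounter; a real version would hold the MailboxProcessor.
type Slot = { mutable MessageCount: int; AgentName: string }

// Only the dispatcher loop would call these functions, so no locking is needed.
let agents = Dictionary<string, Slot>()

// Mirrors queueMessage: bump the count, or create a new entry.
let enqueue (id: string) =
    match agents.TryGetValue(id) with
    | true, slot -> slot.MessageCount <- slot.MessageCount + 1
    | false, _ -> agents.[id] <- { MessageCount = 1; AgentName = "agent-" + id }

// Mirrors completeMessage: decrement, and drop the entry when it reaches zero.
let complete (id: string) =
    let slot = agents.[id]
    slot.MessageCount <- slot.MessageCount - 1
    if slot.MessageCount = 0 then
        agents.Remove(id) |> ignore
```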
Addendum

I have a more up-to-date version of this. I experimented with returning an interface as well, but I didn't care for it, so I converted it into a more F#-idiomatic form that uses module functions for all operations. It does, however, use mutable collections for maximum performance.
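The updated version is not shown here. As a hypothetical illustration of the module-functions style, an opaque handle with a `private` representation plus functions in a same-named module might look like this. The `Echo` type and its upper-casing behaviour are invented for the example.

```fsharp
// Hypothetical handle type; `private` hides the record's fields outside the enclosing module.
type Echo = private { Agent: MailboxProcessor<string * AsyncReplyChannel<string>> }

// Operations live in a module with the same name as the type.
module Echo =
    let start () : Echo =
        let agent =
            MailboxProcessor.Start(fun inbox ->
                let rec loop () = async {
                    let! (text: string, reply: AsyncReplyChannel<string>) = inbox.Receive()
                    reply.Reply(text.ToUpperInvariant())
                    return! loop () }
                loop ())
        { Agent = agent }

    let dispatch (echo: Echo) (text: string) : Async<string> =
        echo.Agent.PostAndAsyncReply(fun reply -> (text, reply))

let echo = Echo.start ()
let answer = Echo.dispatch echo "hello" |> Async.RunSynchronously
```

Callers only see `Echo.start` and `Echo.dispatch`, so the internals (including a switch to mutable collections) can change without affecting them.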
Answer
```fsharp
// the functionality exposed by starting a dispatcher
type MessageDispatcher<'message, 'result> = {
    Dispatch: 'message -> Async<'result>;
    Shutdown: unit -> unit
}

...

    // this sends a dispatch message to the dispatcher agent
    let dispatchFn (msg: 'message) =
        let message = Dispatch msg // create a dispatch message from the given message
        // send the message and reply channel
        dispatchAgent.PostAndAsyncReply(fun replyChannel -> (message, replyChannel.Reply))
        // the reply channel allows callers to get a response back

    // this sends the shutdown message to the dispatcher agent
    let shutdownFn () = dispatchAgent.Post(Shutdown, ignore)

    // this exposes the public functions that this dispatcher supports
    let dispatcher = { Dispatch = dispatchFn; Shutdown = shutdownFn }
    // return the dispatcher
    dispatcher
```
This looks like a functional way of emulating OOP. But F# does support OOP directly, so you should probably use that:
```fsharp
type IMessageDispatcher<'message, 'result> =
    abstract Dispatch: 'message -> Async<'result>
    abstract Shutdown: unit -> unit

...

    // return the dispatcher
    { new IMessageDispatcher<_, _> with
        member this.Dispatch(msg) =
            let message = Dispatch msg // create a dispatch message from the given message
            // send the message and reply channel
            dispatchAgent.PostAndAsyncReply(fun replyChannel -> (message, replyChannel.Reply))
            // the reply channel allows callers to get a response back
        member this.Shutdown() =
            dispatchAgent.Post(Shutdown, ignore)
    }
```
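To see that the two shapes are interchangeable for callers, here is a small hypothetical example (the `IGreeter` interface is invented for illustration) in which an object expression closes over local state exactly as a record of closures would:

```fsharp
// Hypothetical interface, shaped like IMessageDispatcher.
type IGreeter =
    abstract Greet: string -> string
    abstract Shutdown: unit -> unit

let makeGreeter (prefix: string) =
    // A ref cell holds the state shared by both members.
    let stopped = ref false
    { new IGreeter with
        member this.Greet(name) = if stopped.Value then "stopped" else prefix + name
        member this.Shutdown() = stopped.Value <- true }

let greeter = makeGreeter "Hello, "
let before = greeter.Greet("Ann")
greeter.Shutdown()
let after = greeter.Greet("Ann")
```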
```fsharp
/// Start the dispatcher
/// runFn is how you plug in execution code
/// signature: 'message -> 'result
/// msgToIdFn extracts the id from the message, so it can be linked to a specific agent
/// signature: 'message -> 'key
/// sysFailFn wraps an unhandled exception in the caller's desired result type
/// signature: exn -> 'result
```
Why are you duplicating the signature here? I don't see any reason for that.
Also, consider using XML documentation comments for this kind of documentation. (Though I'm not sure the tooling is good enough to make that worth it.)
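A hedged sketch of the XML form on a small hypothetical helper (`runSafely` is not part of the original code; it mirrors the `try ... with` in `createAgent`). Editors typically show the inferred signature in tooltips anyway, which is why the separate `signature:` lines add little.

```fsharp
/// <summary>Runs a message, converting any unhandled exception into a result.</summary>
/// <param name="runFn">Executes the message.</param>
/// <param name="sysFailFn">Wraps an unhandled exception in the caller's result type.</param>
/// <param name="msg">The message to run.</param>
/// <returns>The result of <c>runFn</c>, or of <c>sysFailFn</c> if <c>runFn</c> threw.</returns>
let runSafely (runFn: 'message -> 'result) (sysFailFn: exn -> 'result) (msg: 'message) : 'result =
    try
        runFn msg
    with ex ->
        sysFailFn ex
```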
```fsharp
msgToIdFn:'message -> 'key
```
Try not to use abbreviations in names, it makes them harder to read.
```fsharp
// function to create a new agent
let createAgent runCompleted =
    // create and start a new mailbox processor
    MailboxProcessor.Start
```
You're right, your comments can be too verbose. Use comments when something is unclear or needs explanation.
```fsharp
runCompleted() // notify completion of this message
return! loop () // continue to run
```
Why are you calling `unit`-taking functions sometimes without a space (so it looks like a C-like function call) and sometimes with one (so it looks like a functional function call with unit as an argument)? I think you should choose one or the other and stick with it.
```fsharp
// the things that the agents can be instructed to do
type private AgentMessage<'message> =
    | Run of 'message
    | Stop
```
Since `Stop` doesn't need a reply, consider incorporating the reply function directly into `AgentMessage`:
```fsharp
type private AgentMessage<'message, 'result> =
    | Run of 'message * ('result -> unit)
    | Stop
```
The same approach could be applied to `DispatcherMessage` too.
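A small self-contained sketch of an agent built on the suggested `AgentMessage<'message, 'result>` shape. With this shape, `Stop` no longer needs a dummy `ignore` reply. The squaring `runFn` is only an example.

```fsharp
// The reply function travels only with Run, so Stop carries nothing.
type AgentMessage<'message, 'result> =
    | Run of 'message * ('result -> unit)
    | Stop

let createAgent (runFn: 'message -> 'result) =
    MailboxProcessor<AgentMessage<'message, 'result>>.Start(fun inbox ->
        let rec loop () = async {
            let! message = inbox.Receive()
            match message with
            | Stop -> return ()
            | Run (msg, reply) ->
                reply (runFn msg)
                return! loop () }
        loop ())

// Example runFn: square an integer.
let agent = createAgent (fun (n: int) -> n * n)
let squared = agent.PostAndReply(fun channel -> Run (7, channel.Reply))
agent.Post(Stop)
```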
```fsharp
// run the msgToKey function to get the id out of the message
let id = msgToIdFn msg
```
This shows one reason why too many comments can hurt: when you change code, you need to make sure you also modify the corresponding comment. This specific comment is wrong and useless (since it just repeats what the code says).
```fsharp
match agentCounter.MessageCount with
| 0 -> decommission id agentCounter.Agent agentMap // immediately decommission
| _ -> agentMap // return the same agent map -- nothing changed
```
You don't need to use `match` when `if` would work too:
```fsharp
if agentCounter.MessageCount = 0
then decommission id agentCounter.Agent agentMap // immediately decommission
else agentMap // return the same agent map -- nothing changed
```
- Thanks for the input. One point of curiosity: you recommend using an interface to return multiple functions, versus a record or tuple? (Kasey Speakman, Jul 6, 2015 at 13:06)
- Yeah, I don't see any advantage in using a record of functions when you can use an interface. (svick, Jul 6, 2015 at 13:26)