7
\$\begingroup\$

I am in the process of removing the last few pieces of mutable state from an F#-based distributed system. Some of the remaining mutable state is a ConcurrentDictionary used in some Akka.net actors to store messages, look them up later, update their statuses, etc. We had some problems with locking on the ConcurrentDictionary instances, so we believe it is worth replacing them with an immutable implementation. In order to replace the concurrent dictionaries, I have created a wrapper around the F# immutable Map<'key, 'value> type, using a MailboxProcessor to process dictionary operations one at a time. My implementation is as follows; please let me know if you see any problems with using this to replace ConcurrentDictionary.

open System.Collections.Generic
/// Requests accepted by the MapAgent mailbox. Every case carries the reply
/// channel on which the agent answers once the request has been handled.
type private MapMessage<'key, 'value when 'key : comparison> =
    /// Bind `key` to `value`; the reply is the map after the change.
    | Add of key: 'key * value: 'value * reply: AsyncReplyChannel<Map<'key, 'value>>
    /// Ask whether `key` is present.
    | Contains of key: 'key * reply: AsyncReplyChannel<bool>
    /// Ask for the number of bindings.
    | Count of reply: AsyncReplyChannel<int>
    /// Look up `key`; the reply is `None` when the key is absent.
    | Get of key: 'key * reply: AsyncReplyChannel<'value option>
    /// Ask for the whole current map.
    | GetAll of reply: AsyncReplyChannel<Map<'key, 'value>>
    /// Drop the binding for `key`; the reply is the map after the change.
    | Remove of key: 'key * reply: AsyncReplyChannel<Map<'key, 'value>>
    /// Rebind `key` to `value`; the reply is the map after the change.
    | Update of key: 'key * value: 'value * reply: AsyncReplyChannel<Map<'key, 'value>>
    /// Ask for all keys.
    | Keys of reply: AsyncReplyChannel<'key seq>
    /// Ask for all values.
    | Values of reply: AsyncReplyChannel<'value seq>
/// Dictionary-like wrapper that keeps an immutable Map<'key, 'value> inside a
/// MailboxProcessor. Every operation is posted as a MapMessage and the agent
/// loop handles one message at a time, so each single operation works on a
/// consistent snapshot of the map.
///
/// Members that are built from several messages (the IDictionary `Clear` and
/// `Remove` implementations) are NOT atomic as a whole: another caller's
/// message can be processed between their individual steps.
type MapAgent<'key,'value when 'key: comparison> () =

    /// The agent that owns the current map. It starts with an empty map and
    /// threads the (possibly updated) map through the recursive loop.
    let agent =
        MailboxProcessor<MapMessage<'key,'value>>.Start(fun inbox ->
            let rec loop (state: Map<'key,'value>) =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Add (key, value, channel) ->
                        let newState = state |> Map.add key value
                        channel.Reply(newState)
                        return! loop newState
                    | Contains (key, channel) ->
                        channel.Reply(state |> Map.containsKey key)
                        return! loop state
                    | Count channel ->
                        channel.Reply(state |> Map.count)
                        return! loop state
                    | Get (key, channel) ->
                        channel.Reply(state |> Map.tryFind key)
                        return! loop state
                    | GetAll channel ->
                        channel.Reply(state)
                        return! loop state
                    | Remove (key, channel) ->
                        let newState = state |> Map.remove key
                        channel.Reply(newState)
                        return! loop newState
                    | Update (key, value, channel) ->
                        // Map.add already replaces an existing binding, so no
                        // separate Map.remove is needed first.
                        let newState = state |> Map.add key value
                        channel.Reply(newState)
                        return! loop newState
                    | Keys channel ->
                        channel.Reply(state |> Map.toSeq |> Seq.map fst)
                        return! loop state
                    | Values channel ->
                        channel.Reply(state |> Map.toSeq |> Seq.map snd)
                        return! loop state
                }
            loop Map.empty)

    // ---- One blocking and one Async wrapper per message --------------------

    let add key value =
        agent.PostAndReply(fun reply -> Add (key, value, reply))
    let asyncAdd key value =
        agent.PostAndAsyncReply(fun reply -> Add (key, value, reply))

    let contains key =
        agent.PostAndReply(fun reply -> Contains (key, reply))
    let asyncContains key =
        agent.PostAndAsyncReply(fun reply -> Contains (key, reply))

    let count () =
        agent.PostAndReply(fun reply -> Count reply)
    let asyncCount () =
        agent.PostAndAsyncReply(fun reply -> Count reply)

    let get key =
        agent.PostAndReply(fun reply -> Get (key, reply))
    let asyncGet key =
        agent.PostAndAsyncReply(fun reply -> Get (key, reply))

    let getAll () =
        agent.PostAndReply(fun reply -> GetAll reply)
    let asyncGetAll () =
        agent.PostAndAsyncReply(fun reply -> GetAll reply)

    let remove key =
        agent.PostAndReply(fun reply -> Remove (key, reply))
    let asyncRemove key =
        agent.PostAndAsyncReply(fun reply -> Remove (key, reply))

    let update key value =
        agent.PostAndReply(fun reply -> Update (key, value, reply))
    let asyncUpdate key value =
        agent.PostAndAsyncReply(fun reply -> Update (key, value, reply))

    let keys () =
        agent.PostAndReply(fun reply -> Keys reply)
    let asyncKeys () =
        agent.PostAndAsyncReply(fun reply -> Keys reply)

    let values () =
        agent.PostAndReply(fun reply -> Values reply)
    let asyncValues () =
        agent.PostAndAsyncReply(fun reply -> Values reply)

    // ---- Helpers shared by the members below -------------------------------

    /// Lazy view of the bindings as KeyValuePairs. The map is fetched from the
    /// agent each time the sequence is enumerated, not when this is called.
    let getEnumerable () =
        Seq.delay (fun () -> getAll () :> seq<KeyValuePair<'key, 'value>>)

    let getEnumerator () =
        getEnumerable().GetEnumerator()

    /// Lexicographic comparison of two KeyValuePair sequences: keys are
    /// compared first, values only when the keys are equal.
    let compareSequences (left: seq<KeyValuePair<'key, 'value>>)
                         (right: seq<KeyValuePair<'key, 'value>>) =
        let keyComparer = LanguagePrimitives.FastGenericComparer<'key>
        let comparePair (a: KeyValuePair<'key, 'value>) (b: KeyValuePair<'key, 'value>) =
            let byKey = keyComparer.Compare(a.Key, b.Key)
            if byKey <> 0 then byKey else Unchecked.compare a.Value b.Value
        Seq.compareWith comparePair left right

    /// True when `key` is present AND bound to a value equal to `value`.
    /// Uses a single Get message so the check cannot straddle an update.
    let containsKeyValue key value =
        match get key with
        | Some stored -> Unchecked.equals stored value
        | None -> false

    // ---- Public API --------------------------------------------------------

    /// Binds `key` to `value` and returns the map after the change.
    member __.Add key value = add key value
    /// Async counterpart of `Add`.
    member __.AsyncAdd key value = asyncAdd key value
    member __.ContainsKey key = contains key
    member __.AsyncContainsKey key = asyncContains key
    /// Number of bindings currently held.
    member __.Count () = count ()
    member __.AsyncCount () = asyncCount ()
    /// Returns `Some value` when `key` is present, otherwise `None`.
    member __.Get key = get key
    member __.AsyncGet key = asyncGet key
    /// Returns the current immutable map.
    member __.GetAll () = getAll ()
    member __.AsyncGetAll () = asyncGetAll ()
    /// Removes `key` (if present) and returns the map after the change.
    member __.Remove key = remove key
    member __.AsyncRemove key = asyncRemove key
    /// Rebinds `key` to `value` and returns the map after the change.
    member __.Update key value = update key value
    member __.AsyncUpdate key value = asyncUpdate key value
    member __.Keys () = keys ()
    member __.AsyncKeys () = asyncKeys ()
    member __.Values () = values ()
    member __.AsyncValues () = asyncValues ()
    member __.AsEnumerable () = getEnumerable ()

    /// Equal to any KeyValuePair sequence with the same bindings in the same order.
    override __.Equals that =
        match that with
        | :? seq<KeyValuePair<'key, 'value>> as other ->
            compareSequences (getEnumerable ()) other = 0
        | _ -> false

    /// Hash of the current map; it changes whenever the contents change.
    override __.GetHashCode () =
        (getAll () |> box).GetHashCode()

    interface IEnumerable<KeyValuePair<'key, 'value>> with
        member __.GetEnumerator () = getEnumerator ()

    interface System.Collections.IEnumerable with
        member __.GetEnumerator () = getEnumerator () :> System.Collections.IEnumerator

    interface IDictionary<'key, 'value> with
        // NOTE(review): unlike the documented IDictionary.Add contract, an
        // existing key is silently overwritten rather than rejected.
        member __.Add (key, value) = add key value |> ignore
        member __.Add (kvp: KeyValuePair<'key, 'value>) = add kvp.Key kvp.Value |> ignore
        // Not atomic: takes a snapshot of the keys, then removes them one by one.
        member __.Clear () = keys () |> Seq.iter (remove >> ignore)
        member __.Contains (kvp: KeyValuePair<'key, 'value>) = containsKeyValue kvp.Key kvp.Value
        member __.ContainsKey (key) = contains key
        member __.CopyTo (array: KeyValuePair<'key, 'value>[], index: int) =
            getAll ()
            |> Map.toSeq
            |> Seq.iteri (fun i (key, value) -> array.[index + i] <- KeyValuePair(key, value))
        // Not atomic: the presence check and the removal are separate messages.
        member __.Remove (key: 'key) =
            if contains key then
                remove key |> ignore
                true
            else
                false
        // Not atomic: the pair check and the removal are separate messages.
        member __.Remove (kvp: KeyValuePair<'key, 'value>) =
            if containsKeyValue kvp.Key kvp.Value then
                remove kvp.Key |> ignore
                true
            else
                false
        member __.TryGetValue (key, result: byref<'value>) =
            // One Get message is enough; a preceding Contains only adds a
            // window in which the key could disappear.
            match get key with
            | Some value ->
                result <- value
                true
            | None -> false
        member __.Count
            with get () = count ()
        member __.IsReadOnly
            with get () = false
        member __.Item
            with get key =
                match get key with
                | Some value -> value
                // NOTE(review): IDictionary documents KeyNotFoundException for a
                // missing key; the exception type is kept for existing callers.
                | None -> raise <| System.IndexOutOfRangeException(sprintf "The key %A was not found in the dictionary" key)
            and set key value = update key value |> ignore
        member __.Keys
            with get () = Seq.toArray (keys ()) :> ICollection<'key>
        member __.Values
            with get () = Seq.toArray (values ()) :> ICollection<'value>

    interface System.IComparable with
        /// Compares against another MapAgent or a plain Map with the same
        /// type arguments; any other argument raises ArgumentException.
        member __.CompareTo (obj: obj) =
            match obj with
            | :? MapAgent<'key, 'value> as mapAgent ->
                compareSequences (getEnumerable ()) (mapAgent.AsEnumerable ())
            | :? Map<'key, 'value> as map ->
                compareSequences (getEnumerable ()) (map :> seq<KeyValuePair<'key, 'value>>)
            | _ ->
                invalidArg "obj" "not comparable"
asked May 4, 2018 at 18:01
\$\endgroup\$

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.