I would appreciate a review of this implementation of a concurrent list, a collection that is, to my knowledge, currently missing from .NET Core.
Requirements
- Thread safe
- Lock free
- Ordered
- Supports get and set by index
- Implements generic IList
- Acceptable performance and time complexity compared to a regular IList
The main idea
- To keep order I use ConcurrentQueue
- To get and set values at index I use ConcurrentDictionary
- To keep both data sets in sync, I update them on Add and Remove
- To avoid excessive syncing, the enumerator iterates over the Queue and picks the Dictionary value when one is available
Questions
- Is there anything wrong with the code below? For example, do I need to make a copy of the enumerator before accessing it to ensure thread safety, i.e. should items.GetEnumerator() be items.ToList().GetEnumerator()?
- Any concerns regarding shallow copies and immutability?
- Any code samples or other implementations that perform better?
Excerpt from implementation
public class ConcurrentList<T> : IList<T>
{
    protected ConcurrentQueue<T> _items = new();
    protected ConcurrentDictionary<int, T> _indices = new();

    public T this[int index]
    {
        get => _indices.TryGetValue(index, out T value) ? value : default;
        set => UpdateInRange(index, 0, _items.Count, () => _indices[index] = value);
    }

    public int Count => _items.Count;
    public bool IsReadOnly => false;

    public void Add(T input) => _items.Enqueue(input);
    public bool Contains(T input) => _items.Contains(input);
    public void CopyTo(T[] items, int index) => _items.CopyTo(items, index);
    public IEnumerator<T> GetEnumerator() => _items.GetEnumerator();
    public int IndexOf(T input) => _items.ToList().IndexOf(input);

    public void Clear()
    {
        _items.Clear();
        _indices.Clear();
    }

    public void Insert(int index, T input)
    {
        if (index == _items.Count)
        {
            _items.Enqueue(input);
            _indices[_items.Count - 1] = input;
            return;
        }
        UpdateInRange(index, 0, _items.Count + 1, () =>
        {
            _indices.Clear();
            var i = 0;
            var items = new ConcurrentQueue<T>();
            while (_items.TryDequeue(out T item))
            {
                if (Equals(i, index))
                {
                    items.Enqueue(input);
                    _indices[items.Count - 1] = input;
                }
                items.Enqueue(item);
                _indices[items.Count - 1] = item;
                i++;
            }
            _items = items;
        });
    }

    public bool Remove(T input)
    {
        _indices.Clear();
        var response = false;
        var items = new ConcurrentQueue<T>();
        while (_items.TryDequeue(out T item))
        {
            if (Equals(input, item) is false)
            {
                response = true;
                items.Enqueue(item);
                _indices[items.Count - 1] = item;
            }
        }
        _items = items;
        return response;
    }

    public void RemoveAt(int index)
    {
        UpdateInRange(index, 0, _items.Count, () =>
        {
            _indices.Clear();
            var i = 0;
            var items = new ConcurrentQueue<T>();
            while (_items.TryDequeue(out T item))
            {
                if (Equals(i, index) is false)
                {
                    items.Enqueue(item);
                    _indices[items.Count - 1] = item;
                }
                i++;
            }
            _items = items;
        });
    }

    protected IEnumerator<T> Enumerate()
    {
        using (var enumerator = _items.GetEnumerator())
        {
            var i = 0;
            var items = new ConcurrentQueue<T>();
            while (enumerator.MoveNext())
            {
                var item = _indices.TryGetValue(i++, out T value) ? value : enumerator.Current;
                items.Enqueue(item);
                yield return item;
            }
            _items = items;
        }
    }

    protected void UpdateInRange(int index, int min, int max, Action action)
    {
        if (index < min || index >= max)
        {
            throw new ArgumentOutOfRangeException("Incorrect index");
        }
        action();
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator() => Enumerate();
    IEnumerator IEnumerable.GetEnumerator() => Enumerate();
}
Complete code and tests
1 Answer
There appear to be several threading bugs in this implementation. Take just the first line with any logic in it: what if two threads call Insert with index == _items.Count at the same time, and both evaluate that predicate to true? You could get this interleaving of calls:
Thread 1: _items.Enqueue(obj1);
Thread 2: _items.Enqueue(obj2);
Thread 2: _indices[_items.Count - 1] = obj2;
Thread 1: _indices[_items.Count - 1] = obj1;
Your queue contains both items but your dictionary has just one of them. If you have two data structures that need to be kept in sync in any way, it's very difficult (if not impossible) to do that without a lock.
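For comparison, here is a minimal sketch of what the locked alternative looks like (my illustration, not the original poster's code): a single List<T> guarded by one lock object, giving up lock freedom in exchange for the consistency the two-structure design can't provide. Only a few members are shown.

using System.Collections.Generic;
using System.Linq;

public class LockedList<T>
{
    private readonly List<T> _items = new();
    private readonly object _gate = new();

    public T this[int index]
    {
        get { lock (_gate) return _items[index]; }
        set { lock (_gate) _items[index] = value; }
    }

    public int Count { get { lock (_gate) return _items.Count; } }

    public void Add(T item) { lock (_gate) _items.Add(item); }
    public void Insert(int index, T item) { lock (_gate) _items.Insert(index, item); }
    public bool Remove(T item) { lock (_gate) return _items.Remove(item); }
    public void RemoveAt(int index) { lock (_gate) _items.RemoveAt(index); }

    // Enumerate over a snapshot so callers never observe a half-finished update.
    public IEnumerator<T> GetEnumerator()
    {
        lock (_gate) return _items.ToList().GetEnumerator();
    }
}

Every member takes the same lock, so the queue/dictionary synchronisation problem simply disappears: there is only one piece of state.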
The other branch of Insert has a different race condition. If two threads try to Insert at different locations at the same time, _indices will be modified to contain both objects, but one thread's execution of _items = items; can overwrite the other thread's change. Last writer wins.
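A hypothetical repro of that last-writer-wins behaviour (this assumes the ConcurrentList<T> posted above; nothing here is from the original post):

using System;
using System.Threading.Tasks;

// Hypothetical repro, assuming the ConcurrentList<T> from the question.
var list = new ConcurrentList<int>();
for (var i = 0; i < 100; i++)
    list.Add(i); // Add only enqueues, which is enough to seed the queue

// Two parallel inserts at different positions both take the rebuild branch.
Parallel.Invoke(
    () => list.Insert(10, -1),
    () => list.Insert(50, -2));

// If Insert were atomic this would print 102. In practice both threads dequeue
// from the same queue into their own local copies, and the last `_items = items;`
// assignment wins, so elements rebuilt by the other thread are simply lost.
Console.WriteLine(list.Count);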
The main point I want to make, though, is that there is a reason the .NET library doesn't contain a ConcurrentList implementation: index-based collections can't be used properly without locking. If I have a List that is being modified by other threads, it's very difficult to use an index. IndexOf can tell me where a particular object is located, but there is no guarantee that it will still be there when I look again. That is, there is no guarantee that this code does what the caller intends:
list.RemoveAt(list.IndexOf(obj));
This means that IList doesn't really have anything to offer over other containers.
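Put differently, the only way a caller can make that IndexOf-plus-RemoveAt pair atomic is to bring their own lock (a sketch; the gate object here is an assumption, not part of any API):

// Sketch: the caller must serialise the composite operation externally.
lock (gate)
{
    var index = list.IndexOf(obj);
    if (index >= 0)
    {
        list.RemoveAt(index);
    }
}

And once every caller has to hold a lock anyway, a plain List<T> guarded by that same lock offers the same guarantees with far less machinery.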
I've looked at your GitHub repository. You only have one test in the Concurrency file/area, and it currently doesn't pass for me. But I think the test is checking for behavior outside the control of the container:
Parallel.For(0, 1000, i => y.Add(y.Count));
The test asserts that the container should end with the numbers 0..999 in order, but it's clear that the test itself has a race condition. Count can be accessed in parallel by any number of threads at the same time, which can cause a particular value to be inserted any number of times.
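For example, one possible interleaving of two iterations:
Thread 1: reads y.Count == 5
Thread 2: reads y.Count == 5
Thread 1: y.Add(5)
Thread 2: y.Add(5)
Both iterations add 5, and by the time any later iteration reads the count it has already moved past 6, so 6 is never added at all and the contents cannot be the ordered sequence 0..999.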
I've written a more fundamental test that fails:
[TestMethod]
public void ItDoesntLoseElements()
{
    var y = new V2<int>();
    Parallel.For(0, 1000, i => y.Add(i));
    Assert.AreEqual(1000, y.Count);
}
Comments
- What if one thread is inside the Clear() method and has just executed _items.Clear();, at that moment another thread calls Insert and executes _indices[_items.Count - 1] = input;, and only then does the first thread execute _indices.Clear();?
- _items.ToList().IndexOf(input) is bad: poor performance and excessive memory consumption.
- Consider using ImmutableList there.
- Add isn't correct (it never sets the indices). At the risk of sounding brutal: no part of this code is thread safe. In general, if you're managing more than one piece of state in one operation in a multi-threaded scenario, you need to lock. There are cases where that might not be true, but being absolutely sure is very hard.
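To make that last comment concrete, a hypothetical snippet (again assuming the ConcurrentList<T> posted above):

using System;

var list = new ConcurrentList<string>();
list.Add("hello");                  // Add only enqueues; _indices is never written
Console.WriteLine(list.Count);      // 1
Console.WriteLine(list[0] is null); // True: the indexer reads _indices and falls back to default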