I've created a library providing a ThreadBlock
that has the following features:
- Aggregates results of all actions executed
- Provides for non-threaded warm up to prepare data for processing
- Provides for per-thread continuation actions
- Provides for per-block continuation actions
The code is located on GitHub.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace GPS.SimpleThreading.Blocks
{
/// <summary>
/// Parallel thread block class that provides for
/// thread warmup, execution, and continuation.
/// </summary>
/// <remarks>
/// ## Features
/// * Allows capture of results of thread executions
/// * Allows warmup action per data item before spawning thread
/// * Allows continuation action per data item after executing thread
/// * Allows continuation of the entire set
/// </remarks>
public sealed class ThreadBlock<TData, TResult>
{
private readonly ConcurrentDictionary<TData, (TData data, TResult result)?> _results =
new ConcurrentDictionary<TData, (TData data, TResult result)?>();
private readonly ConcurrentBag<TData> _baseList =
new ConcurrentBag<TData>();
private bool _locked;
private readonly Func<TData, TResult> _action;
private readonly Action<ICollection<(TData data, TResult result)?>> _continuation;
/// <summary>
/// Constructor accepting the action and block continuation.
/// </summary>
public ThreadBlock(
Func<TData, TResult> action,
Action<ICollection<(TData data, TResult result)?>> continuation = null)
{
_action = action;
_continuation = continuation;
}
/// <summary>
/// Add single data item.
/// </summary>
public void Add(TData item)
{
if (!_locked) _baseList.Add(item);
}
/// <summary>
/// Adds range of data items from an IEnumerable
/// </summary>
public void AddRange(IEnumerable<TData> collection)
{
Parallel.ForEach(collection, Add);
}
/// <summary>
/// Adds range of data items from an ICollection.
/// </summary>
public void AddRange(ICollection<TData> collection)
{
Parallel.ForEach(collection, Add);
}
/// <summary>
/// Adds range of data items from an IProducerConsumerCollection.
/// </summary>
public void AddRange(IProducerConsumerCollection<TData> collection)
{
Parallel.ForEach(collection, Add);
}
/// <summary>
/// Maximum number of concurrent threads (default = 1).
/// </summary>
public int MaxDegreeOfParallelism { get; set; } = 1;
/// <summary>
/// Removes a data item from the block.
/// </summary>
public bool Remove(TData item)
{
TData itemToRemove;
if (!_locked)
return _baseList.TryTake(out itemToRemove);
return false;
}
/// <summary>
/// Locks the data of the block, allowing processing.
/// </summary>
public void LockList()
{
_locked = true;
}
/// <summary>
/// Executes the action over the set of data.
/// </summary>
public void Execute(
int maxDegreeOfParallelization = -1,
Action<TData> warmupItem = null,
Action<Task, (TData data, TResult result)?> threadContinuation = null)
{
if (!_locked) throw new NotLockedException();
if (maxDegreeOfParallelization == -1)
{
maxDegreeOfParallelization = MaxDegreeOfParallelism;
}
if (maxDegreeOfParallelization < 1)
{
throw new ArgumentOutOfRangeException(
"Must supply positive value for either " +
$"{nameof(maxDegreeOfParallelization)} or " +
$"this.{nameof(MaxDegreeOfParallelism)}.");
}
var padLock = new object();
var queue = new Queue<TData>(_baseList);
var allTasks = new Dictionary<TData, Task>();
int depth = 0;
while (queue.Count > 0)
{
var item = queue.Dequeue();
if (warmupItem != null) warmupItem(item);
var task = new Task<TResult>(() => _action(item));
task.ContinueWith((resultTask, data) =>
{
var returnValue = ((TData, TResult)?)(data, resultTask.Result);
if (threadContinuation != null)
{
threadContinuation(resultTask, returnValue);
}
_results.AddOrUpdate(item, returnValue,
(itemData, resultTaskResult) => resultTaskResult);
lock (padLock)
{
depth--;
}
}, item);
int d = 0;
lock (padLock)
{
d = depth;
}
while (d >= maxDegreeOfParallelization)
{
System.Threading.Thread.Sleep(1);
lock (padLock)
{
d = depth;
}
}
task.Start(TaskScheduler.Current);
lock (padLock)
{
depth++;
}
}
var dd = 0;
lock (padLock)
{
dd = depth;
}
while (dd > 0)
{
Thread.Sleep(1);
lock (padLock)
{
dd = depth;
}
}
_continuation?.Invoke(_results.Values);
}
/// <summary>
/// Point-in-time results providing a stable result set
/// for processing results as the block runs.
/// </summary>
public ConcurrentDictionary<TData, (TData data, TResult result)?> Results
{
get
{
var results = new ConcurrentDictionary<TData, (TData data, TResult result)?>();
foreach (var key in _results.Keys)
{
var result = _results[key];
var value = key;
results.AddOrUpdate(value, result, (resultKey, resultValue) => resultValue);
}
return results;
}
}
}
}
I would be very grateful for feedback on the features, design and implementation.
A simple usage is
[Fact]
public void ContrivedTest()
{
string Processor(int data)
{
System.Threading.Thread.Sleep(data);
return $"Waiting {data} miliseconds";
}
void Warmup(int data)
{
_log.WriteLine($"Contrived Warmup for {data}");
}
void ThreadBlockContinuation(Task task, (int data, string result)? result)
{
_log.WriteLine($"Contrived Thread Continuation result: {result.Value.data}, {result.Value.result}");
}
void PLINQContinuation((int data, string result)? result)
{
_log.WriteLine($"Contrived Thread Continuation result: {result.Value.data}, {result.Value.result}");
}
void BlockContinuation(ICollection<(int data, string result)?> results)
{
_log.WriteLine($"Results count: {results.Count}");
}
var dataSet = new int[500];
var rand = new System.Random();
for(int i = 0; i < dataSet.Length; ++i)
{
dataSet[i] = rand.Next(250, 2500);
}
var block = new ThreadBlock<int, string>(
Processor,
BlockContinuation);
block.AddRange(dataSet);
block.LockList();
var parallelism = 8;
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
block.Execute(parallelism, Warmup, ThreadBlockContinuation);
sw.Stop();
var blockElapsed = sw.Elapsed;
sw = new System.Diagnostics.Stopwatch();
sw.Start();
var resultSet = dataSet
.Select(data => { Warmup(data); return data; })
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithDegreeOfParallelism(parallelism)
.Select(data =>
{
return new Nullable<(int data, string result)>
((data: data, result: Processor(data)));
})
.AsSequential()
.Select(result => {
PLINQContinuation(result);
return result;
}).ToList();
BlockContinuation(resultSet.ToArray());
sw.Stop();
var plinqElapsed = sw.Elapsed;
_log.WriteLine(
$"block: {blockElapsed.TotalSeconds}, " +
$"PLINQ: {plinqElapsed.TotalSeconds}");
Assert.Equal(dataSet.Length, block.Results.Count);
Assert.Equal(dataSet.Length, resultSet.Count);
// This is here to force the test to fail
// allowing dotnet test to output the log.
Assert.Equal(blockElapsed, plinqElapsed);
}
Edit 2
Added a PLINQ equivalent to the test. Execution times are practically identitical. To me, the PLINQ version is a mess.
So it really comes down to what you like better.
Here's an example result from my system using the exact test above:
... lots of data ....
Contrived Thread Continuation result: 1143, Waiting 1143 miliseconds
Contrived Thread Continuation result: 1593, Waiting 1593 miliseconds
Contrived Thread Continuation result: 2206, Waiting 2206 miliseconds
Results count: 500
block: 84.4324359, PLINQ: 85.2551954
The data is meant to simulate a large set of expensive operations, which is the natural use-case for parallelism. All parameters and data are identical between the ThreadBlock and PLINQ tests.
-
\$\begingroup\$ When you do embed the code (and fix the example), perhaps you could also explain the intended use-case for this class? It seems relatively inefficient compared to an equivalent PLINQ approach. \$\endgroup\$Pieter Witvoet– Pieter Witvoet2019年02月06日 10:48:21 +00:00Commented Feb 6, 2019 at 10:48
-
\$\begingroup\$ I would love to know the PLINQ alternative. Can you explain? \$\endgroup\$The Sharp Ninja– The Sharp Ninja2019年02月06日 19:53:07 +00:00Commented Feb 6, 2019 at 19:53
-
\$\begingroup\$ OK, read up on the PLINQ, there's definitely some overlap. I will do some testing and see which methodology is faster. \$\endgroup\$The Sharp Ninja– The Sharp Ninja2019年02月06日 20:53:50 +00:00Commented Feb 6, 2019 at 20:53
-
\$\begingroup\$ The PLINQ "equivalent" of a Thread Continuation has a few drawbacks: * It has to be run in a second pass through the data * There is no Task result to get state about the task. This is a biggie. \$\endgroup\$The Sharp Ninja– The Sharp Ninja2019年02月07日 00:07:06 +00:00Commented Feb 7, 2019 at 0:07
-
\$\begingroup\$ I'd consider those tasks to be an implementation detail - why would you need to expose them? \$\endgroup\$Pieter Witvoet– Pieter Witvoet2019年02月07日 12:29:21 +00:00Commented Feb 7, 2019 at 12:29
1 Answer 1
Design
I think there are some problems with this design:
ThreadBlock
has too many responsibilities: it's used to build a read-only collection, to perform parallelized work on it and to store the results of that work. That's a fairly rigid workflow. PLINQ, in comparison, is much more composable: you can callAsParallel().Select(DoWork)
on anything that's enumerable, and the results are returned directly to the caller, which can then store, share, discard or further process them depending on the use-case (without the risk that something else will overwrite these results).- The design is conflicting: because
action
andcontinuation
are passed to the constructor,Execute
should always perform the same work, so it doesn't make sense to call it multiple times (you'd just be redoing work that was already done*). On the other hand,Execute
accepts per-item warmup and continuation callbacks, which allow custom work (or rather, side-effects, because they don't return anything) perExecute
call. How is this intended to be used? Why so many different callbacks? - All that sleeping and locking in
Execute
looks quite inefficient. This may not always be noticeable - it depends on the amount of items and how much work needs to be done per item - but I'm fairly sure PLINQ is better optimized in this regard.
I think the PLINQ equivalent is mostly just messy because the overal workflow is messy, with several separate callbacks being used for side-effects (I'd use loops for that, not Select
calls). Why not move that extra work into Processor
? That would make both the PLINQ approach and the design of your ThreadBlock
class simpler.
Note that ThreadBlock
is running the warmup and thread continuations in parallel, while your PLINQ example runs them sequentially.
*Unless that work is not deterministic, but I'm not sure that that's such a good idea. You'd run into trouble with Results
, as you can't tell whether a result is new or old while an Execute
call is still in progress.
Other problems
Results
does not take duplicateTData
values into account - it silently discards the results of all but one of them.Execute
gets stuck onnull
data items.ConcurrentDictionary
throws when you're trying to usenull
as a key, which then causesdepth
to not be decreased.Add
andAddRange
silently discard their input onceLockList
has been called. I would at least expect an exception to be thrown.
Other notes
- Some quick testing suggests that adding items to a bag in parallel is slower than doing so sequentially.
- Both
ICollection<T>
andIProducerConsumerCollection<T>
implementIEnumerable<T>
, so thoseAddRange
overloads aren't needed. - I think
TInput
is a little more self-descriptive thanTData
. - Why does the thread continuation take a nullable tuple? It's never invoked with null.
- Why is
MaxDegreeOfParallelism
publicly settable? Mutable shared state is best avoided in concurrency. This makes it possible for another thread to 'intercept' the degree of parallelism. - Why use
task.ContinueWith
if you can call bothaction
andthreadContinuation
inside the original task? - Why expose this task to
threadContinuation
? Whether yourThreadBlock
uses threads, tasks or some other approach internally shouldn't matter to outside code. This prevents you from switching to a different implementation without affecting existing callers. - You may want to verify that
action
isn't null in the constructor, and throw anArgumentNullException
if it is. - It's recommended to make custom exception types serializable.
-
\$\begingroup\$ Thank you! I see several places to make the implementation better from your comments. The fact that the
threadContinuation
runs on the main thread, not the task, is very important as it's meant to be the equivalent behavior of callingContinueWith
directly on aTask
. Exposing theTask
to thethreadContinutation
is again part of implementing theContinueWith
pattern. It makes going from implicitly declaringTasks
to aThreadBlock
an apples-to-apples implementation. \$\endgroup\$The Sharp Ninja– The Sharp Ninja2019年02月07日 18:37:17 +00:00Commented Feb 7, 2019 at 18:37 -
\$\begingroup\$ I'm not sure what you mean?
threadContinuation
is not executed on the thread that's callingExecute
.task.ContinueWith
creates a second task that will be scheduled aftertask
is finished. It might well be executed on the same thread as the preceding task, or on a different thread - that's up to the task scheduler. \$\endgroup\$Pieter Witvoet– Pieter Witvoet2019年02月08日 08:26:27 +00:00Commented Feb 8, 2019 at 8:26