I need a simple dispatcher implementation(invoke methods on one thread from many threads) once you dispatch method curent thread should wait for results, so I'm thinking about something like this:
public class Dispatcher
{
private readonly BlockingCollection<Tuple<Delegate, object[]>> runQueue = new BlockingCollection<Tuple<Delegate, object[]>>();
private readonly BlockingCollection<object> resultQueue = new BlockingCollection<object>();
private readonly CancellationTokenSource source = new CancellationTokenSource();
private readonly Task task;
public Dispatcher()
{
Task.Run(() =>
{
using (source)
using (runQueue)
using (resultQueue)
{
Debug.WriteLine("Dispatcher started with thread {0}", Thread.CurrentThread.ManagedThreadId);
while (!source.IsCancellationRequested)
{
var run = runQueue.Take(source.Token);
resultQueue.Add(run.Item1.DynamicInvoke(run.Item2));
}
Debug.WriteLine("Dispatcher ended");
}
});
}
public void Stop()
{
source.Cancel();
}
[MethodImpl(MethodImplOptions.Synchronized)]
public object Invoke(Delegate @delegate, params object[] @params)
{
runQueue.Add(new Tuple<Delegate, object[]>(@delegate, @params));
return resultQueue.Take(source.Token);
}
}
I had test it with such code and it seems too work well ...
class Program
{
static void Main(string[] args)
{
Func<int, int, int> func = (sleep, i) =>
{
CC.WriteLine(i, "\tTask {0} will sleep for {1}, thread {2}", i, sleep, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(sleep);
CC.WriteLine(i, "\tTask {0} woke up", i, sleep);
return i; //return task number
};
var rnd = new Random();
var disp = new Dispatcher();
{
var tasks = Enumerable.Range(0, 10).Select(i =>
Task.Run(() =>
{
CC.WriteLine(i, "Task {0} started, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
CC.WriteLine(i, "Task {0} ended with {1}", i, disp.Invoke(func, rnd.Next(3, 15) * 100, i));
})).ToArray();
Task.WaitAll(tasks);
disp.Stop();
}
Console.ReadKey();
}
static class CC
{
public static readonly List<ConsoleColor> COLORS = Enum.GetValues(typeof(ConsoleColor)).Cast<ConsoleColor>().ToList();
static CC()
{
COLORS.RemoveAt(0);
COLORS.RemoveAt(0);
}
public static void WriteLine(int colorIndex, string format, params object[] args)
{
lock (COLORS)
{
Console.ForegroundColor = COLORS[colorIndex];
Console.WriteLine(format, args);
}
}
}
}
Is a BlockingCollection a good choice? Or maybe it can be done with something simpler? (SlimSemaphore)?
2 Answers 2
- Because the task within the Dispatcher is a long running task, you should use the option
TaskCreationOptions.LongRunning
. Otherwise the operation blocks a thread-pool thread that is usually used for short operations. Alternatively you could also use aThread
. - Instead of using the blocking collection for returning the result, you could use a
TaskCompletionSource
. That would change the return value of the methodInvoke
toTask<T>
. Therfore the calling code could use the async/await-pattern. - Consider to throw an exception if the
Dispatcher
was stopped andInvoke
is called. Otherwise the call blocks forever, right?
-
\$\begingroup\$ googling for TaskCreationOptions gives me similar question what do you think about it? \$\endgroup\$Selvin– Selvin2016年07月14日 15:36:17 +00:00Commented Jul 14, 2016 at 15:36
Thanks @JanDotNet for your help ... googling around your hints gives me what I really need SingleThreadTaskScheduler based on StaTaskScheduler from Swarm
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
private BlockingCollection<Task> _tasks;
private readonly Thread _thread;
public SingleThreadTaskScheduler()
{
// Initialize the tasks collection
_tasks = new BlockingCollection<Task>();
_thread = new Thread(() =>
{
// Continually get the next task and try to execute it.
// This will continue until the scheduler is disposed and no more tasks remain.
foreach (var t in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(t);
}
});
_thread.IsBackground = true;
_thread.Name = "SingleThreadTaskScheduler-"+Guid.NewGuid().ToString();
_thread.Start();
}
/// <summary>Queues a Task to be executed by this scheduler.</summary>
/// <param name="task">The task to be executed.</param>
protected override void QueueTask(Task task)
{
// Push it into the blocking collection of tasks
_tasks.Add(task);
}
/// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
/// <returns>An enumerable of all tasks currently scheduled.</returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
// Serialize the contents of the blocking collection of tasks for the debugger
return _tasks.ToArray();
}
/// <summary>Determines whether a Task may be inlined.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
/// <returns>true if the task was successfully inlined; otherwise, false.</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return Thread.CurrentThread.ManagedThreadId == _thread.ManagedThreadId && TryExecuteTask(task);
}
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
public override int MaximumConcurrencyLevel
{
get { return 1; }
}
/// <summary>
/// Cleans up the scheduler by indicating that no more tasks will be queued.
/// This method blocks until all threads successfully shutdown.
/// </summary>
public void Dispose()
{
if (_tasks != null)
{
// Indicate that no new tasks will be coming in
_tasks.CompleteAdding();
// Wait for all threads to finish processing tasks
_thread.Join();
// Cleanup
_tasks.Dispose();
_tasks = null;
}
}
and usage
class Program
{
static void Main(string[] args)
{
Func<int, int, int> func = (sleep, i) =>
{
CC.WriteLine(i, "\tTask {0} will sleep for {1}, thread {2}", i, sleep, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(sleep);
CC.WriteLine(i, "\tTask {0} woke up", i, sleep);
if (i == 5)
throw new Exception("Exception test!");
return i; //return task number
};
using (var scheduler = new SingleThreadTaskScheduler())
{
var rnd = new Random();
var tasks = Enumerable.Range(0, 10).Select(i =>
Task.Run(async () =>
{
CC.WriteLine(i, "Task {0} started, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
object r = null;
try
{
r = await Task.Factory.StartNew(() => func(rnd.Next(3, 15) * 100, i), CancellationToken.None, TaskCreationOptions.None, scheduler).ConfigureAwait(false);
CC.WriteLine(i, "Task {0} ended with {1}", i, r);
}
catch (Exception ex)
{
CC.WriteLine(i, "Task {0} exception: {1}", i, ex);
}
})).ToArray();
Task.WaitAll(tasks);
Console.ReadKey();
}
}
static class CC
{
public static readonly List<ConsoleColor> COLORS = Enum.GetValues(typeof(ConsoleColor)).Cast<ConsoleColor>().ToList();
static CC()
{
COLORS.RemoveAt(0);
COLORS.RemoveAt(0);
}
public static void WriteLine(int colorIndex, string format, params object[] args)
{
lock (COLORS)
{
Console.ForegroundColor = COLORS[colorIndex];
Console.WriteLine(format, args);
}
}
}
}
-
\$\begingroup\$ That seems to solve your problem also. However, it is a little bit complicated to use... I am not sure if I would prefer that solution or your Dispatcher with a TaskCompletionSource... \$\endgroup\$JanDotNet– JanDotNet2016年07月14日 16:35:29 +00:00Commented Jul 14, 2016 at 16:35
-
\$\begingroup\$ why it is a little bit complicated to use ?
r = await Task.Factory.StartNew(() => func(100, 1), CancellationToken.None, TaskCreationOptions.None, scheduler);
is pretty simple ... (with creating static TaskFactory usage would looks likevar r = await TFactory.StartNew(() => func(100, 1));
) ... unfortunatly I can't use it in my code ... \$\endgroup\$Selvin– Selvin2016年07月15日 10:44:58 +00:00Commented Jul 15, 2016 at 10:44 -
\$\begingroup\$ yes, that is simple... but you have to create the
SingleThreadTaskScheduler
first.. On the other side, the dispatcher have to be created first in the same way so maybe there is not difference ;) Why couldn't you use it in your code? \$\endgroup\$JanDotNet– JanDotNet2016年07月15日 10:52:31 +00:00Commented Jul 15, 2016 at 10:52 -
\$\begingroup\$ @JanDotNet look at 2nd revision of my question it is all about using "by ref" in lambdas \$\endgroup\$Selvin– Selvin2016年07月15日 11:25:55 +00:00Commented Jul 15, 2016 at 11:25
-
\$\begingroup\$ Ok.... Using native methods that way looks adventurous. Maybe you could wrap that method calls within your own class!? \$\endgroup\$JanDotNet– JanDotNet2016年07月15日 11:38:34 +00:00Commented Jul 15, 2016 at 11:38
System.Windows.Threading.Dispatcher
but it is really bloated \$\endgroup\$