9
\$\begingroup\$

I need a simple dispatcher implementation (invoking methods on one thread from many threads). Once you dispatch a method, the current thread should wait for the result, so I'm thinking about something like this:

public class Dispatcher
{
 private readonly BlockingCollection<Tuple<Delegate, object[]>> runQueue = new BlockingCollection<Tuple<Delegate, object[]>>();
 private readonly BlockingCollection<object> resultQueue = new BlockingCollection<object>();
 private readonly CancellationTokenSource source = new CancellationTokenSource();
 private readonly Task task;
 public Dispatcher()
 {
 Task.Run(() =>
 {
 using (source)
 using (runQueue)
 using (resultQueue)
 {
 Debug.WriteLine("Dispatcher started with thread {0}", Thread.CurrentThread.ManagedThreadId);
 while (!source.IsCancellationRequested)
 {
 var run = runQueue.Take(source.Token);
 resultQueue.Add(run.Item1.DynamicInvoke(run.Item2));
 }
 Debug.WriteLine("Dispatcher ended");
 }
 });
 }
 public void Stop()
 {
 source.Cancel();
 }
 [MethodImpl(MethodImplOptions.Synchronized)]
 public object Invoke(Delegate @delegate, params object[] @params)
 {
 runQueue.Add(new Tuple<Delegate, object[]>(@delegate, @params));
 return resultQueue.Take(source.Token);
 }
}

I have tested it with the following code and it seems to work well:

class Program
{
 static void Main(string[] args)
 {
 Func<int, int, int> func = (sleep, i) =>
 {
 CC.WriteLine(i, "\tTask {0} will sleep for {1}, thread {2}", i, sleep, Thread.CurrentThread.ManagedThreadId);
 Thread.Sleep(sleep);
 CC.WriteLine(i, "\tTask {0} woke up", i, sleep);
 return i; //return task number
 };
 var rnd = new Random();
 var disp = new Dispatcher();
 {
 var tasks = Enumerable.Range(0, 10).Select(i =>
 Task.Run(() =>
 {
 CC.WriteLine(i, "Task {0} started, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
 CC.WriteLine(i, "Task {0} ended with {1}", i, disp.Invoke(func, rnd.Next(3, 15) * 100, i));
 })).ToArray();
 Task.WaitAll(tasks);
 disp.Stop();
 }
 Console.ReadKey();
 }
 static class CC
 {
 public static readonly List<ConsoleColor> COLORS = Enum.GetValues(typeof(ConsoleColor)).Cast<ConsoleColor>().ToList();
 static CC()
 {
 COLORS.RemoveAt(0);
 COLORS.RemoveAt(0);
 }
 public static void WriteLine(int colorIndex, string format, params object[] args)
 {
 lock (COLORS)
 {
 Console.ForegroundColor = COLORS[colorIndex];
 Console.WriteLine(format, args);
 }
 }
 }
}

Is a BlockingCollection a good choice? Or could it be done with something simpler (SemaphoreSlim)?

Vogel612
asked Jul 14, 2016 at 14:36
\$\endgroup\$
4
  • \$\begingroup\$ Why do you need that Dispatcher class? Why not simply use a Task? \$\endgroup\$ Commented Jul 14, 2016 at 15:00
  • \$\begingroup\$ Because, for certain reasons (I'm using a third-party native API), I need to invoke some code on one given thread from multiple threads and wait for the results ... and AFAIK there is no way of doing this with just a Task. \$\endgroup\$ Commented Jul 14, 2016 at 15:02
  • \$\begingroup\$ ... and yes, I'm aware of System.Windows.Threading.Dispatcher but it is really bloated \$\endgroup\$ Commented Jul 14, 2016 at 15:07
  • 1
    \$\begingroup\$ Welcome to Code Review! Please do not update the code in your question to incorporate feedback from answers; doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers. \$\endgroup\$ Commented Jul 15, 2016 at 11:15

2 Answers

8
\$\begingroup\$
  • Because the task within the Dispatcher is a long-running task, you should use the option TaskCreationOptions.LongRunning (which means calling Task.Factory.StartNew, since Task.Run does not accept creation options). Otherwise the operation blocks a thread-pool thread, which is meant for short operations. Alternatively, you could use a dedicated Thread.
  • Instead of using the blocking collection to return the result, you could use a TaskCompletionSource. That would change the return type of the Invoke method to Task<T>, so the calling code could use the async/await pattern.
  • Consider throwing an exception if Invoke is called after the Dispatcher was stopped. Otherwise the call blocks forever, right?
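The three suggestions above could be combined roughly as follows. This is only a sketch under stated assumptions, not the asker's final code: the names TcsDispatcher and InvokeAsync are hypothetical, work items are typed Func<T> delegates instead of Delegate plus DynamicInvoke, and stopping is modelled with CompleteAdding rather than a CancellationTokenSource.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the question's Dispatcher; all names are illustrative.
public sealed class TcsDispatcher : IDisposable
{
    // Each queued action completes its own TaskCompletionSource,
    // so no shared result queue (and no Synchronized method) is needed.
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private readonly Task worker;

    public TcsDispatcher()
    {
        // LongRunning hints that the loop should get a dedicated thread
        // instead of occupying a thread-pool thread.
        worker = Task.Factory.StartNew(() =>
        {
            foreach (var work in queue.GetConsumingEnumerable())
                work();
        }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    public Task<T> InvokeAsync<T>(Func<T> func)
    {
        // RunContinuationsAsynchronously keeps awaiting callers from
        // resuming on the dispatcher thread.
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        try
        {
            queue.Add(() =>
            {
                try { tcs.SetResult(func()); }
                catch (Exception ex) { tcs.SetException(ex); } // forward failures to the caller
            });
        }
        catch (InvalidOperationException ex)
        {
            // Add throws once CompleteAdding was called (or the queue was disposed),
            // so callers fail fast instead of waiting forever.
            throw new InvalidOperationException("The dispatcher has been stopped.", ex);
        }
        return tcs.Task;
    }

    public void Dispose()
    {
        queue.CompleteAdding(); // no new work is accepted
        worker.Wait();          // let already queued work finish
        queue.Dispose();
    }
}
```

A caller could then write `var r = await dispatcher.InvokeAsync(() => func(100, 1));`, or read `.Result` where blocking the calling thread is acceptable, as in the original Invoke.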
Heslacher
answered Jul 14, 2016 at 15:23
\$\endgroup\$
1
  • \$\begingroup\$ Googling for TaskCreationOptions gives me a similar question; what do you think about it? \$\endgroup\$ Commented Jul 14, 2016 at 15:36
4
\$\begingroup\$

Thanks @JanDotNet for your help ... googling around your hints gave me what I really need: a SingleThreadTaskScheduler based on StaTaskScheduler from Swarm.

public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
 private BlockingCollection<Task> _tasks;
 private readonly Thread _thread;
 public SingleThreadTaskScheduler()
 {
 // Initialize the tasks collection
 _tasks = new BlockingCollection<Task>();
 _thread = new Thread(() =>
 {
 // Continually get the next task and try to execute it.
 // This will continue until the scheduler is disposed and no more tasks remain.
 foreach (var t in _tasks.GetConsumingEnumerable())
 {
 TryExecuteTask(t);
 }
 });
 _thread.IsBackground = true;
 _thread.Name = "SingleThreadTaskScheduler-"+Guid.NewGuid().ToString();
 _thread.Start();
 }
 /// <summary>Queues a Task to be executed by this scheduler.</summary>
 /// <param name="task">The task to be executed.</param>
 protected override void QueueTask(Task task)
 {
 // Push it into the blocking collection of tasks
 _tasks.Add(task);
 }
 /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
 /// <returns>An enumerable of all tasks currently scheduled.</returns>
 protected override IEnumerable<Task> GetScheduledTasks()
 {
 // Serialize the contents of the blocking collection of tasks for the debugger
 return _tasks.ToArray();
 }
 /// <summary>Determines whether a Task may be inlined.</summary>
 /// <param name="task">The task to be executed.</param>
 /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
 /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
 {
 return Thread.CurrentThread.ManagedThreadId == _thread.ManagedThreadId && TryExecuteTask(task);
 }
 /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
 public override int MaximumConcurrencyLevel
 {
 get { return 1; }
 }
 /// <summary>
 /// Cleans up the scheduler by indicating that no more tasks will be queued.
 /// This method blocks until all threads successfully shutdown.
 /// </summary>
 public void Dispose()
 {
 if (_tasks != null)
 {
 // Indicate that no new tasks will be coming in
 _tasks.CompleteAdding();
 // Wait for all threads to finish processing tasks
 _thread.Join();
 // Cleanup
 _tasks.Dispose();
 _tasks = null;
 }
 }
}

And the usage:

class Program
{
 static void Main(string[] args)
 {
 Func<int, int, int> func = (sleep, i) =>
 {
 CC.WriteLine(i, "\tTask {0} will sleep for {1}, thread {2}", i, sleep, Thread.CurrentThread.ManagedThreadId);
 Thread.Sleep(sleep);
 CC.WriteLine(i, "\tTask {0} woke up", i, sleep);
 if (i == 5)
 throw new Exception("Exception test!");
 return i; //return task number
 };
 using (var scheduler = new SingleThreadTaskScheduler())
 {
 var rnd = new Random();
 var tasks = Enumerable.Range(0, 10).Select(i =>
 Task.Run(async () =>
 {
 CC.WriteLine(i, "Task {0} started, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
 object r = null;
 try
 {
 r = await Task.Factory.StartNew(() => func(rnd.Next(3, 15) * 100, i), CancellationToken.None, TaskCreationOptions.None, scheduler).ConfigureAwait(false);
 CC.WriteLine(i, "Task {0} ended with {1}", i, r);
 }
 catch (Exception ex)
 {
 CC.WriteLine(i, "Task {0} exception: {1}", i, ex);
 }
 })).ToArray();
 Task.WaitAll(tasks);
 Console.ReadKey();
 }
 }
 static class CC
 {
 public static readonly List<ConsoleColor> COLORS = Enum.GetValues(typeof(ConsoleColor)).Cast<ConsoleColor>().ToList();
 static CC()
 {
 COLORS.RemoveAt(0);
 COLORS.RemoveAt(0);
 }
 public static void WriteLine(int colorIndex, string format, params object[] args)
 {
 lock (COLORS)
 {
 Console.ForegroundColor = COLORS[colorIndex];
 Console.WriteLine(format, args);
 }
 }
 }
}
answered Jul 14, 2016 at 16:09
\$\endgroup\$
5
  • \$\begingroup\$ That seems to solve your problem as well. However, it is a little bit complicated to use... I am not sure whether I would prefer that solution or your Dispatcher with a TaskCompletionSource... \$\endgroup\$ Commented Jul 14, 2016 at 16:35
  • \$\begingroup\$ Why is it a little bit complicated to use? r = await Task.Factory.StartNew(() => func(100, 1), CancellationToken.None, TaskCreationOptions.None, scheduler); is pretty simple ... (with a static TaskFactory, usage would look like var r = await TFactory.StartNew(() => func(100, 1));) ... unfortunately I can't use it in my code ... \$\endgroup\$ Commented Jul 15, 2016 at 10:44
  • \$\begingroup\$ Yes, that is simple... but you have to create the SingleThreadTaskScheduler first. On the other hand, the dispatcher has to be created first in the same way, so maybe there is no difference ;) Why couldn't you use it in your code? \$\endgroup\$ Commented Jul 15, 2016 at 10:52
  • \$\begingroup\$ @JanDotNet look at the 2nd revision of my question; it is all about using "by ref" in lambdas \$\endgroup\$ Commented Jul 15, 2016 at 11:25
  • \$\begingroup\$ Ok.... Using native methods that way looks adventurous. Maybe you could wrap those method calls within your own class!? \$\endgroup\$ Commented Jul 15, 2016 at 11:38
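
The static TaskFactory mentioned in the comments might look like the sketch below. To keep the example compilable on its own, it uses a shortened stand-in scheduler (MiniSingleThreadScheduler, a hypothetical name) in place of the SingleThreadTaskScheduler from the answer. The SingleThread holder class is also an assumption; only the name TFactory comes from the comment.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Shortened stand-in for the answer's SingleThreadTaskScheduler (hypothetical name).
public sealed class MiniSingleThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
    private readonly Thread thread;

    public MiniSingleThreadScheduler()
    {
        // One background thread executes every queued task in order.
        thread = new Thread(() =>
        {
            foreach (var t in tasks.GetConsumingEnumerable())
                TryExecuteTask(t);
        }) { IsBackground = true };
        thread.Start();
    }

    protected override void QueueTask(Task task) { tasks.Add(task); }

    protected override IEnumerable<Task> GetScheduledTasks() { return tasks.ToArray(); }

    // Inline execution is only allowed when already on the scheduler's thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == thread && TryExecuteTask(task);
    }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    public void Dispose()
    {
        tasks.CompleteAdding();
        thread.Join();
        tasks.Dispose();
    }
}

// Hypothetical holder; static fields are initialized in textual order,
// so Scheduler exists before TFactory is created.
public static class SingleThread
{
    public static readonly MiniSingleThreadScheduler Scheduler = new MiniSingleThreadScheduler();

    // Every task started through this factory is queued to Scheduler.
    public static readonly TaskFactory TFactory = new TaskFactory(Scheduler);
}
```

With that in place, the call from the comment becomes `var r = await SingleThread.TFactory.StartNew(() => func(100, 1));`, and the scheduler argument no longer has to be repeated at each call site.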
