I made a class that I think satisfies the conditions:
Action
s can be added to the queue at any time by any number of clients.Action
's executing in the order in which were added.Only one
Action
executing at time.
public sealed class SequentialTaskScheduler
{
private static volatile SequentialTaskScheduler instance = null;
private static readonly object padlock = new object();
private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
SequentialTaskScheduler()
{
var task = Task.Factory.StartNew(ThreadProc);
}
public static SequentialTaskScheduler Instance
{
get
{
if (instance == null)
{
lock (padlock)
{
if (instance == null)
{
instance = new SequentialTaskScheduler();
}
}
}
return instance;
}
}
public void AddToQueueForExecution(Action action)
{
queue.Enqueue(action);
}
private void ThreadProc()
{
while (true)
{
Action item;
bool isSuccessfull = false;
isSuccessfull = queue.TryDequeue(out item);
if (isSuccessfull)
{
item();
}
System.Threading.Thread.Sleep(100);
}
}
}
I will be grateful for the advice about dangers of such an implementation. And also will be grateful for the advice how to improve it.
Update:
I get feedback on this task from company.
uncertain use of volatile
use threadsafe singleton or singleton
Task.Factory.StartNew take pool from thread but does not return it.
use busy loop with Thread.Sleep is not elegant solution.
How to improve code with those remark's? I think that first two reason are non constructive, but the third and four are constructive. i will be grateful for the advices.
-
\$\begingroup\$ Required reading: csharpindepth.com/Articles/General/Singleton.aspx \$\endgroup\$Jesse C. Slicer– Jesse C. Slicer2018年04月01日 04:25:05 +00:00Commented Apr 1, 2018 at 4:25
-
\$\begingroup\$ You are aware of BlockingCollection? msdn.microsoft.com/en-us/library/dd267312(v=vs.110).aspx \$\endgroup\$paparazzo– paparazzo2018年04月04日 16:28:21 +00:00Commented Apr 4, 2018 at 16:28
-
\$\begingroup\$ @paparazzo, it was test task. With 3 condtions. Implementation could be whatever. \$\endgroup\$Ted– Ted2018年04月06日 18:04:36 +00:00Commented Apr 6, 2018 at 18:04
2 Answers 2
I have few suggestions, and modified the code.
Use Lazy singleton pattern instead of Double check singleton, avoid usage of volatile
private static readonly Lazy<SequentialTaskExecutor> _lazy = new Lazy<SequentialTaskExecutor>(() => new SequentialTaskExecutor());
Tasks should be cancelled and disposed properly. Since the task we want to run for long time, create a new task using Task.Factory.StartNew with Cancellation tokens and TaskCreationOptions.LongRunning
Task.Factory.StartNew(this.ExecuteTask, _token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
Class should implement IDisposable interface, so created threads can be cancelled properly and items in the queue are cleared
public class SequentialTaskExecutor : IDisposable protected virtual void Dispose(bool disposing) { try { if (!disposedValue) { if (disposing) { // clear the items in queue. Action item; while (_queue.Count != 0) { if (_queue.TryDequeue(out item)) { } } _tokenSource.Cancel(); _executionTask.Wait(); _tokenSource.Dispose(); _executionTask.Dispose(); } disposedValue = true; } } catch { } finally { _tokenSource = null; _executionTask = null; } }
Use Smart waiting instead of Busy waiting in tasks.
private void ExecuteTask() { bool isTaskCompleted = true; while (true) { Action item; if (_token.IsCancellationRequested) { while (_queue.Count != 0) { if (_queue.TryDequeue(out item)) { } } break; } if (isTaskCompleted) { isTaskCompleted = false; var innerTask = Task.Factory.StartNew(() => { if(_token.IsCancellationRequested) { return; } if(_queue.TryDequeue(out item)) { item(); } }, _token).ContinueWith((ants) => { if(ants.IsCanceled || ants.IsFaulted) { isTaskCompleted = true; } else if(ants.IsCompleted) { isTaskCompleted = true; } _currentInterval = _minimumInterval; System.Diagnostics.Debug.WriteLine(Task.CurrentId + " " + DateTime.Now.ToString() + " Task execution complted _currentInterval : " + _currentInterval); }); } else { // Raise the interval till maximum value if (_currentInterval < _maximumInterval) { _currentInterval++; } System.Diagnostics.Debug.WriteLine(Task.CurrentId + " " + DateTime.Now.ToString() + " Task waiting complted _currentInterval : " + _currentInterval); Thread.Sleep(TimeSpan.FromSeconds(_currentInterval)); } } }
Complete source code is as follows
public class SequentialTaskExecutor : IDisposable { private static readonly Lazy<SequentialTaskExecutor> _lazy = new Lazy<SequentialTaskExecutor>(() => new SequentialTaskExecutor()); private readonly ConcurrentQueue<Action> _queue = null; private CancellationTokenSource _tokenSource = null; private CancellationToken _token; private int _maximumInterval = 10; private int _minimumInterval = 1; private int _currentInterval = 0; private bool disposedValue = false; Task _executionTask = null; private SequentialTaskExecutor() { _queue = new ConcurrentQueue<Action>(); _tokenSource = new CancellationTokenSource(); _token = _tokenSource.Token; _executionTask = Task.Factory.StartNew(this.ExecuteTask, _token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } public static SequentialTaskExecutor Instance { get { return _lazy.Value; } } public void AddToQueueForExecution(Action action) { _queue.Enqueue(action); System.Diagnostics.Debug.WriteLine("Task added " + _queue.Count); } private void ExecuteTask() { bool isTaskCompleted = true; while (true) { Action item; if (_token.IsCancellationRequested) { while (_queue.Count != 0) { if (_queue.TryDequeue(out item)) { } } break; } if (isTaskCompleted) { isTaskCompleted = false; var innerTask = Task.Factory.StartNew(() => { if(_token.IsCancellationRequested) { return; } if(_queue.TryDequeue(out item)) { item(); } }, _token).ContinueWith((ants) => { if(ants.IsCanceled || ants.IsFaulted) { isTaskCompleted = true; } else if(ants.IsCompleted) { isTaskCompleted = true; } _currentInterval = _minimumInterval; System.Diagnostics.Debug.WriteLine(Task.CurrentId + " " + DateTime.Now.ToString() + " Task execution complted _currentInterval : " + _currentInterval); }); } else { // Raise the interval till maximum value if (_currentInterval < _maximumInterval) { _currentInterval++; } System.Diagnostics.Debug.WriteLine(Task.CurrentId + " " + DateTime.Now.ToString() + " Task waiting complted _currentInterval : " + _currentInterval); Thread.Sleep(TimeSpan.FromSeconds(_currentInterval)); } } } protected virtual void Dispose(bool disposing) { try { if (!disposedValue) { if (disposing) { // clear the items in queue. Action item; while (_queue.Count != 0) { if (_queue.TryDequeue(out item)) { } } _tokenSource.Cancel(); _executionTask.Wait(); _tokenSource.Dispose(); _executionTask.Dispose(); } disposedValue = true; } } catch { } finally { _queue = null; _tokenSource = null; _executionTask = null; } } public void Dispose() { Dispose(true); } } }
I mostly agree with vishnu vardhan's answer. However I disagree with his 4th suggestion, which I think is hardly an improvement.
It still uses Thread.Sleep
which should never be used in production code: you shouldn't pay your worker to sleep. Either use a BlockingCollection
or write your own synchronization using ManualResetEvent
. If no tasks are being processes your thread should just hang on some wait handle. It should not spin in a loop, constantly consuming CPU resources, while doing no meaningful work.
I also think it might be a good idea to refactor your code into an actual TaskScheduler.
Explore related questions
See similar questions with these tags.