I have implemented some methods to make the creation of multithreaded programs easier for me in the future
The requirement was:
- Be able to add as many Tasks as I need to the pool
- Be able to restrict the amount of concurrently running threads
- Be able to set the thread creation speed
- Be able to cancel the thread creation/currently running threads
This is the code i came up with, note I do not know the conventions of Thread safety and design patterns usually used in multithreaded programming in c# or any other language.
This is a first attempt at a topic im interested in, The point of the review is to see if there are improvements I can make to my code and learn some new methods to make it tidier and easier to understand
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MTSimplified
{
public class AlreadyStartedException : Exception { public AlreadyStartedException() { } }
public class NotStartedException : Exception { public NotStartedException(){ } }
class ThreadOrganizer
{
private List<Thread> threadList;
private int maxThreads,creationInterval,finishedThreads,currentThreadIndex;
private bool keepCreating,started;
private CancellationTokenSource cts;
/* entire pool */
public event EventHandler onDone;
public event EventHandler onStopped;
/* per thread */
public event EventHandler onThreadCreate;
public event EventHandler onThreadFinish;
public ThreadOrganizer(int maxThreads, int creationInterval)
{
this.maxThreads = maxThreads;
this.creationInterval = creationInterval;
this.threadList = new List<Thread>();
this.finishedThreads = 0;
this.currentThreadIndex = 0;
this.cts = new CancellationTokenSource();
this.keepCreating = true;
this.started = false;
}
public void addTask(Action function)
{
if (started)
throw new AlreadyStartedException();
threadList.Add(new Thread(delegate ()
{
functionWrapper(function,cts.Token);
}));
}
public void startAll()
{
if (started)
throw new AlreadyStartedException();
this.started = true;
int targetFinish = threadList.Count;
Task.Run(() => {
while (finishedThreads != targetFinish && currentThreadIndex != targetFinish && keepCreating)
{
if (threadList.Count(t => t.IsAlive) < maxThreads)
{
startThread();
Thread.Sleep(creationInterval);
}
}
if (!keepCreating)
return;
while (threadList.Count(t => t.IsAlive) != 0) ;
onDone?.Invoke(this, EventArgs.Empty);
});
}
public void stopAll()
{
if (!started)
throw new NotStartedException();
this.cts.Cancel();
this.keepCreating = false;
while (threadList.Count(t => t.IsAlive) != 0) ;
onStopped?.Invoke(this, EventArgs.Empty);
}
/* private methods */
private void functionWrapper(Action function, CancellationToken ct)
{
onThreadCreate?.Invoke(this, EventArgs.Empty);
function.Invoke();
this.finishedThreads++;
onThreadFinish?.Invoke(this, EventArgs.Empty);
}
private void startThread()
{
threadList[currentThreadIndex].Start();
currentThreadIndex++;
}
}
}
To test the class
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MTSimplified
{
class Program
{
static int i = 0;
static void Main(string[] args)
{
ThreadOrganizer threadO = new ThreadOrganizer(10,100);
threadO.onDone += (s,e) => done();
threadO.onStopped += (s, e) => canceled();
threadO.onThreadCreate += (s, e) => created();
threadO.onThreadFinish += (s, e) => finished();
for (int i = 0; i<200;i++)
threadO.addTask(print);
threadO.startAll();
Console.ReadLine();
threadO.stopAll();
while (true) ;
}
public static void print()
{
Thread.Sleep(1000);
i++;
Console.WriteLine(i);
}
public static void created()
{
Console.WriteLine("created");
}
public static void finished()
{
Console.WriteLine("finished");
}
public static void done()
{
Console.WriteLine("Everything is done");
}
public static void canceled()
{
Console.WriteLine("Everything is canceled");
}
}
}
2 Answers 2
I don't have time to write a better implementation of a thread pool, but I'll point out what I see is wrong.
I'm quite sure your implementation does not satisfy the requirements' intention. Where does it say that none of the tasks should be started when added, and then they should only be started all at once (but not really, because of the delay between thread creation)? And once the "thread pool" has been started, you can't add any further tasks? As it stands, I cannot see any real use for it. Not that I can see a real use for a better version either, since you can pretty much do all that with the standard thread pool.
Other issues:
- Each
functionWrapper
method runs on a separate thread, and incrementsfinishedThreads
when complete. This is not thread safe, and you're bound to lose increments. Given that you use this variable in yourwhile
loop instartAll()
, this is a problem. Perhaps useInterlocked.Increment
. - Speaking of which,
finishedThreads
,currentThreadIndex
, andkeepCreating
should be markedvolatile
so that the reads of their values aren't hoisted out of thewhile
loop. - Creating a new thread for each task is wasteful. The standard thread pool will reutilise threads from the pool to schedule queued tasks, so you don't have loads of threads constantly being newly created.
- I don't believe your implementation meets the requirement "Be able to cancel [...] currently running threads". I can't test it myself but I'm pretty sure that simply calling
Cancel()
on aCancellationTokenSource
when the function you started is already running will do a whole load of nothing, unless you pass the token in to the method, and then continually test it from within the method (in this case, from within the method that gets invoked whenfunction.Invoke()
is called infunctionWrapper
). Of course, if the method being invoked doesn't care aboutCancellationToken
s, then you're plum out of luck - it's not "cancellable" (apart from aborting the thread). Someone may correct me but I believe that's how it is. - Use PascalCase for method names and public events.
- Put a space after a comma (e.g. in
private int maxThreads,creationInterval,finishedThreads,currentThreadIndex;
) - Be consistent:
startAll()
uses a member field like so:if (started) ...
and two lines later uses "this":this.started = true;
. Do one or the other.
while (threadList.Count(t => t.IsAlive) != 0) ;
this line will burn up CPU, may be a very long time if a thread doesn't exit.
A better move could be to wait for the thread to signal that he has exited. There are basic samples about this everywhere, just Google about WaitHandle class.
Also, why do you use a task to launch a thread inside a thread pool ? Task.Run is running with a default thread pool.
Hope this help !