I am working on multi-threading application to learn multithreading better.
My application has a queue with 400 unique links, 4 threads and each thread is pulling link from the queue and sending a Get request to this queue.
My application seems to run, and it takes 18 - 22 seconds for it to finish on 400 links.
I am using ThreadPool.QueueUserWorkItem
to do the multithreading.
I wanted someone to review my code and tell me if I use ThreadPool
in the correct way.
I need it to be efficient as it can be.
I created a similar application in Python and it takes 12 seconds to run 400 requests. Therefore, I think I am not using the multithreading ability of C# correctly.
The code:
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace PlayGround
{
class Program
{
static void Main(string[] args)
{
var watch = System.Diagnostics.Stopwatch.StartNew();
RunThreadPool();
watch.Stop();
var elapsedMs = watch.ElapsedMilliseconds;
Console.WriteLine("[*] Elapsed time: {0}", elapsedMs / 1000);
Console.ReadLine();
}
static BlockingCollection<string> inputQueue = new BlockingCollection<string>();
// https://stackoverflow.com/questions/6529659/wait-for-queueuserworkitem-to-complete
private static ManualResetEvent resetEvent = new ManualResetEvent(false);
private static void RunThreadPool()
{
string url = @"https://google.com";
for (int i = 0; i < 4; i++)
{
var tempWorkerId = i;
ThreadPool.QueueUserWorkItem(state => Worker(tempWorkerId));
}
for (int i = 0; i < 400; ++i)
{
//Console.WriteLine("Queueing work item {0}", url + "/" + i);
inputQueue.Add(url + "/" + i);
//Thread.Sleep(50);
}
Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
resetEvent.WaitOne();
Console.WriteLine("Done.");
}
// https://stackoverflow.com/a/22689821/2153777
static void Worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
string res = "";
//Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
try
{
res = Get(workItem);
}
catch (Exception)
{
res = "404";
}
//Console.WriteLine("Worker {0} is processing item {1} with result {2}", workerId, workItem, res);
//Thread.Sleep(100); // Simulate work.
}
resetEvent.Set();
Console.WriteLine("Worker {0} is stopping.", workerId);
}
// https://stackoverflow.com/a/27108442/2153777
public static string Get(string uri)
{
HttpStatusCode status;
HttpWebRequest request = (HttpWebRequest)WebRequest.Create(uri);
request.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
{
status = response.StatusCode;
}
//Thread.Sleep(2000);
return status.ToString() + "; Thread: " + Thread.CurrentThread.ManagedThreadId.ToString();
}
}
}
EDIT: Using async\await
was very slow (44 seconds) (4.12.2018)
Following the comments about using async\await
, I used the link from Microsoft to create a version with async\await
and it seems to work faster than anything else I tried.
Here is the code:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncAwait
{
class Program
{
static void Main(string[] args)
{
SumPageSizesAsync();
Console.ReadLine();
}
private static async Task SumPageSizesAsync()
{
var watch = System.Diagnostics.Stopwatch.StartNew();
// Make a list of web addresses.
List<string> urlList = SetUpURLList();
// Create a query.
IEnumerable<Task<HttpStatusCode>> downloadTasksQuery =
from url in urlList select GetURLStatusCodeAsync(url);
// Use ToArray to execute the query and start the download tasks.
Task<HttpStatusCode>[] downloadTasks = downloadTasksQuery.ToArray();
// You can do other work here before awaiting.
// Await the completion of all the running tasks.
HttpStatusCode[] lengths = await Task.WhenAll(downloadTasks);
watch.Stop();
var elapsedMs = watch.ElapsedMilliseconds;
Console.WriteLine("[*] Elapsed time: {0}", elapsedMs / 1000);
}
private static async Task<HttpStatusCode> GetURLStatusCodeAsync(string url)
{
HttpStatusCode status;
try
{
var request = (HttpWebRequest)WebRequest.Create(url);
request.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate;
// Send the request to the Internet resource and wait for
// the response.
using (HttpWebResponse response = (HttpWebResponse)await request.GetResponseAsync())
{
status = response.StatusCode;
}
}
catch (Exception)
{
status = HttpStatusCode.NotFound;
}
//Console.WriteLine("[*] Status code: {0}, Thread: {1}, Url: {2}", status, Thread.CurrentThread.ManagedThreadId, url);
return status;
}
private static List<string> SetUpURLList()
{
string url = @"https://google.com";
List<string> urls = new List<string>();
for (int i = 0; i < 400; ++i)
{
urls.Add(url + "/" + i);
}
return urls;
}
}
}
1 Answer 1
I think your use of ManualResetEvent
is incorrect in this scenario, because as I read it, you intend to wait for all threads to complete here:
resetEvent.WaitOne(); Console.WriteLine("Done.");
But you actually only wait for the first thread to call resetEvent.Set()
.
Instead you'll have to provide a ManualResetEvent
(or another waithandle type - for instance ManualResetEventSlim
) for each thread in order to wait for them all to end. This could be done like this:
private static void RunThreadPool()
{
string url = @"https://google.com";
List<ManualResetEvent> waitHandles = new List<ManualResetEvent>();
for (int i = 0; i < 4; i++)
{
ManualResetEvent waitHandle = new ManualResetEvent(false);
waitHandles.Add(waitHandle);
int workerId = i;
ThreadPool.QueueUserWorkItem(state => Worker(workerId, waitHandle));
}
Console.WriteLine("Starting adding.");
for (int i = 0; i < 400; ++i)
{
inputQueue.Add(url); // + "/" + i);
}
Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
WaitHandle.WaitAll(waitHandles.ToArray());
waitHandles.ForEach(wh => wh.Dispose());
//resetEvent.WaitOne();
Console.WriteLine("Done.");
}
static void Worker(int workerId, ManualResetEvent waitHandle)
{
Console.WriteLine("Worker {0} is starting.", workerId);
foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
string res = "";
try
{
res = Get(workItem);
Console.WriteLine($"{res} - {workerId}");
}
catch (Exception)
{
res = "404";
}
}
waitHandle.Set();
Console.WriteLine("Worker {0} is stopping.", workerId);
}
Besides that I think your use of ThreadPool.QueueUserWorkItem()
is OK.
It is not quite clear what kind of concurrency you are exercising but if it's just about executing an action on each element in an existing collection in a multi threaded/parallel manner then, you can experiment with Parallel.ForEach(..)
:
private static void RunParallel()
{
string url = @"https://google.com";
List<string> inputQueue = new List<string>();
for (int i = 0; i < 400; ++i)
{
inputQueue.Add(url); // + "/" + i);
}
int workerId = 0;
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 8;
Parallel.ForEach(inputQueue, options, (uri) =>
{
Worker(workerId++, uri);
});
Console.WriteLine("Done.");
}
static void Worker(int workerId, string uri)
{
Console.WriteLine("Worker {0} is starting.", workerId);
string res = "";
try
{
res = Get(uri);
Console.WriteLine(res);
}
catch (Exception)
{
res = "404";
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
If you experiment with different values of options.MaxDegreeOfParallelism
you can investigate how the Parallel
react to that in the use of threads and how it may influence on performance.
-
\$\begingroup\$ I'd use semaphore instead (workers call Release, main calls WaitOne four times). +1 Anyway \$\endgroup\$user52292– user522922018年12月02日 16:17:05 +00:00Commented Dec 2, 2018 at 16:17
-
\$\begingroup\$ @Henrik Hansen, thanks. The last method with the
Parallel.ForEach(..)
was the fastet (14 seconds). Btw, I also triedasync\await
(my last edit from 4.12.2018) but it was much slower compared to your suggestions. \$\endgroup\$E235– E2352018年12月04日 07:41:09 +00:00Commented Dec 4, 2018 at 7:41
Explore related questions
See similar questions with these tags.
async/await
or do just want to use threads? \$\endgroup\$async\await
(see my last edit from 4.12.2018) and it took between 36 - 44 seconds. Very slow compared to my first method. \$\endgroup\$