The General Idea
I wish to simulate something which works like Java's ThreadExecutor, but as a first step, I would like to run a fixed number of identical tasks in parallel.
The next step should be queuing different tasks.
The method in question
public static async Task RunTasks(Func<Task<bool>> action, int tasks, int retries, string method)
{
    // It's just a source in order to use the library.
    byte[] source = new byte[tasks];

    // Wrapping the method with a retry mechanism
    async Task NewAction(byte _)
    {
        await DoWithRetries(async () => await action(), retries, method);
    }

    // Running everything
    await source.ParallelForEachAsync(NewAction, tasks);
}

private static async Task<bool> DoWithRetries(Func<Task<bool>> action, int retryCount, string method)
{
    while (true)
    {
        if (retryCount <= 0)
            return false;

        try
        {
            bool res = await action();
            if (res)
                return true;
        }
        catch (Exception e)
        {
            LogFactory.General.Fatal(e, method);
        }

        retryCount--;
        await Task.Delay(200);
    }
}
External Libraries
I used the AsyncEnumerator NuGet package (which provides the ParallelForEachAsync call).
Code Execution
int page = -1;
int tasks = 10;
long count = 0;

await RetryComponent.RunTasks(async () =>
{
    int localPage = Interlocked.Increment(ref page);
    int results = -1;
    while (results != 0)
    {
        Console.WriteLine($"Page: {localPage}");

        // req.IsSuccess holds if the request processed well
        // mData contains the data returned from db (List)
        var req = await FetchSomeListFromDb();
        if (!req.IsSuccess)
            continue;

        foreach (var item in req.mData)
        {
            // simulate some IO..
            Interlocked.Increment(ref count);
        }

        // shuts down if zero
        results = req.mData.Count;

        // next page
        localPage = Interlocked.Increment(ref page);
    }
    return true;
}, tasks, 1, MethodBase.GetCurrentMethod().Name);
The issues I have with this code:
- Someone has almost certainly done this before, but I couldn't find anything like it, or anything like the Java ThreadExecutor I mentioned.
- The execution code (the code I used to run everything) is large and looks like it will repeat itself, especially the page increment and the while handling (the last 2 statements at the end of the while block).
I will be glad to get feedback on it, or suggestions if such code already exists somewhere (either my implementation or something similar to Java's ThreadExecutor).
2 Answers
1) The library you are trying to use clearly does not fit your requirements. So why force it? Aren't you essentially just calling:
// "running" avoids a name clash with the int parameter "tasks".
var running = Enumerable.Range(0, tasks).Select(i => DoWithRetries(action, retries, method));
await Task.WhenAll(running);
2) A Fatal error is an error that your software cannot recover from. If you just swallow it in a retry loop and keep going, then it is not fatal.
3) Delay duration should probably be a parameter and not a hardcoded value.
4) You can use the nameof operator instead of MethodBase.GetCurrentMethod().Name.
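Points 1, 3 and 4 could be combined roughly as follows. This is only a sketch under a few assumptions: the `workers` and `retryDelay` parameter names are made up for illustration, `Console.Error` stands in for the `LogFactory` logger (which is not shown in the question), and the retry count is treated as a total number of attempts, as in the original loop. Unlike the original, it does not wait after the final failed attempt.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class RetryComponent
{
    // Starts `workers` copies of `action` and waits for all of them.
    // The result holds one success flag per worker.
    public static Task<bool[]> RunTasks(
        Func<Task<bool>> action, int workers, int attempts, TimeSpan retryDelay, string method)
    {
        var running = Enumerable.Range(0, workers)
            .Select(_ => DoWithRetries(action, attempts, retryDelay, method));
        return Task.WhenAll(running);
    }

    // Runs `action` up to `attempts` times, pausing `retryDelay` between tries.
    private static async Task<bool> DoWithRetries(
        Func<Task<bool>> action, int attempts, TimeSpan retryDelay, string method)
    {
        for (int attempt = 1; attempt <= attempts; attempt++)
        {
            try
            {
                if (await action())
                    return true;
            }
            catch (Exception e)
            {
                // Assumption: plain stderr logging replaces LogFactory.General here.
                // A failure that gets retried is reported as an error, not as fatal.
                Console.Error.WriteLine($"{method}: attempt {attempt} failed: {e.Message}");
            }

            if (attempt < attempts)
                await Task.Delay(retryDelay);
        }
        return false;
    }
}
```

A caller would pass something like `TimeSpan.FromMilliseconds(200)` for the delay and `nameof(SomeMethod)` (a hypothetical method name) as the last argument.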
P.S. I don't know what ThreadExecutor is and I didn't understand your use-case example, so I won't comment on those.
- Thanks for replying. Regarding 1, you're probably right, I'll fix it. Regarding 2: it could be that the internet connection dropped or the process ran out of memory; anyway, the number of retries is limited. Regarding 3, you're probably right, but it's kind of an acceptable constant. Regarding 4, I have never used nameof, I'll check it out. The main issue is that the core question is not answered: the code reuse is bad IMO and could probably be improved. – Ori Refael, May 7, 2018 at 8:09
As the author of the AsyncEnumerable library, I can tell that the use of the ParallelForEachAsync method is indeed inappropriate.
Honestly, it's hard to understand what exactly you are trying to accomplish, but I can give some advice on how AsyncEnumerable can help.
Hide enumeration of all DB rows behind an 'Async Stream'
class Repository
{
    /// <summary>
    /// Creates a continuous stream of list items.
    /// </summary>
    IAsyncEnumerable<SomeListItem> GetSomeListItems() =>
        new AsyncEnumerable<SomeListItem>(async yield =>
        {
            int results = -1;
            while (results != 0)
            {
                var req = await FetchSomeListFromDb();
                if (!req.IsSuccess)
                    continue;

                foreach (var item in req.mData)
                    await yield.ReturnAsync(item);

                results = req.mData.Count;
            }
        });
}
Option A: Process list items sequentially
class Logic
{
    async Task DoStuffAsync()
    {
        var listItem = repository.GetSomeListItems();
        await listItem.ForEachAsync(async item =>
        {
            // Asynchronous list item processing.
        });
    }
}
Option B: Process list items in parallel
class Logic
{
    async Task DoStuffAsync()
    {
        var listItem = repository.GetSomeListItems();
        await listItem.ParallelForEachAsync(async item =>
        {
            // Asynchronous list item processing.
        }, maxDegreeOfParalellism: 10);
    }
}
Not sure about the retry logic though: it can be put on the DB call, on the item processing, or on both. Also, in your example you spin up multiple threads that each do exactly the same item processing, 10 times over. Is that what you are trying to achieve? In Option B the items are read once, but processed in parallel with up to 10 threads.
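If the retry goes on the DB call, one possible shape is a small generic helper like the one below. It is a sketch, not part of AsyncEnumerable: the `Retry` class, `FetchWithRetriesAsync`, and all of its parameters are hypothetical names, and it assumes the caller still inspects the returned result, because the last result is handed back even when it is unsuccessful.

```csharp
using System;
using System.Threading.Tasks;

public static class Retry
{
    // Calls `fetch` until `isSuccess` accepts the result or `maxAttempts`
    // calls have been made, waiting `delay` between attempts.
    // Returns the last result, which may still be unsuccessful.
    public static async Task<T> FetchWithRetriesAsync<T>(
        Func<Task<T>> fetch, Func<T, bool> isSuccess, int maxAttempts, TimeSpan delay)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            T result = await fetch();
            if (isSuccess(result) || attempt == maxAttempts)
                return result;

            await Task.Delay(delay);
        }
    }
}
```

Inside the stream, the fetch would then read something like `await Retry.FetchWithRetriesAsync(() => FetchSomeListFromDb(), r => r.IsSuccess, 3, TimeSpan.FromMilliseconds(200))`, which also bounds the `continue` loop that otherwise spins for as long as the request keeps failing.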