3
\$\begingroup\$

The General Idea

I wish to simulate something which works like Java's ThreadExecutor, but as a first step, I would like to run a fixed number of identical tasks in parallel. The next step should be queuing different tasks.

The method in question

public static async Task RunTasks(Func<Task<bool>> action, int tasks, int retries, string method)
{
 // Its just a source in order to use the library.
 byte[] source = new byte[tasks];
 // Wrapping the method with a retry mechanism
 async Task NewAction(byte _)
 {
 await DoWithRetries(async () => await action(), retries, method);
 }
 // Running everything
 await source.ParallelForEachAsync(NewAction, tasks);
}
private static async Task<bool> DoWithRetries(Func<Task<bool>> action, int retryCount, string method)
{
 while (true)
 {
 if (retryCount <= 0)
 return false;
 try
 {
 bool res = await action();
 if (res)
 return true;
 }
 catch (Exception e)
 {
 LogFactory.General.Fatal(e, method);
 }
 retryCount--;
 await Task.Delay(200);
 }
}

External Libraries

I used the AsyncEnumerator Nuget (which allows the ParallelForEachAsync call)

Code Execution

int page = -1;
int tasks = 10;
long count = 0;
await RetryComponent.RunTasks(async () =>
{
 int localPage = Interlocked.Increment(ref page);
 int results = -1;
 while (results != 0)
 {
 Console.WriteLine($"Page: {localPage}");
 // req.IsSuccess holds if the request processed well 
 // mData contains the data returned from db (List)
 var req = await FetchSomeListFromDb();
 if (!req.IsSuccess)
 continue;
 foreach (var item in req.mData)
 {
 // simluate some IO..
 Interlocked.Increment(ref count);
 }
 // shuts down if zero
 results = req.mData.Count;
 // next page
 localPage = Interlocked.Increment(ref page);
 }
 return true;
}, tasks, 1, MethodBase.GetCurrentMethod().Name);

The issues I have with this code:

  1. It seems like someone 100% did it already in the past, but I couldn't find anything like that or like the Java's ThreadsExecutor I mentioned
  2. It seems like the execution code (The code I used to run everything) is large and seems like it will repeat itself..especially the page increment and the while handling (the last 2 statements at the end of the while block)

I will be glad to get a feedback on it or suggestions if the code exists somewhere (Either my implemetation or something similar to Java's ThreadsExecutor)

t3chb0t
44.6k9 gold badges84 silver badges190 bronze badges
asked May 6, 2018 at 12:22
\$\endgroup\$

2 Answers 2

2
\$\begingroup\$

1) The library you are trying to use clearly does not fit your requirements. So why force it? Aren't you essentially just calling:

var tasks = Enumerable.Range(0, tasks).Select(i => DoWithRetries(action, retries, method));
await Task.WhenAll(tasks);

2) Fatal error is an error that your software cannot recover from. If you just swallow it in retry loop and keep going, then it is not fatal.

3) Delay duration should probably be a parameter and not a hardcoded value.

4) You can use nameof operator instead of MethodBase.GetCurrentMethod().Name.

P.S. I don't know what ThreadExecutor is and I didn't understand your use-case example, so I won't comment on those.

answered May 7, 2018 at 7:59
\$\endgroup\$
1
  • \$\begingroup\$ Thanks for replying. regarding 1, you're probably right.. ill fix it. regarding 2: it could be that the internet crashed or out of memory..anyway the number of retries is limitted. regarding 3, you're probably right, but its kind of an acceptable constant.. regarding 4, never used nameof, ill check it out. The main issue that the core issue is not answered. the code re-use is bad imo and probably could be utilalize. \$\endgroup\$ Commented May 7, 2018 at 8:09
1
\$\begingroup\$

As the author of AsyncEnumerable library, I can tell that the use of ParallelForEachAsync method is indeed inappropriate.

Honestly, it's hard to understand what exactly you are trying to accomplish, but I can give some advice on how AsyncEnumerable can help.

  1. Hide enumeration of all DB rows behind an 'Async Stream'

    class Repository
    {
     /// <summary>
     /// Creates a continuous stream of list items.
     /// </summary>
     IAsyncEnumerable<SomeListItem> GetSomeListItems() =>
     new AsyncEnumerable<SomeListItem>(async yield =>
     {
     int results = -1;
     while (results != 0)
     {
     var req = await FetchSomeListFromDb();
     if (!req.IsSuccess)
     continue;
     foreach (var item in req.mData)
     await yield.ReturnAsync(item);
     results = req.mData.Count;
     }
     });
    }
    
  2. Option A: Process list items sequentially

    class Logic
    {
     async Task DoStuffAsync()
     {
     var listItem = repository.GetSomeListItems();
     await listItem.ForEachAsync(async item =>
     {
     // Asynchronous list item processing.
     });
     }
    }
    
  3. Option B: Process list items in parallel

    class Logic
    {
     async Task DoStuffAsync()
     {
     var listItem = repository.GetSomeListItems();
     await listItem.ParallForEachAsync(async item =>
     {
     // Asynchronous list item processing.
     },
     maxDegreeOfParalellism: 10);
     }
    }
    

Not sure about the retry logic though - that can be put either on the DB call or on an item process, or both. Then in your example you spin up multiple threads to do exactly the same item processing 10 times. Is that what you are trying to achieve? In my example #3 the items are read once, but processed in parallel with up to 10 threads.

Jamal
35.2k13 gold badges134 silver badges238 bronze badges
answered Jun 12, 2018 at 3:30
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.