
I am trying to rewrite a foreach loop to use Parallel.ForEach, since every document I need to process can be handled as a separate entity; there are no dependencies whatsoever.

The code is fairly straightforward:

  • Query the DB
  • Read each document in the loop
  • For each document do two web calls and add results to the document
  • Add updated document to list
  • BulkImport list to DB

Since the web API calls are the slowest part due to network delay, I wanted to process them in parallel to save time, so I wrote this code:

private async Task<List<String>> FetchDocumentsAndBuildList(string brand)
{
    using (var client = new DocumentClient(new Uri(cosmosDBEndpointUrl), cosmosDBPrimaryKey))
    {
        List<string> formattedList = new List<string>();
        FeedOptions queryOptions = new FeedOptions
        {
            MaxItemCount = -1,
            PartitionKey = new PartitionKey(brand)
        };
        var query = client.CreateDocumentQuery<Document>(UriFactory.CreateDocumentCollectionUri(cosmosDBName, cosmosDBCollectionNameRawData), $"SELECT TOP 2 * from c where c.brand = '{brand}'", queryOptions).AsDocumentQuery();
        while(query.HasMoreResults)
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
            Parallel.ForEach(await query.ExecuteNextAsync<Document>(), options, async singleDocument =>
            {
                JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
                if (originalData != null)
                {
                    var artNo = originalData.GetValue("artno");
                    if (artNo != null)
                    {
                        string strArtNo = artNo.ToString();
                        string productNumber = strArtNo.Substring(0, 7);
                        string colorNumber = strArtNo.Substring(7, 3);
                        string HmGoeUrl = $"https://xxx,xom/Online/{strArtNo}/en";
                        string sisApiUrl = $"https:/yyy.com/{productNumber}/{colorNumber}?&maxnumberofstores=10&brand=000&channel=02";
                        string HttpFetchMethod = "GET";
                        JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
                        JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
                        if (detailedDataResponse != null)
                        {
                            JObject productList = (JObject)detailedDataResponse["product"];
                            if (productList != null)
                            {
                                var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
                                    .Single(x => x.code == strArtNo)
                                    .Index;
                                detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
                            }
                        }
                        singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
                        singleDocument.SetPropertyValue("InventoryData", inventoryData);
                        singleDocument.SetPropertyValue("consumer", "NWS");
                    }
                }
                formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
            });
            //foreach (Document singleDocument in await query.ExecuteNextAsync<Document>())
            //{
            //    JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
            //    if(originalData != null)
            //    {
            //        var artNo = originalData.GetValue("artno");
            //        if(artNo != null)
            //        {
            //            string strArtNo = artNo.ToString();
            //            string productNumber = strArtNo.Substring(0, 7);
            //            string colorNumber = strArtNo.Substring(7, 3);
            //            string HmGoeUrl = $"https:/xxx.xom/Online/{strArtNo}/en";
            //            string sisApiUrl = $"https://yyy.xom&maxnumberofstores=10&brand=000&channel=02";
            //            string HttpFetchMethod = "GET";
            //            JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
            //            JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
            //            if(detailedDataResponse != null)
            //            {
            //                JObject productList = (JObject)detailedDataResponse["product"];
            //                if(productList != null)
            //                {
            //                    var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
            //                        .Single(x => x.code == strArtNo)
            //                        .Index;
            //                    detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
            //                }
            //            }
            //            singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
            //            singleDocument.SetPropertyValue("InventoryData", inventoryData);
            //            singleDocument.SetPropertyValue("consumer", "NWS");
            //        }
            //    }
            //    formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
            //}
        }
        return formattedList;
    }
}

If I add a breakpoint in the loop, I can see that the correct values are assigned to each variable, but for some reason the returned formattedList always has 0 entries and I cannot figure out why.

Commented out is the original foreach loop, which works just fine but is slooooow.

--- EDIT --- This is how I am calling this code from the parent method:

 log.LogInformation($"Starting creation of DocumentList for BulkImport at: {DateTime.Now}");
 var documentsToImportInBatch = await FetchDocumentsAndBuildList(brand);
 log.LogInformation($"BulkExecutor DocumentList has: {documentsToImportInBatch.Count} entries, created at: {DateTime.Now}");
StuartLC
asked Nov 6, 2019 at 6:47
  • Have you tried replacing your List<> with a ConcurrentBag<>? As far as I know, Lists aren't designed to be used in concurrent code. Commented Nov 6, 2019 at 6:52
  • @RobinB You beat me at that ;) Commented Nov 6, 2019 at 6:53
  • It still fails. It's like it returns the empty bag before the loops finish retrieving the data and adding it to the bag. I break after the bag.Add and I can see the count increase for every iteration of the loop, but the code doesn't wait: it has already returned the empty bag to the calling method. Commented Nov 6, 2019 at 7:04
  • @StuartLC Yes, I am trying to change it to TPL. Steep learning curve, though, but it seems powerful and a good fit. Commented Nov 7, 2019 at 10:37
  • Using the Parallel class for async work is a common mistake. It doesn't understand async delegates, meaning that it doesn't accept lambdas that return Tasks. So when you feed it an async () => lambda, an async void is created. Async voids are a problem by themselves: they can't be awaited, and their exceptions can't be handled. Unfortunately there is no class analogous to Parallel for async work. We have nothing in between Task.WhenAll, which is very basic, and the super powerful TPL Dataflow library, which does everything you can imagine but has a learning curve. Commented Nov 8, 2019 at 18:19

1 Answer

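The problem here is that Parallel.ForEach takes an Action<T>, not a Task-returning delegate, so your async lambda is compiled as async void. Parallel.ForEach has no Task to await: it treats each iteration as finished as soon as the lambda reaches its first await, not when the awaited work actually completes.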

As a result, the continuation after the await isn't invoked before your function exits, and this is why formattedList has zero elements in it.

You can easily prove this with a code sample such as:

Parallel.ForEach(Enumerable.Range(0, 100), async singleDocument => await Task.Delay(9999));
Console.WriteLine("Done!");

"Done!" will be printed almost immediately, long before any of the delays complete.

For I/O-bound work, you could instead use Task.WhenAll to run your async web scraping calls concurrently:

var myDocuments = await query.ExecuteNextAsync<Document>();
var myScrapingTasks = myDocuments.Select(async singleDocument =>
{
 // ... all of your web scraping code here
 // return the amended (mutated) document
 return JsonConvert.SerializeObject(singleDocument);
});
var results = await Task.WhenAll(myScrapingTasks);
formattedList.AddRange(results);
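
To make the pattern above easy to try in isolation, here is a minimal, self-contained sketch. It is not the asker's real code: EnrichAsync is a hypothetical stand-in for the two web calls, Task.Delay simulates the network latency, and plain integers stand in for Cosmos DB documents.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for the per-document web calls;
    // Task.Delay simulates network latency.
    private static async Task<string> EnrichAsync(int documentId)
    {
        await Task.Delay(100);
        return $"document-{documentId}";
    }

    public static async Task<List<string>> BuildListAsync(IEnumerable<int> documentIds)
    {
        // Select creates one task per document; ToList forces all of them to start now.
        List<Task<string>> tasks = documentIds.Select(id => EnrichAsync(id)).ToList();

        // WhenAll completes only when every task has completed.
        // The result array is in the same order as the input tasks.
        string[] results = await Task.WhenAll(tasks);

        // The list is filled after all tasks are done, so no concurrent writes occur.
        return results.ToList();
    }

    public static async Task Main()
    {
        List<string> list = await BuildListAsync(Enumerable.Range(0, 5));
        Console.WriteLine(list.Count); // prints 5
    }
}
```

Because the results are collected from the array that Task.WhenAll returns, no task ever writes to a shared List<string>, which also sidesteps the thread-safety concern raised in the comments.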

With regard to MaxDegreeOfParallelism: if you find that you need to throttle the number of concurrent scraping calls, the easiest approach is to group the incoming documents into manageable chunks and process one chunk at a time. The Select((x, i) => ...) overload and GroupBy work wonders for this.
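
The chunking idea could be sketched roughly as follows. This is only an illustration under stated assumptions: EnrichAsync is again a hypothetical stand-in for the real web calls, integers stand in for documents, and chunkSize is a parameter introduced here for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkedSketch
{
    // Hypothetical stand-in for the per-document web calls.
    private static async Task<string> EnrichAsync(int documentId)
    {
        await Task.Delay(50);
        return $"document-{documentId}";
    }

    public static async Task<List<string>> BuildListAsync(IEnumerable<int> documentIds, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var results = new List<string>();

        // Pair each item with its index, then group by index / chunkSize
        // so that consecutive items land in the same chunk.
        var chunks = documentIds
            .Select((id, index) => new { id, index })
            .GroupBy(x => x.index / chunkSize, x => x.id);

        foreach (var chunk in chunks)
        {
            // At most chunkSize calls are in flight at once;
            // the next chunk starts only after this one has finished.
            string[] chunkResults = await Task.WhenAll(chunk.Select(id => EnrichAsync(id)));
            results.AddRange(chunkResults);
        }

        return results;
    }

    public static async Task Main()
    {
        List<string> list = await BuildListAsync(Enumerable.Range(0, 10), chunkSize: 4);
        Console.WriteLine(list.Count); // prints 10
    }
}
```

One trade-off of this approach is that a single slow call holds up its whole chunk, since the next chunk does not start until Task.WhenAll returns.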

answered Nov 8, 2019 at 17:14
