I am trying to rewrite a foreach loop to use Parallel.ForEach, since every document I need to process can be handled as a separate entity; there are no dependencies whatsoever.
The code is fairly straightforward:
- Query the DB
- Read each document in the loop
- For each document do two web calls and add results to the document
- Add updated document to list
- BulkImport list to DB
Since the web API calls are the slowest part due to network delay, I wanted to process them in parallel to save time, so I wrote this code:
private async Task<List<String>> FetchDocumentsAndBuildList(string brand)
{
    using (var client = new DocumentClient(new Uri(cosmosDBEndpointUrl), cosmosDBPrimaryKey))
    {
        List<string> formattedList = new List<string>();
        FeedOptions queryOptions = new FeedOptions
        {
            MaxItemCount = -1,
            PartitionKey = new PartitionKey(brand)
        };
        var query = client.CreateDocumentQuery<Document>(UriFactory.CreateDocumentCollectionUri(cosmosDBName, cosmosDBCollectionNameRawData), $"SELECT TOP 2 * from c where c.brand = '{brand}'", queryOptions).AsDocumentQuery();
        while(query.HasMoreResults)
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
            Parallel.ForEach(await query.ExecuteNextAsync<Document>(), options, async singleDocument =>
            {
                JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
                if (originalData != null)
                {
                    var artNo = originalData.GetValue("artno");
                    if (artNo != null)
                    {
                        string strArtNo = artNo.ToString();
                        string productNumber = strArtNo.Substring(0, 7);
                        string colorNumber = strArtNo.Substring(7, 3);
                        string HmGoeUrl = $"https://xxx,xom/Online/{strArtNo}/en";
                        string sisApiUrl = $"https:/yyy.com/{productNumber}/{colorNumber}?&maxnumberofstores=10&brand=000&channel=02";
                        string HttpFetchMethod = "GET";
                        JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
                        JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
                        if (detailedDataResponse != null)
                        {
                            JObject productList = (JObject)detailedDataResponse["product"];
                            if (productList != null)
                            {
                                var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
                                    .Single(x => x.code == strArtNo)
                                    .Index;
                                detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
                            }
                        }
                        singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
                        singleDocument.SetPropertyValue("InventoryData", inventoryData);
                        singleDocument.SetPropertyValue("consumer", "NWS");
                    }
                }
                formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
            });
            //foreach (Document singleDocument in await query.ExecuteNextAsync<Document>())
            //{
            //    JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");
            //    if(originalData != null)
            //    {
            //        var artNo = originalData.GetValue("artno");
            //        if(artNo != null)
            //        {
            //            string strArtNo = artNo.ToString();
            //            string productNumber = strArtNo.Substring(0, 7);
            //            string colorNumber = strArtNo.Substring(7, 3);
            //            string HmGoeUrl = $"https:/xxx.xom/Online/{strArtNo}/en";
            //            string sisApiUrl = $"https://yyy.xom&maxnumberofstores=10&brand=000&channel=02";
            //            string HttpFetchMethod = "GET";
            //            JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
            //            JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);
            //            if(detailedDataResponse != null)
            //            {
            //                JObject productList = (JObject)detailedDataResponse["product"];
            //                if(productList != null)
            //                {
            //                    var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
            //                        .Single(x => x.code == strArtNo)
            //                        .Index;
            //                    detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
            //                }
            //            }
            //            singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
            //            singleDocument.SetPropertyValue("InventoryData", inventoryData);
            //            singleDocument.SetPropertyValue("consumer", "NWS");
            //        }
            //    }
            //    formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
            //}
        }
        return formattedList;
    }
}
If I add a breakpoint in the loop, I can see that the correct values are assigned to each variable, but for some reason the returned formattedList always has 0 entries and I cannot figure out why.
Commented out is the original foreach loop, which works just fine but is slow.
--- EDIT --- This is how I am calling this code from the parent method:
log.LogInformation($"Starting creation of DocumentList for BulkImport at: {DateTime.Now}");
var documentsToImportInBatch = await FetchDocumentsAndBuildList(brand);
log.LogInformation($"BulkExecutor DocumentList has: {documentsToImportInBatch.Count} entries, created at: {DateTime.Now}");
1 Answer
The problem here is that Parallel.ForEach doesn't await async lambdas. Its body parameter is an Action<T>, so your async lambda is compiled as async void, and ForEach considers each iteration complete as soon as the lambda returns at its first incomplete await.
As a result, the continuation after the await isn't invoked before your function exits, and this is why formattedList has zero elements in it.
You can easily prove this with a code sample such as:
Parallel.ForEach(Enumerable.Range(0, 100), async singleDocument => await Task.Delay(9999));
Console.WriteLine("Done!");
Done! will be printed almost immediately.
For I/O-bound parallelism, you could instead use Task.WhenAll to parallelize your async web scraping calls:
var myDocuments = await query.ExecuteNextAsync<Document>();
var myScrapingTasks = myDocuments.Select(async singleDocument =>
{
    // ... all of your web scraping code here
    // return the amended (mutated) document
    return JsonConvert.SerializeObject(singleDocument);
});
var results = await Task.WhenAll(myScrapingTasks);
formattedList.AddRange(results);
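To see the pattern in isolation, here is a minimal self-contained sketch. EnrichAsync is a hypothetical stand-in for the two web calls (it only simulates latency with Task.Delay), and the class and method names are made up for illustration; they are not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for the per-document web calls.
    private static async Task<string> EnrichAsync(string documentId)
    {
        await Task.Delay(100); // simulated network latency
        return documentId + ":enriched";
    }

    // Starts one task per document, then awaits all of them together.
    public static async Task<List<string>> ProcessAllAsync(IEnumerable<string> documentIds)
    {
        // ToList() forces the lazy Select to run, so every task is started here.
        List<Task<string>> tasks = documentIds.Select(id => EnrichAsync(id)).ToList();

        // Task.WhenAll completes when every task has completed;
        // the result array is in the same order as the input tasks.
        string[] results = await Task.WhenAll(tasks);
        return results.ToList();
    }
}
```

Because the caller awaits the task returned by Task.WhenAll, the method cannot return before every continuation has run, which is exactly what the Parallel.ForEach version was missing. No shared list is mutated from several threads either, since the results come back as one array.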
With regard to MaxDegreeOfParallelism: if you find that you need to throttle the number of concurrent scraping calls, the easiest approach would be to group the incoming documents into manageable chunks and process one chunk at a time. The Select overload that also passes the item index, Select((x, i) => ...), combined with GroupBy works wonders here.
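A rough sketch of that chunking idea, under the assumption that a fixed chunk size is an acceptable throttle. ProcessInChunksAsync, chunkSize and processAsync are names invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkedSketch
{
    // Runs processAsync over source with at most chunkSize calls in flight at once.
    public static async Task<List<TResult>> ProcessInChunksAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        int chunkSize,
        Func<TSource, Task<TResult>> processAsync)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var results = new List<TResult>();

        // Pair each item with its index, then group by index / chunkSize
        // so that items 0..chunkSize-1 form the first chunk, and so on.
        var chunks = source
            .Select((item, index) => new { item, index })
            .GroupBy(x => x.index / chunkSize, x => x.item);

        foreach (var chunk in chunks)
        {
            // Start every call in this chunk, wait for all of them,
            // and only then move on to the next chunk.
            TResult[] chunkResults = await Task.WhenAll(chunk.Select(processAsync));
            results.AddRange(chunkResults);
        }

        return results;
    }
}
```

It could be called as, for example, await ChunkedSketch.ProcessInChunksAsync(myDocuments, 20, ScrapeAsync), where ScrapeAsync is a hypothetical method wrapping the per-document scraping code. One trade-off of this design is that each chunk waits for its slowest call before the next chunk starts, so throughput is somewhat lower than with a sliding window of requests.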
Comments
[...] List<> with a ConcurrentBag<>? As far as I know, Lists aren't designed to be used in concurrent code.
[...] Parallel class for async work is a common mistake. It doesn't understand async delegates, meaning that it doesn't accept lambdas that return Tasks. So when you feed it with an async () => lambda, an async void is created. Async voids are a problem by themselves. They can't be awaited, and their exceptions can't be handled. Unfortunately there is no class analogous to Parallel for async work. We have nothing in between Task.WhenAll, which is very basic, and the super powerful TPL Dataflow library, which does everything you can imagine but has a learning curve.