I have a sequence of steps: retrieve data from a vendor API, clean up our stage tables, and write the retrieved data into the database. Here is what I have:
public static async Task Main()
{
    try
    {
        IDataRetrievalService api = services.GetService<IDataRetrievalService>();
        AuthenticationModel authenticated = await api.GetToken();
        authenticated.Decorate(Log.Logger);

        #region Web Request:
        var budget = api.GetBudgetReport(authenticated);
        var site = api.GetSiteReport(authenticated);
        var project = api.GetProjectReport(authenticated);
        var task = api.GetTaskReport(authenticated);
        var workEffort = api.GetWorkEffortReport(authenticated);
        var invoiceMaterial = api.GetInvoiceMaterialReport(authenticated);
        var pickup = api.GetPickupSummary(authenticated);
        var pickupMaterial = api.GetPickupMaterialReport(authenticated);
        var dropship = api.GetDropshipReport(authenticated);
        var dropshipMaterial = api.GetDropshipMaterialReport(authenticated);
        var materialType = api.GetMaterialTypeReport(authenticated);
        #endregion

        IDatabaseCleanupService cleanup = services.GetService<IDatabaseCleanupService>();
        await Task.WhenAll(
            budget,
            cleanup.PurgeBudget(),
            site,
            cleanup.PurgeSite(),
            project,
            cleanup.PurgeProject(),
            task,
            cleanup.PurgeTask(),
            workEffort,
            cleanup.PurgeWorkEffort(),
            invoiceMaterial,
            cleanup.PurgeInvoiceMaterial(),
            pickup,
            cleanup.PurgePickupSummary(),
            pickupMaterial,
            cleanup.PurgePickupMaterial(),
            dropship,
            cleanup.PurgeDropship(),
            dropshipMaterial,
            cleanup.PurgeDropshipMaterial(),
            materialType,
            cleanup.PurgeMaterialType()
        );

        IDatabaseImporterService importer = services.GetService<IDatabaseImporterService>();
        await Task.WhenAll(
            importer.InsertBudget(await budget),
            importer.InsertSite(await site),
            importer.InsertProject(await project),
            importer.InsertTask(await task),
            importer.InsertWorkEffort(await workEffort),
            importer.InsertInvoiceMaterial(await invoiceMaterial),
            importer.InsertPickupSummary(await pickup),
            importer.InsertPickupMaterial(await pickupMaterial),
            importer.InsertDropship(await dropship),
            importer.InsertDropshipMaterial(await dropshipMaterial),
            importer.InsertMaterialType(await materialType)
        );
    }
    catch (Exception exception)
    {
        exception.Decorate(Log.Logger);
        throw;
    }
}
I feel, though, that the second WhenAll should not need to await a batch of tasks that should already have completed above; the syntax feels weird to me.
Answer
As you said, you are executing the following operations against each entity:
- Retrieve from vendor
- Purge local datastore (in parallel with the previous)
- Insert retrieved data
So, if we look at the problem from a single-entity perspective, then we can say:
(TRetrieve, TPurge) -> TInsert(TRetrieve.Result)
- TRetrieve represents the vendor call
- TPurge represents the database cleanup
- () represents parallel execution
- -> represents continuation
- TInsert represents the database population
- TRetrieve.Result represents the retrieved data
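The per-entity formula can also be written down almost literally as a generic method. This is a minimal sketch of that variant, not the approach taken in the rest of this answer: ProcessEntityAsync is a hypothetical helper name, and the delegates are dummies that only record the order of events. Because T flows from the retrieve delegate to the insert delegate, no casting is needed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Thread-safe log so the order of events can be inspected afterwards.
var log = new ConcurrentQueue<string>();

// (TRetrieve, TPurge) -> TInsert(TRetrieve.Result) for a single entity.
// ProcessEntityAsync is a hypothetical helper, not part of the original answer.
static async Task ProcessEntityAsync<T>(
    Func<Task<T>> retrieve, Func<Task> purge, Func<T, Task> insert)
{
    Task<T> retrieveTask = retrieve();          // start the vendor call
    Task purgeTask = purge();                   // start the cleanup in parallel
    await Task.WhenAll(retrieveTask, purgeTask);
    await insert(await retrieveTask);           // continuation: insert the retrieved data
}

// Two dummy entities with different result types; both chains run concurrently.
await Task.WhenAll(
    ProcessEntityAsync(
        async () => { await Task.Delay(50); log.Enqueue("retrieved id"); return 1; },
        async () => { await Task.Delay(80); log.Enqueue("purged id"); },
        id => { log.Enqueue($"inserted {id}"); return Task.CompletedTask; }),
    ProcessEntityAsync(
        async () => { await Task.Delay(30); log.Enqueue("retrieved desc"); return "desc"; },
        async () => { await Task.Delay(60); log.Enqueue("purged desc"); },
        desc => { log.Enqueue($"inserted {desc}"); return Task.CompletedTask; }));

Console.WriteLine(string.Join(Environment.NewLine, log));
```

With this shape each entity's insert waits only for its own retrieve and purge, whereas in the question's code every insert waits for all purges and retrievals.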
If we create a structure that bundles these tasks together, then we can express our intent at the entity level:
class CoherentTasks<T>
{
    public Func<Task<T>> RetrieveTask { get; init; }
    public Func<Task> PurgeTask { get; init; }
    public Func<T, Task> InsertTask { get; init; }

    public CoherentTasks(Func<Task<T>> retrieve, Func<Task> purge, Func<T, Task> insert)
        => (RetrieveTask, PurgeTask, InsertTask) = (retrieve, purge, insert);
}
- Each property is init-only, so it can be set only via the constructor or an object initializer
- Each property is defined as a Func<...>, so the tasks are not started during property assignment
- The constructor takes advantage of ValueTuple and deconstruction
- With this approach, a single assignment is enough to set all three properties
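To see why the properties are typed as Func<...> rather than Task, compare storing a Task (the work starts immediately) with storing a delegate (nothing runs until it is invoked). A minimal sketch; FetchAsync is a hypothetical stand-in for a vendor call that counts how often it is started:

```csharp
using System;
using System.Threading.Tasks;

int calls = 0;

// Hypothetical stand-in for a vendor call that counts how often it starts.
async Task<int> FetchAsync()
{
    calls++;                     // runs synchronously, before the first await
    await Task.Delay(10);
    return 42;
}

// Storing a Task: the call has already started.
Task<int> started = FetchAsync();
int callsAfterTask = calls;

// Storing a Func<Task<int>>: nothing has run yet.
Func<Task<int>> deferred = () => FetchAsync();
int callsAfterFunc = calls;

// The work starts only when the delegate is invoked.
int result = await deferred();
await started;

Console.WriteLine($"{callsAfterTask} {callsAfterFunc} {calls} {result}");  // 1 1 2 42
```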
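In C# we can't create a List<CoherentTasks<object>> and store CoherentTasks<int> and CoherentTasks<string> objects in it, because generic classes are invariant (variance is only available on generic interfaces and delegates).
To overcome this, we need to get rid of the generic parameter and use boxing and unboxing (for reference types these are plain casts to and from object):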
class CoherentTasks
{
    public Func<Task<object>> RetrieveTask { get; init; }
    public Func<Task> PurgeTask { get; init; }
    public Func<object, Task> InsertTask { get; init; }

    public CoherentTasks(Func<Task<object>> retrieve, Func<Task> purge, Func<object, Task> insert)
        => (RetrieveTask, PurgeTask, InsertTask) = (retrieve, purge, insert);
}
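One way to keep the casts out of the call sites is a small generic adapter that does the boxing and unboxing in one place. This is a sketch under assumptions: ToUntyped is a hypothetical helper, and the three object-based delegates are kept in a named tuple so the snippet compiles on its own; the same three values could be passed to the non-generic CoherentTasks constructor instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Collects what the insert delegates receive, for inspection.
var inserted = new List<string>();

// Hypothetical adapter: turns strongly typed delegates into the object-based
// shape used by the non-generic CoherentTasks, so the cast lives in one place.
static (Func<Task<object>> Retrieve, Func<Task> Purge, Func<object, Task> Insert) ToUntyped<T>(
    Func<Task<T>> retrieve, Func<Task> purge, Func<T, Task> insert)
    => (async () => await retrieve(),   // boxes value types, upcasts reference types
        purge,
        data => insert((T)data));       // unboxes / downcasts back to T

var operations = new List<(Func<Task<object>> Retrieve, Func<Task> Purge, Func<object, Task> Insert)>
{
    ToUntyped(() => Task.FromResult(1), () => Task.CompletedTask,
        id => { inserted.Add((id + 1).ToString()); return Task.CompletedTask; }),
    ToUntyped(() => Task.FromResult("desc"), () => Task.CompletedTask,
        desc => { inserted.Add(desc.ToUpperInvariant()); return Task.CompletedTask; }),
};

// Sequential run, just to exercise the delegates.
foreach (var (retrieveOp, purgeOp, insertOp) in operations)
{
    await purgeOp();
    object data = await retrieveOp();
    await insertOp(data);
}

Console.WriteLine(string.Join(", ", inserted));  // 2, DESC
```

Inside the lambdas id is an int and desc is a string, so the compiler checks the insert code against the real entity type.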
For the sake of simplicity, let me introduce a couple of dummy retrieve, purge and insert methods to demonstrate that the proposed solution works with different entity types:
async Task<object> GetId()
{
    await Task.Delay(300);
    return 1;
}

async Task PurgeId() => await Task.Delay(700);

Task InsertId(int id)
{
    Console.WriteLine(id);
    return Task.CompletedTask;
}

async Task<object> GetDescription()
{
    await Task.Delay(200);
    return "desc";
}

async Task PurgeDesciption() => await Task.Delay(1000);

Task InsertDescription(string description)
{
    Console.WriteLine(description);
    return Task.CompletedTask;
}
Now we can define a collection of CoherentTasks, which represents the operations on different entities:
List<CoherentTasks> operationsOnEntities = new ()
{
    new CoherentTasks(() => GetId(), () => PurgeId(), (id) => InsertId((int)id)),
    new CoherentTasks(() => GetDescription(), () => PurgeDesciption(), (desc) => InsertDescription((string)desc)),
    ...
};
- I've used new () here, which is the target-typed new expression introduced in C# 9, in case you haven't encountered it before
- I've used explicit casts ((int), (string)) to unbox the data
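One detail worth knowing about these explicit casts: unboxing only succeeds when the target type is exactly the boxed value type. So if a retrieve method returned, say, a boxed long, the (int) cast inside the insert lambda would throw at run time. A minimal sketch:

```csharp
using System;

object boxed = 42;                 // boxes an int

int asInt = (int)boxed;            // OK: unboxes to the exact boxed type

// Unboxing to a different value type fails at run time, even though
// int -> long is a valid conversion on unboxed values.
bool threw = false;
try
{
    _ = (long)boxed;
}
catch (InvalidCastException)
{
    threw = true;
}

// Unbox to the exact type first, then convert.
long converted = (long)(int)boxed;

Console.WriteLine($"{asInt} {threw} {converted}");  // 42 True 42
```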
Finally, let's implement the synchronization functionality:
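// ToList() invokes each retrieve and purge delegate exactly once; a lazy Select
// would call the vendor again when Zip enumerates retrieveTasks a second time.
var retrieveTasks = operationsOnEntities.Select(tasks => tasks.RetrieveTask()).ToList();
var purgeTasks = operationsOnEntities.Select(tasks => tasks.PurgeTask()).ToList();
await Task.WhenAll(retrieveTasks.Union(purgeTasks));
// All retrievals are complete here, so .Result does not block.
var insertionTasks = retrieveTasks.Zip(operationsOnEntities.Select(tasks => tasks.InsertTask),
    (retrievedTask, insertTask) => insertTask(retrievedTask.Result));
// WhenAll (rather than WhenAny) waits for every insert and rethrows any failure.
await Task.WhenAll(insertionTasks);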
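A plain Select over the delegates is lazily evaluated: every enumeration of the resulting sequence invokes the delegates again. In the synchronization code retrieveTasks is enumerated twice (once by Task.WhenAll, once by Zip), so it matters whether it is materialized with ToList(). A small self-contained sketch with hypothetical dummy delegates counts the invocations:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

int retrieveCalls = 0;

// Hypothetical retrieve delegates that count how often they are started.
var sources = new List<Func<Task<object>>>
{
    async () => { retrieveCalls++; await Task.Delay(10); return 1; },
    async () => { retrieveCalls++; await Task.Delay(10); return "desc"; },
};

// Lazy: Select only describes the work, so each enumeration starts new calls.
var lazy = sources.Select(retrieve => retrieve());
await Task.WhenAll(lazy);            // first enumeration: 2 calls
var again = lazy.ToList();           // second enumeration: 2 more calls
await Task.WhenAll(again);
int lazyCalls = retrieveCalls;

// Materialized: ToList() invokes each delegate once; later passes reuse the same tasks.
retrieveCalls = 0;
var materialized = sources.Select(retrieve => retrieve()).ToList();
await Task.WhenAll(materialized);
var reused = materialized.Select(t => t.Result).ToList();  // no new calls
int materializedCalls = retrieveCalls;

Console.WriteLine($"{lazyCalls} {materializedCalls}");  // 4 2
```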
Step 1 - Issue retrieve and purge
- First we kick off the data retrieval jobs (retrieveTasks)
- Then we kick off the clean-up jobs (purgeTasks)
- We wait for both kinds of jobs (Union) to finish (Task.WhenAll)
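A consequence of awaiting the retrieval and purge jobs together in Step 1: if any of them fails, Task.WhenAll waits for the remaining jobs to finish, then rethrows the failure, and Step 2 never starts. A minimal sketch with hypothetical dummy methods, one of which simulates a failing purge:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical dummies: the retrieval succeeds, the purge simulates a database error.
async Task<object> RetrieveAsync() { await Task.Delay(50); return 1; }
async Task FailingPurgeAsync()
{
    await Task.Delay(10);
    throw new InvalidOperationException("purge failed");
}

bool insertPhaseReached = false;
string error = "";

// Step 1: start both jobs, then wait for both of them.
Task<object> retrieve = RetrieveAsync();
Task purge = FailingPurgeAsync();
try
{
    await Task.WhenAll(retrieve, purge);
    insertPhaseReached = true;   // Step 2 would start here
}
catch (InvalidOperationException ex)
{
    // WhenAll waited for the retrieval too, then rethrew the purge failure.
    error = ex.Message;
}

Console.WriteLine($"{insertPhaseReached} {retrieve.IsCompletedSuccessfully} {error}");  // False True purge failed
```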
Step 2 - Issue insertion by using retrieved data
- First we select all insert delegates (operationsOnEntities.Select(tasks => tasks.InsertTask))
- Then we combine this sequence with the retrieval jobs (Zip)
- The second argument of the Zip method is a function that receives one item from each sequence and tells how to combine them (insertTask(retrievedTask.Result))
- Finally we await the resulting insertion tasks
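The Zip step pairs items strictly by position: the i-th retrieval task goes to the i-th insert delegate, and Zip stops at the end of the shorter sequence, so both sequences must come from the same list in the same order. A minimal sketch with already-completed tasks standing in for the result of Step 1 (the data and the written list are hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

var written = new List<string>();

// Already-completed retrieval tasks, standing in for retrieveTasks after Step 1.
var retrieved = new List<Task<object>> { Task.FromResult<object>(1), Task.FromResult<object>("desc") };

// Insert delegates in the same order as the retrievals.
var inserts = new List<Func<object, Task>>
{
    data => { written.Add($"id={(int)data}"); return Task.CompletedTask; },
    data => { written.Add($"description={(string)data}"); return Task.CompletedTask; },
};

// Zip pairs the i-th retrieval with the i-th insert; the result selector starts each insert.
var insertionTasks = retrieved
    .Zip(inserts, (retrievedTask, insertTask) => insertTask(retrievedTask.Result))
    .ToList();
await Task.WhenAll(insertionTasks);

Console.WriteLine(string.Join(", ", written));  // id=1, description=desc
```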
So, as you can see, the core functionality can be implemented in six lines of code if you separate the data from the operations.
For the sake of completeness, here is the full demo code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        List<CoherentTasks> operationsOnEntities = new ()
        {
            new CoherentTasks(() => GetId(), () => PurgeId(), (id) => InsertId((int)id)),
            new CoherentTasks(() => GetDescription(), () => PurgeDesciption(), (desc) => InsertDescription((string)desc)),
        };

        // ToList() invokes each retrieve and purge delegate exactly once.
        var retrieveTasks = operationsOnEntities.Select(tasks => tasks.RetrieveTask()).ToList();
        var purgeTasks = operationsOnEntities.Select(tasks => tasks.PurgeTask()).ToList();
        await Task.WhenAll(retrieveTasks.Union(purgeTasks));

        // All retrievals are complete here, so .Result does not block.
        var insertionTasks = retrieveTasks.Zip(operationsOnEntities.Select(tasks => tasks.InsertTask),
            (retrievedTask, insertTask) => insertTask(retrievedTask.Result));
        // WhenAll waits for every insert and rethrows any failure.
        await Task.WhenAll(insertionTasks);
    }

    static async Task<object> GetId() { await Task.Delay(300); return 1; }
    static async Task PurgeId() => await Task.Delay(700);
    static Task InsertId(int id) { Console.WriteLine(id); return Task.CompletedTask; }
    static async Task<object> GetDescription() { await Task.Delay(200); return "desc"; }
    static async Task PurgeDesciption() => await Task.Delay(1000);
    static Task InsertDescription(string description) { Console.WriteLine(description); return Task.CompletedTask; }
}

class CoherentTasks
{
    public Func<Task<object>> RetrieveTask { get; init; }
    public Func<Task> PurgeTask { get; init; }
    public Func<object, Task> InsertTask { get; init; }

    public CoherentTasks(Func<Task<object>> retrieve, Func<Task> purge, Func<object, Task> insert)
        => (RetrieveTask, PurgeTask, InsertTask) = (retrieve, purge, insert);
}
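Whether the insertion phase ends with Task.WhenAny or Task.WhenAll changes its meaning: WhenAny resumes as soon as one task finishes and does not rethrow failures, while WhenAll waits for every insert and surfaces exceptions. A small sketch using Task.Delay as stand-in inserts (the durations are hypothetical) shows the difference:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Stand-in inserts with different (hypothetical) durations.
Task[] inserts = new[] { 10, 500, 1000 }.Select(ms => Task.Delay(ms)).ToArray();

// WhenAny resumes as soon as the first insert finishes.
await Task.WhenAny(inserts);
int doneAfterWhenAny = inserts.Count(t => t.IsCompleted);

// WhenAll resumes only after every insert has finished.
await Task.WhenAll(inserts);
int doneAfterWhenAll = inserts.Count(t => t.IsCompleted);

// A failed insert: WhenAny hands it back without throwing, WhenAll rethrows it.
Task failing = Task.FromException(new InvalidOperationException("insert failed"));
Task first = await Task.WhenAny(failing, Task.Delay(50));
bool whenAllThrew = false;
try
{
    await Task.WhenAll(failing, Task.Delay(50));
}
catch (InvalidOperationException)
{
    whenAllThrew = true;
}

Console.WriteLine($"{doneAfterWhenAny} {doneAfterWhenAll} {first.IsFaulted} {whenAllThrew}");
```

On a typical machine doneAfterWhenAny is 1, meaning Main would return while two of the three inserts are still running.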
- That is a nice approach, I like that. It does increase complexity a bit and requires an understanding of the IEnumerable returned from those web requests, but it does decrease startup and provides more explicit intent upfront, at the cost of hiding some service-layer domain intent. Wouldn't the boxing and unboxing hinder performance a bit, especially as the collection continues to grow? (Greg, Feb 7, 2022 at 19:36)
- @Greg Boxing only matters for value types. If all your entities are reference types, then there is no performance penalty. (Peter Csala, Feb 7, 2022 at 21:04)
- … cleanup.PurgeXYZ and importer.InsertXYZ methods async?
- … WhenAll is called, those variables are in essence completed tasks.
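To make the boxing point from the comments concrete: converting a reference type such as string to object only copies the reference, while converting a value type such as int allocates a new box each time. In the CoherentTasks design this conversion happens once per retrieved result, so if the retrieve methods return collections (reference types), presumably the case here, no boxing occurs at all. A minimal sketch:

```csharp
using System;

// Reference type: converting to object only copies the reference.
string description = "desc";
object fromString = description;
bool sameReference = object.ReferenceEquals(fromString, description);

// Value type: every conversion to object allocates a new box on the heap,
// and getting the int back requires an unboxing cast.
int id = 1;
object firstBox = id;
object secondBox = id;
bool sameBox = object.ReferenceEquals(firstBox, secondBox);
int unboxed = (int)firstBox;

Console.WriteLine($"{sameReference} {sameBox} {unboxed}");  // True False 1
```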