I have some code that populate EventfulConcurrentQueue
which is ConcurrentQueue
. So I need to save all data from EventfulConcurrentQueue
to a database.
I think to use sort of multithreading model in order to dequeue items.
So EventfulConcurrentQueue
should be depopulated from different threads that grab the part of items of EventfulConcurrentQueue
and that items will insert to a database.
So far I have following code.
Usage of EventfulConcurrentQueue
public partial class FormMain: Form {
private EventfulConcurrentQueue < PacketItem > PacketQueue {
get;
set;
} = new EventfulConcurrentQueue < PacketItem > ();
private void FormMain_Load(object sender, EventArgs e) {
StartPersistingTask();
}
private async Task StartPersistingTask() {
var chunkOfPacketsToInsert = 100;
try {
while (true) {
if (PacketQueue.Count > chunkOfPacketsToInsert) {
int counter = 0;
var chunkToBulkInsert = DataHelper.CreateBulkCopyDataTable();
while (counter <= chunkOfPacketsToInsert) {
PacketItem packetItem = null;
PacketQueue.TryDequeue(out packetItem);
if (packetItem != null) {
chunkToBulkInsert.Rows.Add(Guid.NewGuid(), packetItem.AtTime, packetItem.GPSTrackerID, packetItem.PacketBytes);
counter++;
} else {
break;
}
};
//create object of SqlBulkCopy which help to insert
using
var bulkCopy = new SqlBulkCopy(ConfigurationManager.ConnectionStrings["MyDB"].ConnectionString, SqlBulkCopyOptions.Default);
bulkCopy.DestinationTableName = "MyTable";
try {
bulkCopy.WriteToServer(chunkToBulkInsert);
} catch (Exception ex) {
Log(ex, LogErrorEnums.Fatal);
} finally {
Log(LogErrorEnums.Info, $ "Items: {counter}");
}
} else {
await Task.Delay(100);
}
}
} catch (Exception ex) {
Log(ex, LogErrorEnums.Error);
}
}
}
Definition of EventfulConcurrentQueue
public sealed class EventfulConcurrentQueue < T > {
private readonly ConcurrentQueue < T > _queue;
public EventfulConcurrentQueue() {
_queue = new ConcurrentQueue < T > ();
}
public int Count {
get {
return _queue.Count;
}
}
public bool IsEmpty {
get {
return _queue.IsEmpty;
}
}
public void Enqueue(T item) {
_queue.Enqueue(item);
OnItemEnqueued();
}
public bool TryDequeue(out T result) {
var success = _queue.TryDequeue(out result);
if (success) {
OnItemDequeued(result);
}
return success;
}
public event EventHandler ItemEnqueued;
public event EventHandler < ItemDequeuedEventArgs < T >> ItemDequeued;
void OnItemEnqueued() {
ItemEnqueued?.Invoke(this, EventArgs.Empty);
}
void OnItemDequeued(T item) {
ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs < T > {
Item = item
});
}
}
public sealed class ItemDequeuedEventArgs < T >: EventArgs {
public T Item {
get;
set;
}
}
public sealed class PacketItem {
public int GPSTrackerID {
get;
set;
}
public DateTime AtTime {
get;
set;
}
public byte[] PacketBytes {
get;
set;
}
public int GPSTrackerTypeID {
get;
set;
}
/// <summary>
/// Parsed by device's protocol packet
/// </summary>
public object ParsedPacket {
get;
set;
}
}
-
1\$\begingroup\$ Could you please format your code? \$\endgroup\$Peter Csala– Peter Csala2021年05月04日 08:00:43 +00:00Commented May 4, 2021 at 8:00
-
1\$\begingroup\$ BulkCopy shines when you want to insert a massive amount of records into the database. I think 100 items is too small for BulkCopy. For 100 items a stored procedure with TVP might be a better fit in my opinion. \$\endgroup\$Peter Csala– Peter Csala2021年05月04日 08:29:06 +00:00Commented May 4, 2021 at 8:29
-
\$\begingroup\$ @PeterCsala hey! thanks for review! yeah, I know it. It's just for testing only. In fact code should support 10000 records per seconds to process and insert data. \$\endgroup\$NoWar– NoWar2021年05月04日 08:36:26 +00:00Commented May 4, 2021 at 8:36
1 Answer 1
I think the EventfulConcurrentQueue
was not designed for this use case.
From the consumer perspective:
- Things that are not in use
IsEmpty
Enqueue
ItemEnqueued
ItemDequeued
- Things that are missing
DequeueAtMostNItems
WaitUntilThereAreEnoughItems
After I removed the not used things and added the missing ones the class would look like this:
public sealed class EventfulConcurrentQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
public List<T> DequeueAtMostNItems(int n)
{
List<T> result = new();
T current;
do
{
if(_queue.TryDequeue(out current))
result.Add(current);
} while (current != null && result.Count <= n);
return result;
}
public async Task WaitUntilThereAreEnoughItems(int numberOfItems)
{
while (_queue.Count < numberOfItems)
{
await Task.Delay(100);
}
return;
}
}
With these methods you are embracing encapsulation. The operation and data are next to each other. And with this approach you don't have to expose anything internal, so the information hiding aspect is covered as well.
If you stick with this implementation then I would suggest a renaming, because the ConcurrentQueue
now becomes an implementation detail.
The usage could be simplified like this:
while (true)
{
await PacketQueue.WaitUntilThereAreEnoughItems(chunkSize);
var chunk = PacketQueue.DequeueAtMostNItems(chunkSize);
DataTable toBeInserted = DataHelper.CreateBulkCopyDataTable();
chunk.ForEach(row => toBeInserted.Rows.Add(Guid.NewGuid(), row.AtTime, row.GPSTrackerID, row.PacketBytes));
using var bulkCopy = ...
}
-
1\$\begingroup\$ Hey bro! It seems like an absolutly cleaner and balanced approach. Thanks a lot! Cheers! \$\endgroup\$NoWar– NoWar2021年05月05日 02:12:03 +00:00Commented May 5, 2021 at 2:12