Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit c10fcf2

Browse files
Merge pull request #549 from jehrenzweig-captionhealth/je/strategy_2_publishing_messages_in_batches_20250205
Refactored PublisherConfirms.cs
2 parents dc2fe75 + f8095d8 commit c10fcf2

File tree

1 file changed

+18
-29
lines changed

1 file changed

+18
-29
lines changed

‎dotnet/PublisherConfirms/PublisherConfirms.cs‎

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
using System.Buffers.Binary;
1+
using RabbitMQ.Client;
2+
using System.Buffers.Binary;
23
using System.Diagnostics;
34
using System.Text;
4-
using RabbitMQ.Client;
55

66
const ushort MAX_OUTSTANDING_CONFIRMS = 256;
77

@@ -83,38 +83,31 @@ async Task PublishMessagesInBatchAsync()
8383
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
8484
string queueName = queueDeclareResult.QueueName;
8585

86-
int batchSize = MAX_OUTSTANDING_CONFIRMS / 2;
87-
int outstandingMessageCount = 0;
86+
int batchSize = Math.Max(1, MAX_OUTSTANDING_CONFIRMS / 2);
8887

89-
var sw = new Stopwatch();
90-
sw.Start();
88+
var sw = Stopwatch.StartNew();
9189

9290
var publishTasks = new List<ValueTask>();
9391
for (int i = 0; i < MESSAGE_COUNT; i++)
9492
{
9593
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
96-
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props));
97-
outstandingMessageCount++;
94+
ValueTaskpublishTask=channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body, mandatory: true, basicProperties: props);
95+
publishTasks.Add(publishTask);
9896

99-
if (outstandingMessageCount == batchSize)
100-
{
101-
foreach (ValueTask pt in publishTasks)
102-
{
103-
try
104-
{
105-
await pt;
106-
}
107-
catch (Exception ex)
108-
{
109-
Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'");
110-
}
111-
}
112-
publishTasks.Clear();
113-
outstandingMessageCount = 0;
114-
}
97+
await MaybeAwaitPublishes(publishTasks, batchSize);
11598
}
11699

117-
if (publishTasks.Count > 0)
100+
// Await any remaining tasks in case message count was not
101+
// evenly divisible by batch size.
102+
await MaybeAwaitPublishes(publishTasks, 0);
103+
104+
sw.Stop();
105+
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
106+
}
107+
108+
static async Task MaybeAwaitPublishes(List<ValueTask> publishTasks, int batchSize)
109+
{
110+
if (publishTasks.Count >= batchSize)
118111
{
119112
foreach (ValueTask pt in publishTasks)
120113
{
@@ -128,11 +121,7 @@ async Task PublishMessagesInBatchAsync()
128121
}
129122
}
130123
publishTasks.Clear();
131-
outstandingMessageCount = 0;
132124
}
133-
134-
sw.Stop();
135-
Console.WriteLine($"{DateTime.Now} [INFO] published {MESSAGE_COUNT:N0} messages in batch in {sw.ElapsedMilliseconds:N0} ms");
136125
}
137126

138127
async Task HandlePublishConfirmsAsynchronously()

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /