In a Docker Compose environment there are several apps that communicate with each other. One of these apps is a centralized logger. Every part of the system that needs to log something calls the logger's APIs (which are written with gRPC).
The logger adds these log requests to a queue (a BlockingCollection) which is consumed by workers. The workers read from the queue and store the entries in the DB. To reduce DB calls, each worker uses a local buffer, which is flushed every (n) seconds or after accumulating (m) records.
The workers need to be created or disposed dynamically based on how many items are in the queue — for example, one worker for every (n) items.
This is the big picture:
And now the implementation,
The log entity:
/// <summary>
/// Persisted activity-log record, built from an incoming <c>ActivityLogPayload</c>.
/// </summary>
public class ActivityLog
{
// NOTE(review): non-nullable reference properties with no constructor —
// presumably populated by the payload-mapping ToEntity(); confirm NRT warnings are handled.
public string Id { get; set; }
public IPAddress IpAddress { get; set; }
// Nullable: the proto wraps this field in google.protobuf.StringValue, so it may be absent.
public string? Parameters { get; set; }
// ...
}
gRPC proto contract:
// Contract for the centralized logging service.
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
package logger;
service Logger {
// Fire-and-forget submission: the server enqueues the entry and returns immediately.
rpc LogActivity (ActivityLogPayload) returns (google.protobuf.Empty);
}
message ActivityLogPayload {
string ip_address = 1;
// StringValue (wrapper type) permits an absent/null value, matching the nullable entity field.
google.protobuf.StringValue parameters = 2;
// ...
}
gRPC service:
/// <summary>
/// gRPC front end for the logger: converts each payload to an entity and
/// enqueues it for the background workers. Does no I/O itself.
/// </summary>
public class LoggerRpc : Logger.LoggerBase
{
    private readonly LogQueue queue;

    public LoggerRpc(LogQueue queue)
    {
        this.queue = queue;
    }

    /// <summary>
    /// Enqueues one log entry. The collection is bounded (10_000), so Add blocks
    /// when the queue is full — applying natural backpressure to callers.
    /// </summary>
    public override Task<Empty> LogActivity(ActivityLogPayload request, ServerCallContext context)
    {
        var logEntity = request.ToEntity();
        // Bug fix: the original Add() could block this request thread forever when
        // the queue is full, even after the client cancelled or the deadline passed.
        // Honor the per-call cancellation token so a blocked Add aborts with the RPC.
        queue.ActivityLogsCollection.Add(logEntity, context.CancellationToken);
        return Task.FromResult(new Empty());
    }
}
The log queue:
/// <summary>
/// Shared, bounded queue between the gRPC producers and the log workers.
/// Registered as a singleton so all parties see the same collection.
/// </summary>
public class LogQueue
{
    // Upper bound stops producers from growing the queue without limit;
    // when full, Add() blocks and pushes backpressure onto callers.
    private const int Capacity = 10_000;

    public BlockingCollection<ActivityLog> ActivityLogsCollection { get; } =
        new BlockingCollection<ActivityLog>(Capacity);
}
The worker:
/// <summary>
/// Background consumer: drains the shared queue into a local buffer and flushes
/// the buffer to the database when it reaches <see cref="BufferThreshold"/> items
/// or <see cref="BufferTimeLimit"/> has elapsed since the last flush.
/// </summary>
public class LogWorker : IDisposable
{
    private const int BufferThreshold = 100;
    private static readonly TimeSpan BufferTimeLimit = TimeSpan.FromSeconds(5);

    private readonly BlockingCollection<ActivityLog> queue;
    private readonly IServiceScopeFactory ssf;
    private readonly CancellationTokenSource cts = new();
    private readonly Task workerTask;
    private DateTimeOffset lastDbUpdate = DateTimeOffset.UtcNow;

    public LogWorker(
        BlockingCollection<ActivityLog> queue,
        IServiceScopeFactory ssf)
    {
        this.queue = queue;
        this.ssf = ssf;
        // StartNew with LongRunning hints the scheduler to use a dedicated thread
        // instead of tying up a thread-pool worker for the lifetime of this loop.
        workerTask = Task.Factory.StartNew(
            Work,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Blocking consume loop; runs until the queue completes or the worker is disposed.
    /// </summary>
    private void Work()
    {
        using var scope = ssf.CreateScope();
        using var dbContext = scope.ServiceProvider.GetRequiredService<LogDbContext>();
        var localBuffer = new List<ActivityLog>(BufferThreshold);
        try
        {
            // Blocks until an item arrives; returns false when the queue is completed.
            while (queue.TryTake(out var nextItem, Timeout.Infinite, cts.Token))
            {
                localBuffer.Add(nextItem);
                // Keep filling until we hit 100 items or the 5-second window closes.
                while (localBuffer.Count < BufferThreshold
                    && DateTimeOffset.UtcNow - lastDbUpdate < BufferTimeLimit)
                {
                    var remaining = BufferTimeLimit - (DateTimeOffset.UtcNow - lastDbUpdate);
                    // Bug fix: time may elapse between the window check above and here,
                    // making 'remaining' negative. TryTake throws ArgumentOutOfRangeException
                    // for any negative timeout other than -1, so clamp to zero.
                    var waitTime = Math.Max(0, (int)remaining.TotalMilliseconds);
                    if (queue.TryTake(out nextItem, waitTime, cts.Token))
                        localBuffer.Add(nextItem);
                }
                Flush(dbContext, localBuffer);
            }
        }
        catch (ObjectDisposedException)
        {
            // queue completed / disposed
        }
        catch (OperationCanceledException)
        {
            // worker is shutting down
        }
        finally
        {
            // Persist whatever is left so no accepted log entry is lost on shutdown.
            if (localBuffer.Count > 0)
                Flush(dbContext, localBuffer);
        }
    }

    /// <summary>Writes the buffer to the DB and resets the flush window.</summary>
    private void Flush(LogDbContext dbContext, List<ActivityLog> localBuffer)
    {
        dbContext.AddRange(localBuffer);
        dbContext.SaveChanges();
        // Bug fix: this context lives as long as the worker. Without clearing, every
        // saved entity stays tracked forever, degrading performance and leaking memory.
        dbContext.ChangeTracker.Clear();
        localBuffer.Clear();
        lastDbUpdate = DateTimeOffset.UtcNow;
    }

    public void Dispose()
    {
        cts.Cancel();
        try
        {
            workerTask.Wait();
        }
        catch (AggregateException)
        {
            // Don't let a faulted final flush escape Dispose; the worker is gone either way.
        }
        cts.Dispose();
    }
}
Worker manager:
/// <summary>
/// Hosted service that scales the number of <see cref="LogWorker"/> instances with
/// queue depth: one worker per <see cref="LogsThreshold"/> queued items (plus one),
/// never fewer than <see cref="MinimumWorkersCount"/>. Scale-down is one worker per tick.
/// </summary>
public class LogWorkerManager : IHostedService
{
    private const int MinimumWorkersCount = 2;
    private const int LogsThreshold = 200;
    private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(2);

    private readonly List<LogWorker> workers = new();
    private readonly LogQueue queue;
    private readonly IServiceScopeFactory ssf;
    // Timer callbacks can overlap (LogWorker.Dispose blocks while a flush finishes),
    // and StopAsync runs concurrently with the timer — serialize all access to 'workers'.
    private readonly object gate = new();
    private Timer? timer;

    public LogWorkerManager(
        LogQueue queue,
        IServiceScopeFactory ssf)
    {
        this.queue = queue;
        this.ssf = ssf;
    }

    private void MonitorQueue(object? state)
    {
        lock (gate)
        {
            var workersNeeded = (queue.ActivityLogsCollection.Count / LogsThreshold) + 1;
            workersNeeded = Math.Max(workersNeeded, MinimumWorkersCount);
            if (workers.Count > workersNeeded)
            {
                // Gradually reduce: retire one worker per tick, newest first.
                var worker = workers[^1];
                worker.Dispose();
                workers.RemoveAt(workers.Count - 1);
            }
            else if (workers.Count < workersNeeded)
            {
                // Bug fix: the original loop condition (i < workersNeeded - workers.Count)
                // re-read workers.Count as workers were added, so it created roughly half
                // the needed workers. Snapshot the shortfall before looping.
                var toAdd = workersNeeded - workers.Count;
                for (int i = 0; i < toAdd; i++)
                {
                    workers.Add(new LogWorker(queue.ActivityLogsCollection, ssf));
                }
            }
        }
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        timer = new(MonitorQueue, null, 0, (int)TimerInterval.TotalMilliseconds);
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        timer?.Change(Timeout.Infinite, Timeout.Infinite);
        timer?.Dispose();
        lock (gate)
        {
            // Disposing each worker flushes its remaining buffer; do them in parallel.
            Parallel.ForEach(workers, worker => worker.Dispose());
            workers.Clear();
        }
        return Task.CompletedTask;
    }
}
DI:
// Single shared queue: the gRPC service and every worker must see the same instance.
builder.Services.AddSingleton<LogQueue>();
// Hosted service that creates/disposes LogWorker instances based on queue depth.
builder.Services.AddHostedService<LogWorkerManager>();
1 Answer 1
This looks like an XY-problem.
The only work done in the worker is the DB insert. If the workers can't keep up, it's because the DB can't keep up; creating additional workers just moves the backlog into each worker's unbounded local buffer instead of the bounded queue.
`DbContext` is not designed to be long-lived.
Items added to the context continue to be tracked until they are explicitly cleared or the context is disposed.
This could be the source of the poor performance.
Explore related questions
See similar questions with these tags.
Comments:
- `LogWorker` performs work with the database. These are I/O operations, so they should be performed asynchronously: `DbContext.SaveChangesAsync` and so on. Implement the `IAsyncDisposable` interface; don't use `workerTask.Wait();` in the `Dispose` method — use `await workerTask` in a `DisposeAsync` method instead.
- Is there a reason to inject `IServiceScopeFactory`, or could we inject `DbContext` directly into the class constructor?