I have cleanly separated the functionality of a message bus, namely the diagnostics (optional), sending, decoding and the bus driver. I now need to bind these together, and have done so with a MessageBus class.
It feels anemic though, with a lot of pass-through methods.
On the flip side, it is necessary for coordinating StartAsync(), which links the driver and the receiver (binding them so they can call back). I saw no other way of achieving this: if I registered all the individual interfaces via an inversion of control container, they would get out of sync (how would the driver be bound to the decoder, for instance?).
If this is the wrong place for comments on code please let me know and I will remove straight away. Any pointers much appreciated.
    public class MessageBus : IDisposable
    {
        private readonly MessageBusReceiver receiver;
        private readonly MessageBusPublisher publisher;
        private readonly IMessageBusDriver driver;
        private int isRunning;
        private readonly MessageBusDiagnostics diagnostics = null;

        public MessageBus(IMessageBusDriver driver, SerializerRegistry serializerRegistry, ITypeIdentifier typeIdentifier, ILoggerFactory loggerFactory, bool isDebug = false)
        {
            this.driver = driver;
            this.publisher = new MessageBusPublisher(serializerRegistry, driver, typeIdentifier);
            this.receiver = new MessageBusReceiver(serializerRegistry, typeIdentifier, loggerFactory.CreateLogger<MessageBusReceiver>());
            if (isDebug)
                diagnostics = new MessageBusDiagnostics(driver, loggerFactory.CreateLogger<MessageBusDiagnostics>());
        }

        void AssertIsRunning()
        {
            if (this.isRunning == 0)
                throw new InvalidOperationException($"{nameof(StartAsync)} must be called before calling other methods.");
        }

        public async Task SubscribeAsync(string topic, QualityOfService qos, CancellationToken cancellationToken = default)
        {
            AssertIsRunning();
            await this.driver.SubscribeAsync(topic, qos, null, null, null, cancellationToken);
        }

        public async Task UnsubscribeAsync(string topic, CancellationToken cancellationToken = default)
        {
            AssertIsRunning();
            await this.driver.UnsubscribeAsync(topic, cancellationToken);
        }

        public async Task StartAsync(Func<object, Task> receiveApplicationMessage)
        {
            if (Interlocked.CompareExchange(ref this.isRunning, 1, 0) == 1)
                throw new InvalidOperationException("Already running.");
            this.receiver.OnMessageDecoded = receiveApplicationMessage;
            await this.driver.StartAsync(this.receiver.ReceiveMessageFromMessageBusDriver);
        }

        public async Task StopAsync()
        {
            if (Interlocked.CompareExchange(ref this.isRunning, 0, 1) == 0)
                throw new InvalidOperationException("Not running.");
            await this.driver.StopAsync();
        }

        public async Task PublishAsync(object notification, string busPath, QualityOfService qos = QualityOfService.AtMostOnce, CancellationToken cancellationToken = default)
        {
            AssertIsRunning();
            await this.publisher.PublishAsync(notification, busPath, qos, cancellationToken);
        }

        public async Task PublishAsync(object notification, Dictionary<string, string[]> pathTokens = null, QualityOfService qos = QualityOfService.AtMostOnce, CancellationToken cancellationToken = default)
        {
            AssertIsRunning();
            await this.publisher.PublishAsync(notification, pathTokens, qos, cancellationToken);
        }

        public void Dispose()
        {
            this.driver.Dispose();
            this.diagnostics?.Dispose();
        }
    }
1 Answer
I think what I was missing was a "builder" class that could take care of the construction and allow the interfaces to be returned.
Not all transport types need to be "started", so I left that concern to the builder of the specific bus (i.e. an MQTT transport has different requirements to Kafka, for instance).
Any "stopping" can be done on dispose; anyone not wanting to receive messages before then can unsubscribe via the ISubscriber interface.
So we build the base MessageBusBuilderBase class:
    public abstract class MessageBusBuilderBase
    {
        private MessageBusDiagnostics messageBusDiagnostics;

        public bool IsDebugEnabled => messageBusDiagnostics != null;

        protected abstract Task<ITransport> NewTransportAsync(ILoggerFactory loggerFactory, Func<TransportMessage, Task> receiveMessage);

        public async Task<BuiltInterfaces> CreateAsync(IMessageSerializer[] serializers, ITypeIdentifier typeIdentifier, ILoggerFactory loggerFactory, bool isDebug = false)
        {
            var serializerRegistry = new SerializerRegistry(serializers);
            var receiver = new MessageDeserializer(serializerRegistry, typeIdentifier, loggerFactory.CreateLogger<MessageDeserializer>());
            var transport = await NewTransportAsync(loggerFactory, receiver.ReceiveMessageFromTransport);
            var publisher = new MessagePublisher(serializerRegistry, transport, typeIdentifier);
            var subscriber = new MessageSubscriber(transport);

            // chain up event handler functions
            receiver.OnMessageDecoded = subscriber.ReceiveMessageFromDeserializer;

            if (isDebug)
                this.messageBusDiagnostics = new MessageBusDiagnostics(transport, loggerFactory.CreateLogger<MessageBusDiagnostics>());

            return new BuiltInterfaces(publisher, subscriber);
        }
    }
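BuiltInterfaces is not shown above. A minimal sketch of what it could look like, assuming it is nothing more than an immutable holder for the two objects the builder returns. The property names Publisher and Subscriber and the IPublisher interface name are assumptions made for illustration (only ISubscriber is named in the text), and both interfaces are empty placeholders so that the sketch compiles on its own:

```csharp
using System;

// Placeholder interfaces so the sketch is self-contained; the real ones
// would carry the publish and subscribe methods.
public interface IPublisher { }
public interface ISubscriber { }

// Hypothetical shape of BuiltInterfaces: an immutable pair of the two
// objects that get registered with the container.
public sealed class BuiltInterfaces
{
    public BuiltInterfaces(IPublisher publisher, ISubscriber subscriber)
    {
        this.Publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
        this.Subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
    }

    public IPublisher Publisher { get; }

    public ISubscriber Subscriber { get; }
}
```

Returning one object rather than using out parameters matters here because an async method such as CreateAsync cannot declare out parameters.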
And the specific derived class, MqttMessageBusBuilder:
    public class MqttMessageBusBuilder : MessageBusBuilderBase
    {
        private readonly MqttSettings settings;

        public MqttMessageBusBuilder(MqttSettings settings)
        {
            this.settings = settings;
        }

        protected override async Task<ITransport> NewTransportAsync(ILoggerFactory loggerFactory, Func<TransportMessage, Task> receiveMessage)
        {
            var logger = loggerFactory.CreateLogger<MqttMessageBusBuilder>();
            logger.LogDebug($"{nameof(MqttMessageBusBuilder)} is creating a new {nameof(MqttConnection)} and will connect to {this.settings.Server}:{this.settings.Port}.");
            var connection = new MqttConnection(this.settings, receiveMessage, loggerFactory.CreateLogger<MqttConnection>());
            await connection.StartAsync();
            logger.LogDebug($"{nameof(MqttConnection)} has been started via {nameof(MqttConnection.StartAsync)}");
            return connection;
        }
    }
With the above, we now have the interfaces chained correctly, and can register the instances with an inversion of control container, clearly separating the concerns of publishing and subscribing:
    var mqttBusBuilder = new MqttMessageBusBuilder(settings);
    var serializers = new[] { MessagePackSerializer };
    var idStrategy = new IdentifyUsingTransportHeaders();
    // CreateAsync returns both interfaces in one object; the Publisher and
    // Subscriber property names on BuiltInterfaces are assumed here.
    var built = await mqttBusBuilder.CreateAsync(serializers, idStrategy, loggerFactory);
    container.RegisterSingleton(built.Publisher);
    container.RegisterSingleton(built.Subscriber);
With regards to "stopping" the bus, I could create a BusManager class to manage a list of created busses and call StopAsync on them when asked. Alternatively, I will just perform this transparently by stopping in Dispose() and let that be a concern of the bus itself: I see that other busses do not have Start or Stop functions, so I shouldn't let that be the driver of my interface design.
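The second option, stopping transparently on disposal, might look roughly like the sketch below. It is only an illustration: SketchTransport and the body of its StopAsync are hypothetical stand-ins for a real transport such as MqttConnection, and it assumes a runtime that provides IAsyncDisposable (.NET Core 3.0 or later), so that the asynchronous stop does not have to be blocked on inside a synchronous Dispose().

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical transport used only for illustration; not the real MqttConnection.
public sealed class SketchTransport : IAsyncDisposable
{
    private int isStopped; // 0 = running, 1 = stopped

    public bool IsStopped => Volatile.Read(ref this.isStopped) == 1;

    // Idempotent: only the first caller performs the shutdown work.
    public async Task StopAsync()
    {
        if (Interlocked.Exchange(ref this.isStopped, 1) == 1)
            return;

        // Placeholder for the real work (disconnecting from the broker, etc.).
        await Task.Yield();
    }

    // Disposal stops the transport, so the public interfaces need no Stop method.
    public ValueTask DisposeAsync() => new ValueTask(StopAsync());
}
```

With a declaration such as await using var transport = new SketchTransport(); the stop then happens when the enclosing scope ends, and calling StopAsync or disposing a second time is harmless.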
- Task.Run(async () => await connection.StartAsync()); returns a Task which is not awaited in the code. That's bad practice in async programming. Also, running an awaitable method on a separate thread makes no sense. You can optimize out the thread and keep the same "not awaited" mistake with _ = connection.StartAsync(), but I suggest fixing it, e.g. with a simple await connection.StartAsync(), probably not in this method but outside in the caller's code. (aepot, Apr 3, 2021 at 12:51)
- Valid point. I thought this was some form of long-running background task (which it is not in this case); will get this fixed. (morleyc, Apr 4, 2021 at 14:49)
- ... async state machines here, as you have only one await, no code after it, and it's not inside a using or try-catch statement. For example: public Task UnsubscribeAsync(...) { AssertIsRunning(); return this.driver.UnsubscribeAsync(...); }
- ... ConfigureAwait I believe would be best practice (in some cases) medium.com/bynder-tech/…
- ... ConfigureAwait(false) doesn't affect the above tip.
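The tip above about async state machines can be sketched as follows. SketchDriver and SketchBus are hypothetical stand-ins so that the example compiles on its own; they are not the MessageBus and driver from the question, and the method names are made up to show the two variants side by side.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the driver; it only counts calls.
public sealed class SketchDriver
{
    public int Calls { get; private set; }

    public Task UnsubscribeAsync(string topic)
    {
        this.Calls++;
        return Task.CompletedTask;
    }
}

public sealed class SketchBus
{
    private readonly SketchDriver driver = new SketchDriver();
    private bool isRunning = true;

    public int DriverCalls => this.driver.Calls;

    public void Stop() => this.isRunning = false;

    private void AssertIsRunning()
    {
        if (!this.isRunning)
            throw new InvalidOperationException("Not running.");
    }

    // Pass-through with async/await: the compiler generates a state machine.
    public async Task UnsubscribeWithAwaitAsync(string topic)
    {
        AssertIsRunning();
        await this.driver.UnsubscribeAsync(topic);
    }

    // Pass-through without async: the driver's task is returned directly.
    public Task UnsubscribeElidedAsync(string topic)
    {
        AssertIsRunning();
        return this.driver.UnsubscribeAsync(topic);
    }
}
```

One behavioural difference is worth keeping in mind: in the async version an exception from AssertIsRunning() is captured in the returned Task, whereas in the elided version it is thrown synchronously to the caller before any Task is returned.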