Pretty straightforward question. The methods SubscribeToOrdersAsync and SubscribeToFillsAsync are nearly identical: both attach the same ConnectionLost and ConnectionRestored handlers.
What would be the best approach to follow the DRY principle? I created the SubscribeToAsync method as my own attempt, but I would like to know yours.
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
using FTX.Net.Interfaces.Clients;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace InvEx.FtxExchange;

public class FtxProducer : IHostedService
{
    private readonly ILogger<FtxProducer> _logger;
    private readonly IFTXSocketClient _socketClient;

    public FtxProducer(ILogger<FtxProducer> logger, IFTXSocketClient socketClient)
    {
        _logger = logger;
        _socketClient = socketClient;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var ordersSub = await SubscribeToOrdersAsync(cancellationToken);
        var fillsSub = await SubscribeToFillsAsync(cancellationToken);

        await SubscribeToAsync(() => _socketClient.Streams.SubscribeToOrderUpdatesAsync(data =>
        {
            _logger.LogInformation("Orders: {@Data}", data.Data);
        }, cancellationToken));
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return _socketClient.UnsubscribeAllAsync();
    }

    private async Task<UpdateSubscription?> SubscribeToAsync(Func<Task<CallResult<UpdateSubscription>>> func)
    {
        var subscription = await func();

        if (subscription.Success)
        {
            subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
            subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
            return subscription.Data;
        }

        return null;
    }

    private async Task<UpdateSubscription?> SubscribeToOrdersAsync(CancellationToken cancellationToken = default)
    {
        var subscription = await _socketClient.Streams
            .SubscribeToOrderUpdatesAsync(data =>
            {
                _logger.LogInformation("Orders: {@Data}", data.Data);
            }, cancellationToken);

        if (subscription.Success)
        {
            subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
            subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
            return subscription.Data;
        }

        return null;
    }

    private async Task<UpdateSubscription?> SubscribeToFillsAsync(CancellationToken cancellationToken = default)
    {
        var subscription = await _socketClient.Streams
            .SubscribeToUserTradeUpdatesAsync(data =>
            {
                _logger.LogInformation("Fills: {@Data}", data.Data);
            }, cancellationToken);

        if (subscription.Success)
        {
            subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
            subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
            return subscription.Data;
        }

        return null;
    }
}
- Hi @nop, could you please review my proposed solution? Is it useful, or do you have any questions about it? (Peter Csala, Sep 7, 2022 at 6:25)
- @PeterCsala, hey, sorry, I didn't get a notification about that one. (nop, Sep 8, 2022 at 7:04)
1 Answer
Even though you have not mentioned it explicitly in your question, I assume you are using the FTX.Net library (your using directives reference FTX.Net and CryptoExchange.Net).
Your SubscribeToAsync is a good starting point. But the func arguments you have to pass in still closely resemble each other, as you can see:
await SubscribeToAsync(() => _socketClient.Streams.SubscribeToOrderUpdatesAsync(data =>
{
    _logger.LogInformation("Orders: {@Data}", data.Data);
}, cancellationToken));

await SubscribeToAsync(() => _socketClient.Streams.SubscribeToUserTradeUpdatesAsync(data =>
{
    _logger.LogInformation("Fills: {@Data}", data.Data);
}, cancellationToken));
If you want to move as much logic as possible into SubscribeToAsync, then you have to assess the above method calls. What are the differences?
- SubscribeToOrderUpdatesAsync vs SubscribeToUserTradeUpdatesAsync
- "Orders: vs "Fills:
If you could pass this information to SubscribeToAsync, then you could get rid of the SubscribeToOrdersAsync and SubscribeToFillsAsync methods entirely.
Fortunately, you can do that:
private async Task<UpdateSubscription?> SubscribeToAsync<T>(
    Func<IFTXSocketClientStreams, Func<Action<DataEvent<T>>, CancellationToken, Task<CallResult<UpdateSubscription>>>> methodSelector,
    string logPrefix, CancellationToken token)
{
    var asyncMethod = methodSelector(_socketClient.Streams);
    var subscription = await asyncMethod(data => _logger.LogInformation(logPrefix +": {@Data}", data.Data), token);

    if (!subscription.Success)
        return null;

    subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
    subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
    return subscription.Data;
}
- The methodSelector parameter: it selects the method to be called from the IFTXSocketClientStreams interface
  - Since FTXOrder and FTXUserTrade don't have a common base class, you have to make SubscribeToAsync generic
- The logPrefix parameter: the prefix of the logging message
- The token parameter: the cancellation token
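To see the method-selector idea in isolation, here is a minimal sketch that runs without the exchange library. FakeStreams, Producer and the Log list are hypothetical stand-ins that I made up for illustration; they only mimic the shape of the real stream methods (a data callback plus a CancellationToken) and are not part of FTX.Net.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the socket client's stream API (not the real FTX.Net interface).
public sealed class FakeStreams
{
    public Task<bool> SubscribeToOrdersAsync(Action<string> onData, CancellationToken ct = default)
    {
        onData("order #1"); // simulate one incoming update
        return Task.FromResult(true);
    }

    public Task<bool> SubscribeToFillsAsync(Action<int> onData, CancellationToken ct = default)
    {
        onData(3); // simulate one incoming update
        return Task.FromResult(true);
    }
}

public sealed class Producer
{
    private readonly FakeStreams _streams = new();

    // Collected messages; stands in for the logger of the original class.
    public List<string> Log { get; } = new();

    // The selector returns a method group of the streams object.
    // T differs per stream, which is why the method has to be generic.
    public async Task<bool> SubscribeToAsync<T>(
        Func<FakeStreams, Func<Action<T>, CancellationToken, Task<bool>>> methodSelector,
        string logPrefix,
        CancellationToken token = default)
    {
        var asyncMethod = methodSelector(_streams);
        return await asyncMethod(data => Log.Add($"{logPrefix}: {data}"), token);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var producer = new Producer();
        await producer.SubscribeToAsync<string>(s => s.SubscribeToOrdersAsync, "Orders");
        await producer.SubscribeToAsync<int>(s => s.SubscribeToFillsAsync, "Fills");

        // Prints "Orders: order #1" and then "Fills: 3".
        foreach (var line in producer.Log) Console.WriteLine(line);
    }
}
```

The type argument has to be spelled out at the call site because a bare method group does not carry enough information for the compiler to infer T.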
With this in hand, the entire class could be shortened like this:
public class FtxProducer : IHostedService
{
    private readonly ILogger<FtxProducer> _logger;
    private readonly IFTXSocketClient _socketClient;

    public FtxProducer(ILogger<FtxProducer> logger, IFTXSocketClient socketClient)
        => (_logger, _socketClient) = (logger, socketClient);

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await SubscribeToAsync<FTXOrder>(stream => stream.SubscribeToOrderUpdatesAsync, "Orders", cancellationToken);
        await SubscribeToAsync<FTXUserTrade>(stream => stream.SubscribeToUserTradeUpdatesAsync, "Fills", cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken)
        => _socketClient.UnsubscribeAllAsync();

    private async Task<UpdateSubscription?> SubscribeToAsync<T>(
        Func<IFTXSocketClientStreams, Func<Action<DataEvent<T>>, CancellationToken, Task<CallResult<UpdateSubscription>>>> methodSelector,
        string logPrefix, CancellationToken token)
    {
        var asyncMethod = methodSelector(_socketClient.Streams);
        var subscription = await asyncMethod(data => _logger.LogInformation(logPrefix +": {@Data}", data.Data), token);
        if (!subscription.Success) return null;

        subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
        subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
        return subscription.Data;
    }
}
UPDATE #1
May you just change _logger.LogInformation(logPrefix +": {@Data}", data.Data) to something more generic?
Yes: pass the handler in as a parameter and keep the logging variant as a thin wrapper around it.
private async Task<UpdateSubscription?> SubscribeToWithLoggingAsync<T>(
    Func<IFTXSocketClientStreams, Func<Action<DataEvent<T>>, CancellationToken, Task<CallResult<UpdateSubscription>>>> methodSelector,
    string logPrefix, CancellationToken token)
    => await SubscribeToAsync(methodSelector, data => _logger.LogInformation(logPrefix +": {@Data}", data.Data), token);

private async Task<UpdateSubscription?> SubscribeToAsync<T>(
    Func<IFTXSocketClientStreams, Func<Action<DataEvent<T>>, CancellationToken, Task<CallResult<UpdateSubscription>>>> methodSelector,
    Action<DataEvent<T>> handler, CancellationToken token)
{
    var asyncMethod = methodSelector(_socketClient.Streams);
    var subscription = await asyncMethod(handler, token);
    if (!subscription.Success) return null;

    subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
    subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
    return subscription.Data;
}
- SubscribeToWithLoggingAsync is a specialized version of the more generic SubscribeToAsync
  - It specifies that the handler should perform some simple logging
- The SubscribeToAsync method's signature looks a bit terrifying now, but it is generic enough to be reused in multiple ways
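Since the logger was only an example and the real handler would push to RabbitMQ or Kafka, a second specialized wrapper could sit next to the logging one. The sketch below is self-contained and runs without any broker: IMessagePublisher, InMemoryPublisher, FakeStreams and SubscribeAndPublishAsync are all hypothetical names that I made up for illustration, and none of them come from FTX.Net or from a messaging client library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical abstraction; a RabbitMQ- or Kafka-backed class would implement it.
public interface IMessagePublisher
{
    void Publish(string topic, object payload);
}

// In-memory implementation so the sketch runs without a broker.
public sealed class InMemoryPublisher : IMessagePublisher
{
    public List<(string Topic, object Payload)> Sent { get; } = new();

    public void Publish(string topic, object payload) => Sent.Add((topic, payload));
}

// Hypothetical stand-in for the socket client's stream API.
public sealed class FakeStreams
{
    public Task<bool> SubscribeToOrdersAsync(Action<string> onData, CancellationToken ct = default)
    {
        onData("order #1"); // simulate one incoming update
        return Task.FromResult(true);
    }
}

public sealed class Producer
{
    private readonly FakeStreams _streams = new();
    private readonly IMessagePublisher _publisher;

    public Producer(IMessagePublisher publisher) => _publisher = publisher;

    // General version: the caller decides what happens with each update.
    public Task<bool> SubscribeToAsync<T>(
        Func<FakeStreams, Func<Action<T>, CancellationToken, Task<bool>>> methodSelector,
        Action<T> handler,
        CancellationToken token = default)
        => methodSelector(_streams)(handler, token);

    // Specialized version: forwards each update to the publisher under the given topic.
    public Task<bool> SubscribeAndPublishAsync<T>(
        Func<FakeStreams, Func<Action<T>, CancellationToken, Task<bool>>> methodSelector,
        string topic,
        CancellationToken token = default) where T : notnull
        => SubscribeToAsync(methodSelector, data => _publisher.Publish(topic, data), token);
}

public static class Program
{
    public static async Task Main()
    {
        var publisher = new InMemoryPublisher();
        var producer = new Producer(publisher);

        await producer.SubscribeAndPublishAsync<string>(s => s.SubscribeToOrdersAsync, "orders");

        Console.WriteLine(publisher.Sent.Count); // prints 1
    }
}
```

Keeping the publisher behind an interface means the subscription code does not have to change when the transport does.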
UPDATE #2
public abstract Task<CallResult<UpdateSubscription>> SubscribeToTickerUpdatesAsync(string symbol, Action<DataEvent<FTXStreamTicker>> handler, CancellationToken ct = default(CancellationToken))
I wasn't able to call this method.
If you look at the signature of the method, you can see there is an extra input parameter called symbol. The methodSelector of SubscribeToAsync is unaware of this.
In order to support calling SubscribeToTickerUpdatesAsync or SubscribeToTradeUpdatesAsync like this:
// The second argument is the market symbol; "BTC-PERP" is only an example value.
await SubscribeWithSymbolToAsync<FTXStreamTicker>(stream => stream.SubscribeToTickerUpdatesAsync, "BTC-PERP", cancellationToken);
await SubscribeWithSymbolToAsync<IEnumerable<FTXTrade>>(stream => stream.SubscribeToTradeUpdatesAsync, "BTC-PERP", cancellationToken);
we need the following trick:
private async Task<UpdateSubscription?> SubscribeWithSymbolToAsync<T>(
    Func<IFTXSocketClientStreams, Func<string, Action<DataEvent<T>>, CancellationToken, Task<CallResult<UpdateSubscription>>>> methodSelector,
    string symbol, CancellationToken token)
{
    // Bind the symbol so the remaining delegate has the (handler, token) shape
    var asyncMethod = (Action<DataEvent<T>> handler, CancellationToken cToken) =>
        methodSelector(_socketClient.Streams)(symbol, handler, cToken);
    var subscription = await asyncMethod(data => _logger.LogInformation(symbol + ": {@Data}", data.Data), token);
    return SubscribeToEvents(subscription);
}

private async Task<UpdateSubscription?> SubscribeToAsync<T>(
    Func<IFTXSocketClientStreams, Func<Action<DataEvent<T>>, CancellationToken, Task<CallResult<UpdateSubscription>>>> methodSelector,
    Action<DataEvent<T>> handler, CancellationToken token)
{
    var asyncMethod = methodSelector(_socketClient.Streams);
    var subscription = await asyncMethod(handler, token);
    return SubscribeToEvents(subscription);
}

// Returns null when the subscription failed, hence the nullable return type
private UpdateSubscription? SubscribeToEvents(CallResult<UpdateSubscription> subscription)
{
    if (!subscription.Success) return null;

    subscription.Data.ConnectionLost += () => _logger.LogError("Connection lost");
    subscription.Data.ConnectionRestored += _ => _logger.LogInformation("Connection restored");
    return subscription.Data;
}
- SubscribeWithSymbolToAsync binds the symbol formal parameter of the methodSelector to the provided symbol argument
  - In other words, we convert a Func<T1, T2, T3, TResult> to a Func<T2, T3, TResult> by supplying T1
  - The symbol also serves as the log prefix here, so the call site has to pass a real market symbol
- SubscribeWithSymbolToAsync does not rely on SubscribeToAsync, so I extracted the common part
  - SubscribeToEvents handles the event subscriptions
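The binding trick is ordinary partial application, and it may be easier to follow outside the subscription code. The sketch below is only an illustration: BindFirst and the describe delegate are hypothetical names, and "BTC-PERP" is just a sample value.

```csharp
using System;

public static class PartialApplication
{
    // Fixes the first argument of a three-argument function and
    // returns a function that takes only the remaining two.
    public static Func<T2, T3, TResult> BindFirst<T1, T2, T3, TResult>(
        Func<T1, T2, T3, TResult> func, T1 first)
        => (second, third) => func(first, second, third);
}

public static class Program
{
    public static void Main()
    {
        Func<string, int, int, string> describe =
            (symbol, bid, ask) => $"{symbol}: {bid}/{ask}";

        // The symbol is supplied once; the result has the shorter signature.
        Func<int, int, string> describeBtc = PartialApplication.BindFirst(describe, "BTC-PERP");

        Console.WriteLine(describeBtc(100, 101)); // prints "BTC-PERP: 100/101"
    }
}
```

SubscribeWithSymbolToAsync does the same thing inline with a lambda instead of going through a helper like this.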
- Thank you very much! I really like your solution. May you just change _logger.LogInformation(logPrefix +": {@Data}", data.Data) to something more generic? Because I would be pushing messages to RabbitMQ or Kafka. The logger was just an example. (nop, Sep 8, 2022 at 7:12)
- @nop Thanks, I will amend my post later today to address this concern as well. (Peter Csala, Sep 8, 2022 at 7:20)
- @nop Updated, please check it! (Peter Csala, Sep 8, 2022 at 10:29)
- @nop I've updated my post. I hope you don't come up with a new requirement :) (Peter Csala, Sep 8, 2022 at 15:31)
- Thank you very much for the effort you put in and the time you spent, it looks precious! (nop, Sep 8, 2022 at 22:53)