I currently have a class that I want to serialize/deserialize messages sent to this socket.
My plan was to use Task.WhenAny()
to monitor 2 tasks (Either watch something that needs to be written, or read from the stream). If I have a message waiting to be sent, it would write it to the stream, otherwise it would attempt to read from the stream. My result yielded some pretty garbage code (Having to create other methods for read/write), and using a private class for return value, as I can't use Task.WhenAny
with one value returning an int, and another returning IPayload
.
I'm worried about edge cases, what happens if (albeit unlikely), a read happens right when IPayload
happens. For example: if I partially read, and the queue gets a payload.
Trying to find the best approach to do this:
public class GameSocket
{
private readonly string _host;
private readonly int _port;
private readonly ISerializer _serializer;
private readonly ILogger _logger;
private readonly TcpClient _client = new TcpClient();
private readonly AsyncProducerConsumerQueue<IPayload> _outgoingQueue = new AsyncProducerConsumerQueue<IPayload>();
private enum OperationType : byte
{
Read = 0,
Write = 1
}
private class ClientOperation
{
public int ReadLength;
public IPayload ToWrite;
public OperationType Type;
}
public GameSocket([NotNull] string host, int port, [NotNull] ISerializer serializer, ILogger logger = null)
{
_host = host ?? throw new ArgumentNullException(nameof(host));
if (port < 0 || port > 65535) throw new ArgumentOutOfRangeException(nameof(port), "Invalid port provided.");
_port = port;
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_logger = logger ?? new DebugLogger("GameSocket");
_client = new TcpClient();
}
public GameSocket(IPEndPoint endpoint, ISerializer serializer, ILogger logger = null) : this(endpoint.Address.ToString(), endpoint.Port, serializer, logger)
{
}
public async Task Run(CancellationToken ct)
{
try
{
await _client.ConnectAsync(_host, _port);
using (var stream = _client.GetStream())
{
while (true)
{
var buffer = new byte[_serializer.HeaderSize];
var res = await Task.WhenAny(ReadAsync(stream, buffer, ct), GetPayload(ct));
switch (res.Result.Type)
{
case OperationType.Write:
var sendBytes = _serializer.Serialize(res.Result.ToWrite);
if (_logger.IsEnabled(LogLevel.Trace))
{
_logger.LogTrace($"Writing bytes: {sendBytes}");
}
await stream.WriteAsync(sendBytes, 0, sendBytes.Length, ct);
break;
case OperationType.Read:
var readLength = res.Result.ReadLength;
if (readLength == 0)
{
_logger.LogWarning("Connection Closed. Read Length=0");
return;
}
var deserialized = _serializer.Deserialize(buffer);
//Find handlers
//Dispatch method
break;
}
}
}
}
finally
{
_client.Dispose();
}
}
public Task SendMessage(IPayload payload)
{
return _outgoingQueue.EnqueueAsync(payload);
}
private async Task<ClientOperation> ReadAsync(NetworkStream stream, byte[] buffer, CancellationToken ct)
{
var readLength = await stream.ReadAsync(buffer, 0, buffer.Length, ct);
return new ClientOperation
{
ReadLength = readLength,
Type = OperationType.Read
};
}
private async Task<ClientOperation> GetPayload(CancellationToken ct)
{
var item = await _outgoingQueue.DequeueAsync(ct);
return new ClientOperation
{
ToWrite = item,
Type = OperationType.Write
};
}
}
1 Answer 1
Race Condition
You are starting two tasks concurrently, each blocking on an event to occur.
var res = await Task.WhenAny(ReadAsync(stream, buffer, ct), GetPayload(ct));
By calling WhenAny
, once one of tasks is completed, you continue without awaiting completion of the other task. Let's say that ReadAsync
completes first, this means the following code rus in a task that is fired and forgotten:
var item = await _outgoingQueue.DequeueAsync(ct);
You immediately go to a next loop of awaiting 2 newly created tasks: ReadAsync
and GetPayload
. However, since the previous GetPayload
was not awaited upon, the new call to it will block until that one is finished (they both want to acquire a lock on the mutex of AsyncProducerConsumerQueue
).
And here is the race condition: the old (forgotten) task completes but no-one would care about it. No handler is called on completion, because you only handle the task that completes first of the pair or newly created tasks.
Frankly, I don't know why you would even consider using this pattern. These are two independant tasks that should have their own scope and loop.
-
\$\begingroup\$ I was considering using this pattern, because I thought you could only have one concurrent reader and writer, and wanted a way to nicely read and write without involving locks. \$\endgroup\$Blue– Blue2019年08月08日 09:01:23 +00:00Commented Aug 8, 2019 at 9:01
Explore related questions
See similar questions with these tags.
WhenAny
means that whichever of the two tasks completes first (ish) will be observed, and the other will be ignored: it will not be cancelled, the task will wait on. For the read, this means losing whatever was read; forGetPayload
, this means losing whatever is on top of the queue. This isn't an unlikely event (it will occur every 'cycle' inRun
), and I doubt it would pass even a cursory testing. \$\endgroup\$