I've been looking at some async comms in C#. As a proof of concept, I've written a simple multi-client echo server. The server allows multiple TCP clients to connect and listens for input from the clients. When it receives a complete line, it forwards the completed line to any other connected clients.
For testing, I used multiple telnet clients to connect and monitor messages, as well as a simple client for sending test messages:
TestClient
using System.Net.Sockets;
using System.Text;
namespace Client
{
class Program
{
static void Main(string[] args)
{
using (TcpClient client = new TcpClient("127.0.0.1", 4040))
{
SendMessage(client, "Hello\r\nThis is line two\r\nAnd line three\r\n");
string Line4 = "Finally, Line Four\r\n";
foreach(var character in Line4)
{
SendMessage(client, character.ToString());
}
}
}
static void SendMessage(TcpClient client, string messageToSend)
{
var buffer = Encoding.ASCII.GetBytes(messageToSend);
client.GetStream().Write(buffer, 0, buffer.Length);
}
}
}
The server itself listens on a known port and keeps running until a line is received from the console. It consists of 4 classes:
LineBufferedClient
- maintains state for the client, including async reads.ClientManager
- maintains the list of connected clients.Server
- responsible for listening for incoming connections and accepting them.Program
- Simple wrapper that bootstraps the server and waits for console exit command.
For the moment as it's a small POC, all the classes are in the same file:
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace ServerPOC
{
class LineBufferedClient
{
public LineBufferedClient(TcpClient client) {
ReadBuffer = new byte[256];
CurrentLine = new StringBuilder();
Client = client;
}
public TcpClient Client { get; private set; }
public Byte[] ReadBuffer { get; private set; }
public StringBuilder CurrentLine { get; set; }
}
class ClientManager
{
List<LineBufferedClient> _clients = new List<LineBufferedClient>();
public void Add(TcpClient tcpClient)
{
var client = new LineBufferedClient(tcpClient);
var result = tcpClient.GetStream().BeginRead(client.ReadBuffer, 0, client.ReadBuffer.Length, DataReceived, client);
_clients.Add(client);
}
private void HandleCompleteLine(LineBufferedClient client, string line)
{
Console.WriteLine(line);
var buffer = Encoding.ASCII.GetBytes(line + "\n");
_clients.ForEach((connectedClient) => { if (connectedClient != client) connectedClient.Client.GetStream().Write(buffer, 0, buffer.Length); });
}
private void DataReceived(IAsyncResult ar)
{
var client = ar.AsyncState as LineBufferedClient;
var bytesRead = client.Client.GetStream().EndRead(ar);
if(bytesRead > 0)
{
var readString = Encoding.UTF8.GetString(client.ReadBuffer, 0, bytesRead);
while(readString.Contains("\n"))
{
var indexOfNewLine = readString.IndexOf('\n');
var left = readString.Substring(0, indexOfNewLine);
client.CurrentLine.Append(left);
var line = client.CurrentLine.ToString();
client.CurrentLine.Clear();
if(indexOfNewLine != readString.Length-1)
{
readString = readString.Substring(indexOfNewLine + 1);
}
else
{
readString = string.Empty;
}
HandleCompleteLine(client, line);
}
if(!string.IsNullOrEmpty(readString))
{
client.CurrentLine.Append(readString);
}
client.Client.GetStream().BeginRead(client.ReadBuffer, 0, 256, DataReceived, client);
}
else
{
_clients.Remove(client);
}
}
}
class Server
{
CancellationTokenSource _cts = new CancellationTokenSource();
private bool _shutdown = false;
int _serverPort=0;
private Thread _listenerThread;
private ClientManager _clientManager;
public Server(ClientManager clientManager)
{
_clientManager = clientManager;
}
public void Run(int serverPort)
{
_serverPort = serverPort;
_listenerThread = new Thread(ListenLoop);
_listenerThread.Start();
}
public void ListenLoop()
{
TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, _serverPort));
listener.Start();
while (!_shutdown)
{
try
{
var acceptTask = listener.AcceptTcpClientAsync();
acceptTask.Wait(_cts.Token);
var newClient = acceptTask.Result;
_clientManager.Add(newClient);
}
catch (OperationCanceledException)
{
// NOP - Shutting down
}
}
}
public void Stop()
{
_shutdown = true;
_cts.Cancel();
_listenerThread.Join();
}
}
class Program
{
static void Main(string[] args)
{
var clientManager = new ClientManager();
var server = new Server(clientManager);
server.Run(4040);
Console.WriteLine("Server running, press Enter to quit.");
Console.ReadLine();
server.Stop();
}
}
}
Any feedback's welcome. I'm particularly interested in feedback around any scalability issues this approach is likely to encounter, or if there is a more modern approach with C# for handling multiple clients.
1 Answer 1
There are a couple of critical issues with the original code that need to be addressed:
Exception handling
If the client at the other end shuts down cleanly, then the current approach will usually work OK. The BeginRead
operation completes, having read 0 bytes to indicate that the other side is no longer connected. However, it is possible that if processing takes a period of time, that another client will attempt to write to the socket prior to it being cleaned up. This can be simulated by adding a long running task stub to the HandleCompleteLine
method:
private void HandleCompleteLine(LineBufferedClient client, string line)
{
Console.WriteLine(line);
Thread.Sleep(2000); // Simulate long running task
var buffer = Encoding.ASCII.GetBytes(line + "\n");
Consequently the socket can get into an error state. At a minimum, the following two exceptions should be caught around both read and writes to the network stream:
IOException
- This is thrown, for example when an attempt to write to a socket takes place that has been closed at the other end.InvalidOperationException
- This is thrown when an operation is attempted on a closed socket (if a socket throws anIOException
it will be put into a closed state, so will subsequently throwInvalidOperationException
s in response to read/write requests).
Concurrency
Not all DataReceived
calls will take place on the same thread. It's possible that multiple calls (from different clients) could be being handled concurrently, particularly if the long running task described above is introduced. This means that shared/dependant resources need to be protected. With the current implementation the main concern is the _clients
list. It's possible that items could be added to / removed from the list while other threads are doing the same thing / iterating over the list. The list could be protected using locking, or a concurrent collection like ConcurrentDictionary
could be used instead.
Making these changes leads to the following more error tolerant code for the ClientManager
:
class ClientManager
{
ConcurrentDictionary<LineBufferedClient, LineBufferedClient> _clients = new ConcurrentDictionary<LineBufferedClient, LineBufferedClient>();
public void Add(TcpClient tcpClient)
{
var client = new LineBufferedClient(tcpClient);
var result = tcpClient.GetStream().BeginRead(client.ReadBuffer, 0, client.ReadBuffer.Length, DataReceived, client);
if (!_clients.TryAdd(client, client))
{
throw new InvalidOperationException("Tried to add connection twice");
}
}
private void HandleCompleteLine(LineBufferedClient client, string line)
{
Console.WriteLine(line);
Thread.Sleep(2000);
var buffer = Encoding.ASCII.GetBytes(line + "\n");
foreach(var entry in _clients)
{
var connectedClient = entry.Value;
if (connectedClient != client)
{
try
{
connectedClient.Client.GetStream().Write(buffer, 0, buffer.Length);
}
catch(Exception ex) when (ex is InvalidOperationException || ex is System.IO.IOException)
{
RemoveClient(connectedClient);
}
}
}
}
private void DataReceived(IAsyncResult ar)
{
var client = ar.AsyncState as LineBufferedClient;
var bytesRead = client.Client.GetStream().EndRead(ar);
if(bytesRead > 0)
{
var readString = Encoding.UTF8.GetString(client.ReadBuffer, 0, bytesRead);
while(readString.Contains("\n"))
{
var indexOfNewLine = readString.IndexOf('\n');
var left = readString.Substring(0, indexOfNewLine);
client.CurrentLine.Append(left);
var line = client.CurrentLine.ToString();
client.CurrentLine.Clear();
if(indexOfNewLine != readString.Length-1)
{
readString = readString.Substring(indexOfNewLine + 1);
}
else
{
readString = string.Empty;
}
HandleCompleteLine(client, line);
}
if(!string.IsNullOrEmpty(readString))
{
client.CurrentLine.Append(readString);
}
try
{
client.Client.GetStream().BeginRead(client.ReadBuffer, 0, 256, DataReceived, client);
}
catch (Exception ex) when (ex is InvalidOperationException || ex is System.IO.IOException)
{
RemoveClient(client);
}
}
else
{
RemoveClient(client);
}
}
private void RemoveClient(LineBufferedClient client)
{
LineBufferedClient ignored;
_clients.TryRemove(client, out ignored);
}
}
Break up the ClientManager
Looking at the code above, it seems like the ClientManager
is doing rather more than just managing the clients. It's involved maintaining the list of connected clients, transmitting to them, reading from them, orchestrating the buffering until a new line is received and performing processing on the received line. This isn't terrible for a proof of concept, however it feels like some of the responsibility should be shared around going forward.
When integrating the POC, LineBuffering has been extracted into it's own class.
TcpClient
in .Net. \$\endgroup\$