12
\$\begingroup\$

I've been doing network programming using C#'s TcpClient for several years. The code below is an asynchronous wrapper for TcpClient that I developed throughout these years.

The key methods are:

  • ConnectAsync() - connects asynchronously; RemoteServerInfo is a simple class containing Host, Port, and a boolean indicating whether this is an SSL connection.
  • StartReceiving() - initiates the data reading callbacks; this method is necessary to allow time for events to be hooked up before data starts being processed
  • DataReceivedCallback() - processes data as it is received and passes it on to any subscribed event handlers
  • SendAsync() - sends data asynchronously

Some other things to notice:

  • The code uses the old Asynchronous Programming Model for asynchrony.
  • There is some play with buffer sizes. The intention of this is to have adaptive buffer sizes - using a small amount of memory most of the time, but expanding to better cater for larger incoming data (up to a specified maximum) when necessary.
  • I am using a goto statement. This might send cold shivers down your spine, but I thought it was fine in this case. Read this answer if you're religious about never using goto in any situation whatsoever.

I would really appreciate code review from other developers (especially from network programmers) to see if this implementation can be improved further. Some things that come to mind include better performance, better use of TAP over APM, and any possible subtle bugs I might have missed.

Here is the code for AsyncTcpClient:

public class AsyncTcpClient : IDisposable
{
 private bool disposed = false;
 private TcpClient tcpClient;
 private Stream stream;
 private int minBufferSize = 8192;
 private int maxBufferSize = 15 * 1024 * 1024;
 private int bufferSize = 8192;
 private int BufferSize
 {
 get
 {
 return this.bufferSize;
 }
 set
 {
 this.bufferSize = value;
 if (this.tcpClient != null)
 this.tcpClient.ReceiveBufferSize = value;
 }
 }
 public int MinBufferSize
 {
 get
 {
 return this.minBufferSize;
 }
 set
 {
 this.minBufferSize = value;
 }
 }
 public int MaxBufferSize
 {
 get
 {
 return this.maxBufferSize;
 }
 set
 {
 this.maxBufferSize = value;
 }
 }
 public int SendBufferSize
 {
 get
 {
 if (this.tcpClient != null)
 return this.tcpClient.SendBufferSize;
 else
 return 0;
 }
 set
 {
 if (this.tcpClient != null)
 this.tcpClient.SendBufferSize = value;
 }
 }
 public event EventHandler<byte[]> OnDataReceived;
 public event EventHandler OnDisconnected;
 public bool IsConnected
 {
 get
 {
 return this.tcpClient != null && this.tcpClient.Connected;
 }
 }
 public AsyncTcpClient()
 {
 }
 public async Task SendAsync(byte[] data)
 {
 try
 {
 await Task.Factory.FromAsync(this.stream.BeginWrite, this.stream.EndWrite, data, 0, data.Length, null);
 await this.stream.FlushAsync();
 }
 catch (IOException ex)
 {
 if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
 ; // ignore
 else if (this.OnDisconnected != null)
 this.OnDisconnected(this, null);
 }
 }
 public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationTokenSource cancellationTokenSource = null)
 {
 try
 {
 await Task.Run(() => this.tcpClient = new TcpClient());
 await Task.Factory.FromAsync(this.tcpClient.BeginConnect, this.tcpClient.EndConnect,
 remoteServerInfo.Host, remoteServerInfo.Port, null);
 // get stream and do SSL handshake if applicable
 this.stream = this.tcpClient.GetStream();
 if (remoteServerInfo.Ssl)
 {
 var sslStream = new SslStream(this.stream);
 sslStream.AuthenticateAsClient(remoteServerInfo.Host);
 this.stream = sslStream;
 }
 if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested)
 {
 this.Dispose();
 this.stream = null;
 }
 }
 catch(Exception)
 {
 // if task has been cancelled, then we don't care about the exception;
 // if it's still running, then the caller must receive the exception
 if (cancellationTokenSource == null || !cancellationTokenSource.IsCancellationRequested)
 throw;
 }
 }
 public void StartReceiving()
 {
 byte[] buffer = new byte[bufferSize];
 this.stream.BeginRead(buffer, 0, buffer.Length, DataReceivedCallback, buffer);
 }
 protected virtual void DataReceivedCallback(IAsyncResult asyncResult)
 {
 try
 {
 byte[] buffer = asyncResult.AsyncState as byte[];
 int bytesRead = this.stream.EndRead(asyncResult);
 if (bytesRead > 0)
 {
 // adapt buffer if it's too small / too large
 if (bytesRead == buffer.Length)
 this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
 else
 {
 reduceBufferSize:
 int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
 if (bytesRead < reducedBufferSize)
 {
 this.BufferSize = reducedBufferSize;
 if (bytesRead > this.minBufferSize)
 goto reduceBufferSize;
 }
 }
 // forward received data to subscriber
 if (this.OnDataReceived != null)
 {
 byte[] data = new byte[bytesRead];
 Array.Copy(buffer, data, bytesRead);
 this.OnDataReceived(this, data);
 }
 // recurse
 byte[] newBuffer = new byte[bufferSize];
 this.stream.BeginRead(newBuffer, 0, newBuffer.Length, DataReceivedCallback, newBuffer);
 }
 else
 this.OnDisconnected(this, null);
 }
 catch(ObjectDisposedException) // can occur when closing, because tcpclient and stream are disposed
 {
 // ignore
 }
 catch(IOException ex)
 {
 if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
 ; // ignore
 else if (this.OnDisconnected != null)
 this.OnDisconnected(this, null);
 }
 }
 protected virtual void Dispose(bool disposing)
 {
 if (!disposed)
 {
 if (disposing)
 {
 // Dispose managed resources.
 if (this.tcpClient != null)
 {
 this.tcpClient.Close();
 this.tcpClient = null;
 }
 }
 // There are no unmanaged resources to release, but
 // if we add them, they need to be released here.
 }
 disposed = true;
 // If it is available, make the call to the
 // base class's Dispose(Boolean) method
 // base.Dispose(disposing);
 }
 public void Dispose()
 {
 Dispose(true);
 GC.SuppressFinalize(this);
 }
}
asked Feb 28, 2015 at 8:52
\$\endgroup\$
9
  • 1
    \$\begingroup\$ Good stuff. I started a project here. github.com/sethcall/async-helper I have some more commits/changes coming. But an initial version which is essentially the poster + jasonlind's comments is also on nuget: nuget.org/packages/AsyncTcpClient \$\endgroup\$ Commented Jun 15, 2016 at 2:57
  • 1
    \$\begingroup\$ Nice! I wanted to perfect this but never had the time. \$\endgroup\$ Commented Jun 15, 2016 at 9:51
  • \$\begingroup\$ Why not return the parsed(or byte[]) response in the same function SendAsync like a normal service call function ??? its normal for a service call that you pass parameters and get a response back \$\endgroup\$ Commented Aug 24, 2017 at 10:45
  • \$\begingroup\$ Because data may be sent spontaneously by the server without even being requested. \$\endgroup\$ Commented Aug 24, 2017 at 15:05
  • \$\begingroup\$ @sethcall I am looking at your tests and have cpl questions: (a) 1st, why in your ServerDisconnect are you calling 2ice SendAsync? (b) 2nd, why in your Send() are you calling Thread.Sleep(1000) after sending? (c) 3rd, in your ConnectDisconnect, you are disconnecting on last line but not asserting that it is disconnected after, did you forget to assert? and (d) most importantly, how do I know that entire response has been received from server (this is important to me), so I can build a list from the response bytes? Thanks a million! \$\endgroup\$ Commented Mar 2, 2019 at 1:09

3 Answers 3

9
\$\begingroup\$

You should only use Task.Run to start threads, which you don't want to do when you're doing IO, at least directly. You should let the runtime make that decision. Also you need to make sure your tcpClient isn't already connected. There's also a tcpClient.ConnectAsync that retruns a Task so you should use that.

Also you should never pass a CancellationTokenSource to an async method, use CancellationToken. And the .NET async library is designed to throw an OperationCanceledException on cancelation so the task is marked canceled, so use that. It's also bad practice to dispose of a class from within itself, that can have some very undeseried effects, simply close and dispose of your tcpListener

So ConnectAsync could look like:

private async Task Close()
{
 await Task.Yield();
 if(this.tcpClient != null)
 {
 this.tcpClient.Close();
 this.tcpClient.Dispose();
 this.tcpClient = null;
 }
 if(this.stream != null)
 {
 this.stream.Dispose();
 this.stream = null;
 }
}
private async Task CloseIfCanceled(CancellationTeken token, Action onClosed = null)
{
 if(token.IsCancellationRequested)
 {
 await this.Close();
 if(onClosed != null)
 onClosed();
 token.ThrowIfCancellationRequested();
 }
}
public async Task ConnectAsync(RemoteServerInfo remoteServerInfo, CancellationToken cancellationToken = default(CancellationToken))
{
 try
 {
 //Connect async method
 await this.Close();
 cancellationToken.ThrowIfCancellationRequested();
 this.tcpClient = new TcpClient();
 canellationToken.ThrowIfCancellationRequested();
 await this.tcpClient.ConnectAsync(remoteServerInfo.Host, remoteServerInfo.Port);
 await this.CloseIfCanceled(cancelationToken);
 // get stream and do SSL handshake if applicable
 this.stream = this.tcpClient.GetStream();
 await this.CloseIfCanceled(cancelationToken);
 if (remoteServerInfo.Ssl)
 {
 var sslStream = new SslStream(this.stream);
 sslStream.AuthenticateAsClient(remoteServerInfo.Host);
 this.stream = sslStream;
 await this.CloseIfCanceled(cancelationToken);
 }
 }
 catch(Exception)
 {
 this.CloseIfCanceled(cancelationToken).Wait();
 throw;
 }
}

There's also async methods on Stream that return Task's so, also your OnDisconected event was not thread safe, you need to assign it to an internal variable. You should also never pass null EventArgs. Also you can simplfiy BeginRecieve to just receive and put it in a loop with async/await and a cancellation token. Also I would remove the goto and replace with a do/while (btw I'm not 100% sure the reduce buffer size logic works, someone else might want to address that)

public async Task SendAsync(byte[] data, CancelationToken token = default(CancellationToken))
{
 try
 {
 await this.stream.WriteAsync(data, 0, data.Length, token);
 await this.stream.FlushAsync(token);
 }
 catch (IOException ex)
 {
 var onDisconected = this.OnDisconected;
 if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
 ; // ignore
 else if (onDisconected != null)
 onDisconected(this, EventArgs.Empty);
 }
}
public async Task Recieve(CancelationToken token = default(CancellationToken))
{
 try
 {
 if(!this.IsConnected || this.IsRecieving)
 throw new InvalidOperationException();
 this.IsRecieving = true;
 byte[] buffer = new byte[bufferSize];
 while(this.IsConnected)
 {
 token.ThrowIfCancellationRequested();
 int bytesRead = await this.stream.ReadAsync(buffer, 0, buffer.Length, token);
 if(bytesRead > 0)
 {
 if(bytesRead == buffer.Length)
 this.BufferSize = Math.Min(this.BufferSize * 10, this.maxBufferSize);
 else
 {
 do
 {
 int reducedBufferSize = Math.Max(this.BufferSize / 10, this.minBufferSize);
 if(bytesRead < reducedBufferSize)
 this.BufferSize = reducedBufferSize;
 }
 while(bytesRead > this.minBufferSize)
 }
 var onDataRecieved = this.OnDataRecieved;
 if(onDataRecieved != null)
 {
 byte[] data = new byte[bytesRead];
 Array.Copy(buffer, data, bytesRead);
 onDataRecieved(this, data);
 }
 }
 buffer = new byte[bufferSize];
 }
 }
 catch(ObjectDisposedException){}
 catch(IOException ex)
 {
 var evt = this.OnDisconnected; 
 if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams
 ;
 if(evt != null)
 evt(this, EventArgs.Empty);
 }
 finally
 {
 this.IsRecieving = false;
 }
}
answered Feb 28, 2015 at 10:00
\$\endgroup\$
5
  • \$\begingroup\$ Thanks for the tips. Can you elaborate on why it is bad to dispose of a class from within itself? \$\endgroup\$ Commented Mar 1, 2015 at 11:04
  • 1
    \$\begingroup\$ Also why do you use Task.Yield()? \$\endgroup\$ Commented Mar 1, 2015 at 11:12
  • \$\begingroup\$ Disposing is meant to be called once, and once called the object is ready to be GAC'd. I think it's confusing if an object can dispose itself, an external caller should dispose it. In this case you want to close your connection from within, which is perfectly valid. Task.Yield() will yield to the calling thread immediately, we do this since we want the rest of the work bellow that call do be done in async (possibly on another thread, the runtime will determine where this work will go). \$\endgroup\$ Commented Mar 2, 2015 at 4:00
  • \$\begingroup\$ Interesting, so what happens if we DON'T use Task.Yield() there? \$\endgroup\$ Commented Mar 2, 2015 at 19:06
  • 1
    \$\begingroup\$ It will process syncronusly until the first await, if there is no await the whole method will be synchronous \$\endgroup\$ Commented Mar 3, 2015 at 1:24
4
\$\begingroup\$
public int MinBufferSize
{
 get
 {
 return this.minBufferSize;
 }
 set
 {
 this.minBufferSize = value;
 }
}
public int MaxBufferSize
{
 get
 {
 return this.maxBufferSize;
 }
 set
 {
 this.maxBufferSize = value;
 }
}

Any reason for not using auto-implemented properties?

public int MinBufferSize { get; set; }
public int MaxBufferSize { get; set; }

if (ex.InnerException != null && ex.InnerException is ObjectDisposedException) // for SSL streams

You don't need to check for null

if (ex.InnerException is ObjectDisposedException) // for SSL streams

From MSDN:

An is expression evaluates to true if the provided expression is non-null, and the provided object can be cast to the provided type without causing an exception to be thrown.

answered Mar 1, 2015 at 2:07
\$\endgroup\$
0
3
\$\begingroup\$

Your choice of values here might be obvious to you, but they aren't to me.

 private int minBufferSize = 8192;
 private int maxBufferSize = 15 * 1024 * 1024;
 private int bufferSize = 8192;

It's good that you defined variables for these, but if would probably be a good idea to leave a few comments explaining why you chose these initial values.

It's also interesting to note that C#6 supports auto property initializers. If/when you move to it, you could greatly simplify the way you're defining your properties.

public int MinBufferSize { get; set; } = 8192;

Even now you could simplify by using an auto-property and initializing these values in the constructor.

public int MinBufferSize { get; set; }
public AsyncTcpClient()
{
 this.MinBufferSize = 8192;
 //...
}

I am very happy to see IDisposable properly implemented though. Not nearly enough people get that right.

 if (!disposed)
 {
 if (disposing)
 {
 // Dispose managed resources.
 if (this.tcpClient != null)
 {
 this.tcpClient.Close();
 this.tcpClient = null;
 }
 }

(削除) Well, it seems you almost got it right at least. It seems that TcpClient implements IDisposable, so you should probably be calling it's Dispose() method here rather than setting it to null.

I don't have enough domain knowledge to say whether or not it's necessary to call Close(), but if it's implemented sanely, I doubt it's necessary. (削除ここまで)

Never mind. TcpClient explicitly implements IDisposable. That looks good.

answered Mar 1, 2015 at 3:19
\$\endgroup\$
1
  • 1
    \$\begingroup\$ Good points. Re IDisposable, TcpClient implements it explicitly. Calling .Close() has the same effect, without the need to cast to IDisposable first. Setting tcpClient to null simply makes sure that it doesn't get disposed twice. \$\endgroup\$ Commented Mar 1, 2015 at 9:45

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.