I have created a C# library that can send commands to a device and process the command specific responses and broadcasts over a serial port (or other communications method).
The library must also be able to handle request and response extensions held in other libraries as certain devices implement an extended command set, but it must be possible to choose whether these extended commands are utilised (I guess using reflection in the client app) however I have yet to find a good way to implement this bit yet.
I have created a class of type Packet that is able to create the packet and add its payload, calculate its checksum write the packet to the stream and observe a packet router for receiving and processing responses.
public class Packet : IObserver<Packet>
{
//The following properties are used for returning a response
private IDisposable unsubscriber;
public bool ResponseReceived { get; private set; }
public double Timeout { get; set; }
private Packet ResponsePacket;
//The following properties are used for building a packet
internal PacketHeaderType Header { get; private set; } //This an enum of byte with possible values of 0-15
internal List<byte> Payload { get; private set; }
protected int PayloadLength { get { return Payload.Count; } }
protected byte HeaderByte { get { return (byte)((Convert.ToByte(Header) << 4) | PayloadLength); } } //we need to add the packet length to the lower nibble of the header before sending
public Packet(PacketHeaderType header, List<byte> payload)
{
this.Header = header;
this.Payload = new List<byte>(payload);
this.ResponseReceived = false;
this.Timeout = 5;
}
public Packet(PacketHeaderType headerByte)
{
this.Header = headerByte;
this.Payload = new List<byte>();
this.ResponseReceived = false;
this.Timeout = 5;
}
internal byte XorByte
{
get
{
Byte xorByte = Convert.ToByte(HeaderByte);
for (int i = 0; i < PayloadLength; i++)
xorByte ^= Payload.ToArray()[i];
return xorByte;
}
}
public async Task WriteAsync(Stream stream, bool returnResponse = false, bool flush = true, CancellationToken token = default(CancellationToken))
{
var buffer = new List<byte>();
buffer.Add(HeaderByte);
if (Payload != null && PayloadLength > 0)
{
buffer.AddRange(Payload);
}
buffer.Add(XorByte);
await stream.WriteAsync(buffer.ToArray(), 0, buffer.Count);
if (flush)
{
await stream.FlushAsync();
}
if (returnResponse)
{
var ts = new TimeSpan();
ts = TimeSpan.FromSeconds(Timeout);
while (DateTime.Now + ts < DateTime.Now && !ResponseReceived)
{
await Task.Delay(10);
}
}
}
private bool ValidResponse(Packet p)
{
return false;
}
public virtual void Subscribe(IObservable<Packet> provider)
{
if (provider != null)
unsubscriber = provider.Subscribe(this);
}
public virtual void OnCompleted()
{
this.Unsubscribe();
}
public virtual void OnError(Exception e)
{
Console.WriteLine("{0}: Cannot get packet from router.", this.ToString());
}
public virtual void OnNext(Packet value)
{
if (ValidResponse(value))
{
ResponsePacket = value;
ResponseReceived = true;
Unsubscribe();
}
}
public virtual void Unsubscribe()
{
unsubscriber.Dispose();
}
//This probably shouldn't be here as we now use it in the packet handler
public static async Task<Packet> ReadAsync(Stream stream, CancellationToken cancellationToken = default(CancellationToken))
{
//TODO: we should read the stream until we find a valid header!
var headerBuffer = new byte[1];
int bytesRead = await stream.ReadAsync(headerBuffer, 0, 1, cancellationToken);
if (bytesRead == 0)
{
return null;
}
var headerVal = (byte)(headerBuffer[0] >> 4);
var payloadLength = (headerBuffer[0] & 0x0F);
Packet packet = new Packet((PacketHeaderType)Enum.Parse(typeof(PacketHeaderType), headerVal.ToString()));
//TODO: we should handle unknown packets some how!
//if (Enum.IsDefined(typeof(XpressNetV3PacketType), headerVal))
var bytesRemaining = payloadLength;
var offset = 0;
if (payloadLength > 0)
{
var payloadBytes = new byte[payloadLength];
do
{
bytesRead = await stream.ReadAsync(payloadBytes, offset, bytesRemaining, cancellationToken);
if (bytesRead == 0)
{
throw new XpressNetProtocolViolationException("Unexpected end of stream");
}
bytesRemaining -= bytesRead;
offset += bytesRead;
} while (bytesRemaining > 0);
}
//check the xor matches
var xorByte = stream.ReadByte();
if (xorByte != packet.XorByte)
throw new XpressNetProtocolViolationException("Packet checksum is invalid");
return packet;
}
}
I have created child classes that implement Type
packet for each of the valid commands (I don't think it is necessary to show them here...
I have created an IObservable message router in order for the packet to get a response:
public class PacketRouter : IObservable<Packet>
{
private List<IObserver<Packet>> observers = new List<IObserver<Packet>>();
public IDisposable Subscribe(IObserver<Packet> observer)
{
if (!observers.Contains(observer))
observers.Add(observer);
return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
private List<IObserver<Packet>> _observers;
private IObserver<Packet> _observer;
public Unsubscriber(List<IObserver<Packet>> observers, IObserver<Packet> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
_observers.Remove(_observer);
}
}
public void SendPacket(Packet packet)
{
foreach (var observer in observers)
{
if (packet != null)
observer.OnError(new PacketUnknownException());
else
observer.OnNext(packet);
}
}
public void EndTransmission()
{
foreach (var observer in observers.ToArray())
if (observers.Contains(observer))
observer.OnCompleted();
observers.Clear();
}
}
public class PacketUnknownException : Exception
{
internal PacketUnknownException()
{
}
}
I have also created a class of type PacketHandler
that is able to read bytes from a stream and create it into a packet object. It also raises a broadcast event if the correct types of packets are discovered, otherwise it passes them to the messageRouter for handling by the library.
public class BroadcastMessageReceivedEventArgs : EventArgs
{
public Packet Packet { get; set; }
}
public static class PacketHandler
{
public static event EventHandler<BroadcastMessageReceivedEventArgs> BroadcastMessageReceived = delegate {};
public static void OnBroadcastMessageReceived(BroadcastMessageReceivedEventArgs e)
{
EventHandler<BroadcastMessageReceivedEventArgs> handler = BroadcastMessageReceived;
if (handler != null)
{
handler(null, e);
}
}
public static async Task<Packet> ReadPacketAsync(Stream stream, CancellationToken cancellationToken = default(CancellationToken))
{
var headerBuffer = new byte[1];
int bytesRead = await stream.ReadAsync(headerBuffer, 0, 1, cancellationToken);
if (bytesRead == 0)
{
return null;
}
var headerVal = (byte)(headerBuffer[0] >> 4);
var payloadLength = (headerBuffer[0] & 0x0F);
Packet packet = new Packet((PacketHeaderType)Enum.Parse(typeof(PacketHeaderType), headerVal.ToString()));
var bytesRemaining = payloadLength;
var offset = 0;
if (payloadLength > 0)
{
var payloadBytes = new byte[payloadLength];
do
{
bytesRead = await stream.ReadAsync(payloadBytes, offset, bytesRemaining, cancellationToken);
if (bytesRead == 0)
{
throw new XpressNetProtocolViolationException("Unexpected end of stream");
}
bytesRemaining -= bytesRead;
offset += bytesRead;
} while (bytesRemaining > 0);
}
//check the xor matches
var xorByte = stream.ReadByte();
if (xorByte != packet.XorByte)
throw new XpressNetProtocolViolationException("Packet checksum is invalid");
PacketReceived(packet);
return packet;
}
internal static void PacketReceived(Packet packet)
{
//TODO: Handle Broadcast Messages and AckResp
if ((packet.Header == PacketHeaderType.EmergencyStopAll) )
//|| (packet.Header == PacketHeaderType.CommandStationOperationResponse && packet.Payload.ElementAt(0) ))...
{
BroadcastMessageReceivedEventArgs bmr = new BroadcastMessageReceivedEventArgs();
bmr.Packet = packet;
OnBroadcastMessageReceived(bmr);
}
else
{
PacketRouter pump = new PacketRouter();
pump.SendPacket(packet);
}
}
}
The way I envisage using the library would be like this:
public async string GetCmdStnSoftwareVersion()
{
var msgReq = new CmdStnSoftwareVersionReqMessage();
await var response msgReq.WriteAsync(sPort.BaseStream, true);
return String.Format("{0}.{1}", response.Major, response.Minor);
}
Is this a good pattern and/or example implementation for handling responses which is compatible with implementing the extension libraries. Can anyone provide input?
2 Answers 2
Style
You are using to many new lines
You should use braces
{}
also for singlelineif
statements orfor
loops won't make your code slower or faster more secure. Read about theapple bug
. ( And yes @Magus I know you have another opinion. Feel free to express it in the comments )
Code against an interface not against an implementation
Instead of
internal List<byte> Payload { get; private set; }
you should use
internal IList<byte> Payload { get; private set; }
Constructor chaining
public Packet(PacketHeaderType header, List<byte> payload) { this.Header = header; this.Payload = new List<byte>(payload); this.ResponseReceived = false; this.Timeout = 5; } public Packet(PacketHeaderType headerByte) { this.Header = headerByte; this.Payload = new List<byte>(); this.ResponseReceived = false; this.Timeout = 5; }
Can be refactored by using constructor chaining and also IEnumerable<T>
instead of `List to
public Packet(PacketHeaderType header, IEnumerable<byte> payload):this()
{
this.Header = header;
this.Payload = new List<byte>(payload);
}
public Packet(PacketHeaderType headerByte):this()
{
this.Header = headerByte;
this.Payload = new List<byte>();
}
private Packet()
{
this.ResponseReceived = false;
this.Timeout = 5;
}
Property vs Method
Properties should return a value fast. If there is calculation involved you should better use a method. It won't be faster , but this is what one would expect.
internal byte XorByte { get { Byte xorByte = Convert.ToByte(HeaderByte); for (int i = 0; i < PayloadLength; i++) xorByte ^= Payload.ToArray()[i]; return xorByte; } }
Also calling ToArray()
on List<T>
to get an item of the list by index is ...
Just use the indexer of the List<T>
.
So this can be refactored to
internal byte GetXoredByte()
{
Byte xorByte = Convert.ToByte(HeaderByte);
for (int i = 0; i < PayloadLength; i++)
{
xorByte ^= Payload[i];
}
return xorByte;
}
Unneeded code
As Payload
can't be null
you can remove any checks.
Like inside the Packet.WriteAsync()
method:
if (Payload != null && PayloadLength > 0) { buffer.AddRange(Payload); }
can be refactored to
buffer.AddRange(Payload);
Checking for timeout
if (returnResponse) { var ts = new TimeSpan(); ts = TimeSpan.FromSeconds(Timeout); while (DateTime.Now + ts < DateTime.Now && !ResponseReceived) { await Task.Delay(10); } }
If you don't set the TimeOut
property to a value < 0
the first condition of the while
can be removed. But I guess that hadn't be what you were after. So refactoring to the following, with also the faster check first, will make the condition succeed
if (returnResponse && !ResponseReceived)
{
var ts = TimeSpan.FromSeconds(Timeout); // thanks to @firda
DateTime maximumTime = DateTime.Now + ts;
while (!ResponseReceived && DateTime.Now < maximumTime)
{
await Task.Delay(10);
}
}
As I have seen now the following method
private bool ValidResponse(Packet p)
{
return false;
}
I believe you have never tested your implementation at least you didn't with setting returnResponse = true
for the optional parameter of the Packet.WriteAsync()
method. Why so ? Because the only place you change the ResponseReceived
property is here:
public virtual void OnNext(Packet value)
{
if (ValidResponse(value))
{
ResponsePacket = value;
ResponseReceived = true;
Unsubscribe();
}
}
The if
condition won't ever be valid. You also need to think about what would happen if ResponseReceived
would be true
. You only set this value to false
inside of the constructor. So a second call to Packet.WriteAsync()
will have a problem with the while loop above.
One last thing (but not the last I noticed)
from PacketHandler
public void SendPacket(Packet packet) { foreach (var observer in observers) { if (packet != null) observer.OnError(new PacketUnknownException()); else observer.OnNext(packet); } }
this should be if(packet == null)
You code is buggy. Please recheck your code and if you still want a review of your code you should create a follow up. See also: https://codereview.meta.stackexchange.com/a/1765/29371
-
1\$\begingroup\$ This definitelly needs fixing and a follow-up. BTW: Small typo:
while (!ResponseReceived 6 DateTime.Now < maximumTime)
, should be&&
instead of6
. Andvar ts = new TimeSpan(); ts = TimeSpan.FromSeconds(Timeout);
looks odd to me as well. +1 for extensive review. \$\endgroup\$user52292– user522922014年10月07日 08:24:00 +00:00Commented Oct 7, 2014 at 8:24 -
\$\begingroup\$ @firda Thanks for the hint. Buggy code leads to buggy reviews ;-) \$\endgroup\$Heslacher– Heslacher2014年10月07日 08:25:34 +00:00Commented Oct 7, 2014 at 8:25
Few Assumptions
The usage and extensibility idea seems to be hidden in those two lines:
var msgReq = new CmdStnSoftwareVersionReqMessage();
await var response msgReq.WriteAsync(sPort.BaseStream, true);
That looks like you plan to derive CmdStnSoftwareVersionReqMessage
from Packet
while providing header
and payload
in the constructor and overriding ValidResponse
which should therefore be protected virtual
. Did I get it right?
Await Response
Unfortunatelly, I am forced to use .NET 3.5 and therefore don't understand async
/await
that much (use threads and events instead). I hope that some WriteAsync
override in your CmdStnSoftwareVersionReqMessage
or casting the result to appropriate type (as the original returns Task
, not Task<Packet>
or Task<TheResponse>
) will solve the problem with var response
type and later usage: response.Major, response.Minor
.
What I don't like is the active waiting (or it at least seems to be active waiting):
if (returnResponse)
{
var ts = new TimeSpan();
ts = TimeSpan.FromSeconds(Timeout);
while (DateTime.Now + ts < DateTime.Now && !ResponseReceived)
{
await Task.Delay(10); //<<<< this waiting
}
}
Packet: Observer, Router: Obsarveble
This looks like good pattern for new async
/await
system introduced in .NET 4.0 (or 4.5). I have used similar patter (my Link
is your Router
, Packet
is the same, my Accepts
method is your ValidResponse
). I cannot see where you subscribe the transmitted packet to the receiver, but assume you will do that somewhere.
Some inefficient code
internal byte XorByte
{
get
{
Byte xorByte = Convert.ToByte(HeaderByte);
for (int i = 0; i < PayloadLength; i++)
xorByte ^= Payload.ToArray()[i]; //<<< here
return xorByte;
}
}
Payload.ToArray()[i]
?? Again and again in the loop? You don't need the ToArray
. And I would personally use byte[]
instead of List<byte>
as type of the variable, IEnumerable<byte>
in constructor, IList<byte>
(or some readonly collection) as property type and some MemoryStream
during the build phase.
byte[]
instead ofList<byte>
on payload andIEnumerable<byte>
on construction), but I have used Packet-Payload design in my own solution for similar thing. There was aDictionary
that was mappingHeaderTypes
(service IDs) toType
of Payload-derived classes. Thisstatic
dictionary (could be made device-dependent) was handling all the conversion/mapping. \$\endgroup\$List<byte>
tobyte[]
/IEnumerable<byte>
style change, I would thing about thatDictionary<ServiceCode,Type>
andActivator.Create()
withvirtual Parse
/Build
for extensions. More info needed. \$\endgroup\$