I have tried to mimic Go channels in C#, and the performance is pretty good compared to Go itself. On my machine, each channel operation in Go takes ~75 ns and each Chan<T> operation (in C#) takes ~90 ns.
Please let me know if this code can be improved in any way.
using System;
using System.Threading;

class Chan<T>
{
    readonly int size;
    T[] buffer;
    long head = -1;
    long tail = -1;
    long closed = 0;

    public Chan() { this.size = 0; }

    public Chan(int size)
    {
        if (size < 0) throw new ArgumentOutOfRangeException();
        this.size = size;
        this.buffer = new T[this.size];
    }

    object headLock = new object();

    public bool To(T t)
    {
        lock (headLock)
        {
            long localClosed = 0L;
            if (tail - head == buffer.Length) SpinWait.SpinUntil(() => (localClosed = Interlocked.Read(ref closed)) > 0 || tail - head < buffer.Length);
            if (localClosed > 0) return false;
            var newTail = Interlocked.Increment(ref tail);
            buffer[newTail % buffer.Length] = t;
            return true;
        }
    }

    object tailLock = new object();

    public bool From(out T val)
    {
        lock (tailLock)
        {
            long localClosed = 0L;
            if (tail - head == 0) SpinWait.SpinUntil(() => (localClosed = Interlocked.Read(ref closed)) > 0 || tail - head > 0);
            if (localClosed > 0)
            {
                val = default(T);
                return false;
            }
            var newHead = Interlocked.Increment(ref head);
            val = buffer[newHead % buffer.Length];
            return true;
        }
    }

    public void Close()
    {
        Interlocked.Increment(ref closed);
    }
}
3 Answers
Without the select operator, you haven't got Go channels - you've just got buffered queues, which are much easier to implement but much less useful.
Also, it's important to allow channels with a zero-size buffer: in that case the sender should synchronise with the receiver.
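The zero-size case can be illustrated with a minimal sketch of an unbuffered ("rendezvous") channel. It assumes Monitor.Wait/PulseAll as the signalling mechanism; RendezvousChannel and its members are hypothetical names, and it has no Close or select support. Send deposits the value and then blocks until a receiver has actually taken it, which is the synchronisation an unbuffered Go chan provides.

```csharp
using System.Threading;

// Hypothetical unbuffered channel: Send returns only after a receiver has
// taken the value, mirroring an unbuffered Go chan (no Close, no select).
public sealed class RendezvousChannel<T>
{
    private readonly object _gate = new object();
    private T _item;
    private bool _hasItem; // a sender currently occupies the hand-off slot
    private bool _taken;   // the value in the slot has been received

    public void Send(T item)
    {
        lock (_gate)
        {
            // Only one sender at a time may use the hand-off slot.
            while (_hasItem) Monitor.Wait(_gate);
            _item = item;
            _hasItem = true;
            _taken = false;
            Monitor.PulseAll(_gate); // wake a waiting receiver

            // Block until a receiver has taken this value (the rendezvous).
            while (!_taken) Monitor.Wait(_gate);
            _hasItem = false;
            Monitor.PulseAll(_gate); // let the next sender in
        }
    }

    public T Receive()
    {
        lock (_gate)
        {
            // Wait for a value that no other receiver has taken yet.
            while (!_hasItem || _taken) Monitor.Wait(_gate);
            T item = _item;
            _item = default(T);
            _taken = true;
            Monitor.PulseAll(_gate); // release the blocked sender
            return item;
        }
    }
}
```

Because Send cannot return until Receive has run, a sender started on another thread stays blocked for as long as no receiver shows up.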
- Ouch! I totally forgot select! (Kaveh Shahbazian, Oct 10, 2013 at 11:35)
- It may be worth mentioning that the default constructor of Chan should set the buffer size to zero, not one. The default buffer size of a chan in Go is zero. (user36, Apr 9, 2014 at 14:05)
- Chan sounds like an abbreviation of Channel, and it is in fact a channel, so I'd use Channel or maybe even GoChannel.
- The most commonly used naming convention I have seen for private members is to prefix them with an underscore. This way you can see at first glance whether something is a local variable or a class member. It also means you can get rid of this. most of the time.
- I really prefer to spell out the access modifier even if private is the default, but YMMV.
- Method names should describe actions or operations (because they operate on data and sometimes modify the state of the object). To and From are not actions or operations. (Retracted: That being said, the channel seems to have fixed-size queue semantics (FIFO), so I'd consider calling the operations Enqueue and Dequeue, which would make it immediately clear how the data is being processed.) (The semantics of the underlying data structure should not be exposed. Don't know what I was thinking there.) Rather, use the Send and Receive semantics from the Go definition.
- (Retracted: Given the previous point, it could be useful to have a Peek method to check what will come next.)
- _head and _tail are longs, and access to them is not guaranteed to be atomic (64-bit reads can tear on 32-bit platforms), so you should use Interlocked.Read to obtain them.
- Also, the implementation is actually broken. Assume two threads A and B, where A calls Send() and B calls Receive(), starting from _head == _tail == -1:
  - A: executes Interlocked.Increment(_tail) (_tail is now 0).
  - B: _tail - _head > 0 is true (0 - -1 == 1), so it leaves the spin wait.
  - B: executes Interlocked.Increment(_head) (_head is now 0).
  - B: reads _buffer[_head].
  - A: writes _buffer[_tail].
  - B has read from the buffer before the element was written.
This problem can be easily reproduced with this test case (almost every iteration results in dupes):

[TestCase]
public void TestSPSC()
{
    int numItems = 10000;
    int numIterations = 100;
    for (int i = 0; i < numIterations; ++i)
    {
        var channel = new Channel<int>(100);
        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var num in Enumerable.Range(1, numItems))
            {
                channel.Send(num);
            }
            channel.Close();
        });
        var reader = Task.Factory.StartNew<List<int>>(() =>
        {
            var numbers = new List<int>(numItems);
            for (int idx = 1; idx <= numItems; ++idx)
            {
                int num;
                var res = channel.Receive(out num);
                numbers.Add(num);
            }
            return numbers.OrderBy(x => x).ToList();
        });
        Task.WaitAll(writer, reader);
        var dupes = reader.Result.GroupBy(x => x).Where(g => g.Count() > 1).ToList();
        if (dupes.Count > 0)
        {
            Console.WriteLine("{0}: {1} DUPES!", i, dupes.Count);
        }
    }
}
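The race in the walkthrough comes from advancing the index before the slot is written (and, on the reading side, before it is read). A minimal sketch of the ordering that avoids it, assuming exactly one producer thread and one consumer thread: each side finishes its buffer access first and only then publishes the new index, reading the other side's index with Interlocked.Read. SpscRing, TrySend and TryReceive are hypothetical names, and the methods return false instead of waiting so the ordering stays visible.

```csharp
using System;
using System.Threading;

// Hypothetical single-producer/single-consumer ring buffer. Assumes exactly one
// thread calls TrySend and exactly one thread calls TryReceive.
public sealed class SpscRing<T>
{
    private readonly T[] _buffer;
    private long _head; // items consumed so far; written only by the consumer
    private long _tail; // items produced so far; written only by the producer

    public SpscRing(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _buffer = new T[capacity];
    }

    public bool TrySend(T item)
    {
        long tail = Interlocked.Read(ref _tail);
        long head = Interlocked.Read(ref _head);
        if (tail - head == _buffer.Length) return false; // full

        _buffer[tail % _buffer.Length] = item;     // 1. write the slot ...
        Interlocked.Exchange(ref _tail, tail + 1); // 2. ... then publish it
        return true;
    }

    public bool TryReceive(out T item)
    {
        long head = Interlocked.Read(ref _head);
        long tail = Interlocked.Read(ref _tail);
        if (tail == head) { item = default(T); return false; } // empty

        long slot = head % _buffer.Length;
        item = _buffer[slot];                      // 1. read the slot ...
        _buffer[slot] = default(T);                // (drop the stored reference)
        Interlocked.Exchange(ref _head, head + 1); // 2. ... then hand it back
        return true;
    }
}
```

Blocking Send/Receive would wrap these calls in some wait strategy; whether that is spinning or a signalling construct is a separate decision from getting the ordering right.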
I changed the implementation to use .NET's BlockingCollection<T> wrapped around a ConcurrentQueue<T>:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class Channel<T>
{
    private readonly BlockingCollection<T> _buffer;

    // BlockingCollection requires a bounded capacity of at least 1, so a
    // zero-size (unbuffered) Go-style channel cannot be expressed this way.
    public Channel() : this(1) { }

    public Channel(int size)
    {
        _buffer = new BlockingCollection<T>(new ConcurrentQueue<T>(), size);
    }

    public bool Send(T t)
    {
        try
        {
            _buffer.Add(t);
        }
        catch (InvalidOperationException)
        {
            // thrown when the collection has been closed (CompleteAdding was called)
            return false;
        }
        return true;
    }

    public bool Receive(out T val)
    {
        try
        {
            val = _buffer.Take();
        }
        catch (InvalidOperationException)
        {
            // thrown when the collection is empty and has been closed
            val = default(T);
            return false;
        }
        return true;
    }

    public void Close()
    {
        _buffer.CompleteAdding();
    }

    public IEnumerable<T> Range()
    {
        T val;
        while (Receive(out val))
        {
            yield return val;
        }
    }
}
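The two catch blocks rely on BlockingCollection<T> behaviour: after CompleteAdding(), Add throws InvalidOperationException, while items already queued can still be taken. The self-contained check below exercises that behaviour; CloseSemanticsDemo is a hypothetical name. As a hedged alternative for Receive, it drains with TryTake(out item, Timeout.Infinite), which returns false instead of throwing once the collection is both completed and empty (Send would still need its catch).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CloseSemanticsDemo
{
    // Returns how many items could still be drained after closing, and
    // whether adding after closing threw InvalidOperationException.
    public static (int Drained, bool AddThrew) Run()
    {
        using var buffer = new BlockingCollection<int>(new ConcurrentQueue<int>(), 10);
        buffer.Add(1);
        buffer.Add(2);
        buffer.CompleteAdding();        // what Channel.Close() does

        bool addThrew = false;
        try { buffer.Add(3); }          // what Channel.Send does after Close
        catch (InvalidOperationException) { addThrew = true; }

        int drained = 0;
        // Blocks while the collection is empty but still open; returns false
        // once it is completed and empty, so no exception is involved.
        while (buffer.TryTake(out _, Timeout.Infinite)) drained++;
        return (drained, addThrew);
    }
}
```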
The code is much easier to read and probably has fewer bugs than your self-implemented one. It's also fast. I can pump 10,000,000 items (I tested with int) through a channel (buffer size 100) with a single producer and a single consumer in 5 s. That's 5 s / 10,000,000 items = 0.5 µs (500 ns) per item.
[TestCase]
public void TestSPSC_Performance()
{
    int numItems = 10000000;
    int numIterations = 10;
    var stopWatch = new Stopwatch();
    stopWatch.Start();
    for (int i = 0; i < numIterations; ++i)
    {
        var channel = new Channel<int>(100);
        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var num in Enumerable.Range(1, numItems)) { channel.Send(num); }
            channel.Close();
        });
        var reader = Task.Factory.StartNew<List<int>>(() =>
        {
            var res = new List<int>(numItems);
            foreach (var num in channel.Range()) { res.Add(num); }
            return res;
        });
        Task.WaitAll(writer, reader);
    }
    stopWatch.Stop();
    var elapsedMs = stopWatch.Elapsed.TotalMilliseconds;
    // ms -> ns is a factor of 1,000,000 (a factor of 1000 would give microseconds)
    Console.WriteLine("SPSC N = {0}: {1:.00}ms/iteration, {2:.00}ns/item (tx+rx)", numItems, elapsedMs / numIterations, elapsedMs * 1000000.0 / numItems / numIterations);
}
- I was trying to mimic Go's chan, but in C# land you are right about naming; I've made some modifications based on your answer. (Kaveh Shahbazian, Oct 10, 2013 at 10:16)
- A Peek method is not in general a good idea, because it's OK for several receivers to use the channel at once, in which case Peek cannot necessarily provide an accurate prediction of what you will read: another process might have read the value first. That's why Go's channels do not provide that primitive. (rog, Oct 10, 2013 at 11:35)
- @rog, sure, but if you only have one consumer then it might be. (ChrisWue, Oct 10, 2013 at 18:48)
- Only if you've got only one consumer, the buffer size is greater than zero, and you must avoid actually reading the value. Better to leave it out IMHO; it's a misleading operation. (rog, Oct 11, 2013 at 12:35)
- @KavehShahbazian: Found a problem with your implementation. I got interested in this, started implementing Select, and wrote a bunch of unit tests for the class. (ChrisWue, Oct 15, 2013 at 7:17)
I would not use SpinWait, since it basically runs a small loop that checks the condition over and over again. This means a lot of CPU cycles are wasted. I would suggest a signalling construct like the ManualResetEventSlim class.
You can read about this class and similar constructs on this excellent page about threading in C#.
Note: the SpinWait class is only preferable when you know in advance that the wait times will be very short (shorter than the time it takes to do a thread context switch). You can configure the ManualResetEventSlim class to spin for a short time and then fall back to a kernel-based wait operation by passing a spin count to its constructor (the read-only SpinCount property reports the value). This is useful if you expect a very short wait time but don't want to waste too many CPU cycles when it turns out that you have to wait longer.
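A minimal sketch of that configuration, assuming a single value handed from one thread to another; SignalDemo and HandOff are hypothetical names, and the spin count of 100 is an arbitrary illustration, not a tuned value. The waiting thread spins briefly and then blocks in the kernel instead of looping on the CPU the way SpinWait.SpinUntil does.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class SignalDemo
{
    // Hypothetical hand-off: a consumer waits for a value without a busy loop.
    public static int HandOff(int value)
    {
        // spinCount is fixed at construction (SpinCount itself is read-only):
        // spin up to 100 times, then fall back to a kernel-based wait.
        using var ready = new ManualResetEventSlim(false, 100);
        int shared = 0;

        Task<int> consumer = Task.Run(() =>
        {
            ready.Wait();   // spins briefly, then blocks without burning CPU
            return shared;
        });

        shared = value;     // publish the value ...
        ready.Set();        // ... then signal the waiting consumer
        return consumer.Result;
    }
}
```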
- Agreed; spinning is almost never the right solution. (rog, Oct 10, 2013 at 11:27)
- BufferBlock is quite similar to a Go channel.
- BufferBlock does not provide the concept of 'closing' a channel.
- That's what the Complete() method is for. Though it does it differently than your code: after completing a block, you can't add new items to it, but the items that are already in it can still be processed.