I have a buffer: an `int` array with a fixed length. I want to read and write values in this array in parallel, for instance with 5 readers and 3 writers running at different intervals. Additionally, I want to block waiting readers and waiting writers until the operation they need (read or write) becomes available.
Buffer:
    public class IntBuffer
    {
        private readonly AutoResetEvent _readDone = new AutoResetEvent(false);
        private readonly AutoResetEvent _writeDone = new AutoResetEvent(false);
        private readonly int[] _array;
        private int _index;

        public IntBuffer(int bufferSize)
        {
            _array = new int[bufferSize];
            _index = -1;
        }

        public void Write(int insert)
        {
            while (true)
            {
                if (_index >= -1 && _index < _array.Length - 1)
                {
                    try
                    {
                        Monitor.Enter(this);
                        if (_index >= -1 && _index < _array.Length - 1)
                        {
                            _array[++_index] = insert;
                            return;
                        }
                    }
                    finally
                    {
                        Monitor.Exit(this);
                        _readDone.Set();
                    }
                }
                Console.WriteLine("waiting for add...");
                _writeDone.WaitOne();
            }
        }

        public int Read()
        {
            while (true)
            {
                if (_index >= 0)
                {
                    try
                    {
                        Monitor.Enter(this);
                        if (_index >= 0)
                        {
                            var val = _array[_index--];
                            return val;
                        }
                    }
                    finally
                    {
                        Monitor.Exit(this);
                        _writeDone.Set();
                    }
                }
                Console.WriteLine("waiting for read...");
                _readDone.WaitOne();
            }
        }
    }
Reader:
    public class Reader
    {
        private readonly IntBuffer _buff;

        public Reader(IntBuffer buff)
        {
            _buff = buff;
        }

        public void Start(int sleepMs)
        {
            while (true)
            {
                Thread.Sleep(sleepMs);
                Console.WriteLine($"reader read {_buff.Read()}");
            }
        }
    }
Writer:
    public class Writer
    {
        private readonly IntBuffer _buff;
        private readonly Random _random = new Random(Guid.NewGuid().GetHashCode());

        public Writer(IntBuffer buff)
        {
            _buff = buff;
        }

        public void Start(int sleepMs)
        {
            while (true)
            {
                Thread.Sleep(sleepMs);
                int val = _random.Next(1, 500);
                _buff.Write(val);
                Console.WriteLine($"writer write {val}");
            }
        }
    }
All in one:
    var buffer = new IntBuffer(8);
    new Thread(() => new Writer(buffer).Start(1700)).Start();
    new Thread(() => new Writer(buffer).Start(2900)).Start();
    new Thread(() => new Writer(buffer).Start(3450)).Start();
    new Thread(() => new Reader(buffer).Start(5000)).Start();
    new Thread(() => new Reader(buffer).Start(3000)).Start();
    new Thread(() => new Reader(buffer).Start(1400)).Start();
Please review and add remarks.
1 Answer
What you are trying to achieve is very similar to `BlockingCollection<T>`. Note that you can look at the .NET Framework's source code to get an idea of how to implement it professionally ;).
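For comparison, here is a minimal sketch of the same producer/consumer behaviour built on `BlockingCollection<int>`. It assumes a bounded capacity of 8 (as in the question's setup) and wraps a `ConcurrentStack<int>` so that items come out in last-in, first-out order like `IntBuffer`. The class name `BlockingCollectionDemo`, the `Run` method and the count of 20 items are made up for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Hypothetical helper: one writer pushes 1..20, the caller's thread reads them all.
    public static int[] Run()
    {
        // Bounded to 8 items; the ConcurrentStack gives LIFO order.
        using (var buffer = new BlockingCollection<int>(new ConcurrentStack<int>(), 8))
        {
            var writer = Task.Run(() =>
            {
                for (int i = 1; i <= 20; i++)
                {
                    buffer.Add(i);       // blocks while the buffer already holds 8 items
                }
                buffer.CompleteAdding(); // lets the consuming loop below finish
            });

            var results = new List<int>();
            // Blocks while the buffer is empty and ends once adding is complete.
            foreach (int value in buffer.GetConsumingEnumerable())
            {
                results.Add(value);
            }

            writer.Wait();
            return results.ToArray();
        }
    }
}
```

The order in which values are read depends on thread timing, so only the set of values is predictable, not their sequence.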
The overall implementation seems to work and I think the blocking mechanism does what it is supposed to.
However, some points to note:
- Do not use the buffer instance as the synchronization object (`Monitor.Enter(this);`), because the object can be locked by someone else from outside. Create a private sync object instead.
- In the `Read` method, the temporary variable `val` is not required; just return the value.
- I would rename the class to make clear that a call to `Write` may block the current thread (maybe `BlockingBuffer`, `BlockingStack` or something like that).
- Why not make the class generic?
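The points above could be combined roughly as follows. This is only a sketch, not the original design: it replaces the two `AutoResetEvent` fields with `Monitor.Wait` / `Monitor.PulseAll` on a private lock object, which is a different signalling technique from the one in the question. The name `BlockingStack<T>` is taken from the renaming suggestion; the field names are assumptions.

```csharp
using System.Threading;

public sealed class BlockingStack<T>
{
    // Private lock object: code outside this class cannot lock on it.
    private readonly object _sync = new object();
    private readonly T[] _items;
    private int _count;

    public BlockingStack(int capacity)
    {
        _items = new T[capacity];
    }

    // Blocks the calling thread while the stack is full.
    public void Write(T item)
    {
        lock (_sync)
        {
            // Re-check in a loop: another writer may have taken the free slot first.
            while (_count == _items.Length)
            {
                Monitor.Wait(_sync); // releases _sync while waiting, reacquires on wake-up
            }
            _items[_count++] = item;
            Monitor.PulseAll(_sync); // wake all waiters; each one re-checks its condition
        }
    }

    // Blocks the calling thread while the stack is empty.
    public T Read()
    {
        lock (_sync)
        {
            while (_count == 0)
            {
                Monitor.Wait(_sync);
            }
            Monitor.PulseAll(_sync);  // waiters only run after this lock is released
            return _items[--_count];  // no temporary variable needed
        }
    }
}
```

Because the condition is checked and the array is modified under the same lock, there is no window between the check and the update. For reference types the popped slot still holds a reference until it is overwritten; clearing it would need a temporary variable again.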
`AutoResetEvent` implements `IDisposable`, so your class `IntBuffer` should as well, and both events should be disposed of in its `Dispose()` method. Further, `Reader` and `Writer` will then hold references to an `IntBuffer`, so they should implement `IDisposable` as well.
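A minimal sketch of what that disposal could look like, reduced to the two event fields from the question. The class name `DisposableBufferSketch`, the `_disposed` flag and the `IsDisposed` property are assumptions added for illustration; the read and write logic is omitted.

```csharp
using System;
using System.Threading;

public sealed class DisposableBufferSketch : IDisposable
{
    private readonly AutoResetEvent _readDone = new AutoResetEvent(false);
    private readonly AutoResetEvent _writeDone = new AutoResetEvent(false);
    private bool _disposed;

    // Hypothetical property, exposed only so the state can be observed.
    public bool IsDisposed => _disposed;

    public void Dispose()
    {
        if (_disposed)
        {
            return; // calling Dispose more than once should be harmless
        }
        _readDone.Dispose();
        _writeDone.Dispose();
        _disposed = true;
    }
}
```

In the question's setup the reader and writer threads loop forever, so they would presumably have to be stopped before the buffer is disposed; otherwise a thread could still be waiting on an event that has already been released.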