I've written a circular buffer with multithreaded write and single read ability:
    public class RingBuffer<E> extends AbstractList<E> implements RandomAccess {

        class Pointer {
            int p = 0;

            public int getPointer() {
                return this.p;
            }

            public void increasePointer(int p) {
                this.p = p;
            }
        }

        private final int n;
        private final List<E> rb;
        private int size = 0;
        private Pointer ptr_w = new Pointer();
        private Pointer ptr_r = new Pointer();

        public RingBuffer(int n) {
            this.n = n + 1;
            this.rb = new ArrayList<E>(Collections.nCopies(this.n, (E) null));
        }

        public int size() {
            return this.size;
        }

        private int wrap(int i) {
            int m = i % this.n;
            return (m < 0) ? m += this.n : m;
        }

        @Override
        public E get(int i) {
            return (this.size == 0) ? null : this.rb.get(i);
        }

        @Override
        public E remove(int i) {
            E e = this.get(this.ptr_r.getPointer());
            if (e != null) {
                this.rb.set(this.ptr_r.getPointer(), null);
                setPointer(false, this.ptr_r);
            }
            System.out.println("Read: " + this.ptr_r.getPointer() + " " + size());
            return e;
        }

        @Override
        public synchronized boolean add(E e) {
            if (size() == this.n - 1)
                throw new IllegalStateException("Cannot add element. Ringbuffer is filled.");
            this.rb.set(this.ptr_w.getPointer(), e);
            setPointer(true, this.ptr_w);
            System.out.println("Write: " + this.ptr_w.getPointer() + " " + size());
            return true;
        }

        private synchronized void setPointer(boolean a, Pointer ptr) {
            this.size = (a) ? this.size + 1 : this.size - 1;
            ptr.increasePointer(wrap(ptr.getPointer() + 1));
        }
    }
I would like to know what I could improve and what you think about the code. I'm new to multithreading and would love to get some feedback.
1 Answer
I highly recommend Java Concurrency in Practice by Brian Goetz et al. This will give you a solid foundation for building multi-threaded Java applications.
The biggest problem with RingBuffer is that it is not thread-safe. You must synchronize all read and write operations to be certain to see the latest values for fields. This doesn't mean you have to lock the structure the entire time.
This is explained far better in the book than I can here. The short of it is that without synchronization via memory barriers, the JVM is free to cache values read in one thread and miss writes happening in another.
- Thread A calls get and sees that the buffer is empty.
- Thread B calls add, which updates the pointer.
- Thread A calls get again, but without a memory barrier the cached pointer value tells it that the queue is still empty.
You'll need to add synchronization to guard any access of the pointers and size. Sometimes you can merely add synchronized to the method, but this usually hurts performance. You want to shrink your synchronized blocks to the bare minimum; prefer using synchronized (<lock>) blocks over synchronized methods.
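As a minimal sketch of guarding a field with a synchronized (<lock>) block rather than a synchronized method: the class GuardedSize, its private lock object, and the runDemo helper below are hypothetical names for illustration and are not part of the RingBuffer in the question. Both the write and the read take the same lock, which is what guarantees a reader sees the latest value.

```java
// Hypothetical illustration; GuardedSize is not part of the RingBuffer under review.
final class GuardedSize {
    // Private lock object: code outside this class cannot synchronize on it.
    private final Object lock = new Object();
    private int size = 0; // only read or written while holding lock

    void increment() {
        synchronized (lock) {
            size++;
        }
    }

    int size() {
        // Reads take the same lock as writes, so they observe the latest value.
        synchronized (lock) {
            return size;
        }
    }

    // Starts several threads that each increment perThread times, then returns the total.
    static int runDemo(int threads, int perThread) {
        GuardedSize g = new GuardedSize();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    g.increment();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return g.size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // prints 4000
    }
}
```

Without the synchronized blocks, the increments could be lost and the final value would not be reliable.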
Here are some other suggestions unrelated to concurrency:
1. RingBuffer composes an ArrayList, so it should not extend AbstractList itself. It shouldn't even implement the basic List interface nor RandomAccess because it doesn't truly support the get operation. It should implement Queue instead.
2. As written, get allows reading outside the buffer. Are you sure you want to support this? See (1) above.
3. You don't need a class to hold an integer pointer given that it is internal to RingBuffer. A simple int is sufficient.
4. In Java, the modulo operator % takes the sign of the dividend, so it does not wrap negative values on its own (test this!). However, wrap is only ever called with a non-negative argument here, so it could still be simplified.
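Taking up the "test this!" on the modulo operator, here is a quick check. The class WrapDemo and its two-argument methods are hypothetical names for illustration. In Java, % keeps the sign of the dividend; Math.floorMod (available from Java 8) is one way to get a non-negative result if negative indices ever need to be supported.

```java
// Hypothetical illustration; WrapDemo is not part of the RingBuffer under review.
final class WrapDemo {
    // Simplified wrap, sufficient when the index is never negative.
    static int wrap(int i, int n) {
        return i % n;
    }

    // Variant that also maps negative indices into [0, n); requires Java 8+.
    static int wrapSigned(int i, int n) {
        return Math.floorMod(i, n);
    }

    public static void main(String[] args) {
        System.out.println(-1 % 5);            // prints -1
        System.out.println(wrap(5, 5));        // prints 0
        System.out.println(wrapSigned(-1, 5)); // prints 4
    }
}
```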
- I knew that I should shrink the synchronized blocks to the bare minimum. Therefore I added the setPointer(boolean a, Pointer ptr) function, which is synchronized. Read and write use this function to get the position and set the size, so in my opinion the threads are synchronized. Am I wrong? Yes, you're right about get; I will have a look at the implementation. Thanks for your hints. – Andre Hofmeister, Mar 24, 2013 at 19:22
- Assuming you had a similar synchronized getPointer method to fix the stale read problem I addressed above, you still have concurrency issues. For example, in remove you read the element at the pointer but increment the pointer in a separate operation. Another thread could read the same element before the first had a chance to increment the pointer. – David Harkness, Mar 24, 2013 at 19:58
- Thanks again. If I put the set call on rb into setPointer(boolean a, Pointer ptr), am I fine? Or is there a better way? I created the Pointer class because with it I can set the read and write pointers in the function without distinguishing which pointer I pass. A simple int would be passed by value, not by reference, so I wouldn't know which pointer was passed. – Andre Hofmeister, Mar 25, 2013 at 19:04
- No, that is not sufficient. See this explanation of the race condition. – David Harkness, Mar 25, 2013 at 21:27
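One way to avoid the race discussed in these comments is to make "read the element and advance the pointer" a single step under one lock, and likewise for writes. The sketch below is only an illustration under that assumption, not the answerer's own solution: LockedRingBuffer, offer, poll, and runDemo are hypothetical names, it uses plain int indices instead of a Pointer class, and it does not implement the full Queue interface.

```java
// Hypothetical sketch; not the code from the question or the linked explanation.
final class LockedRingBuffer<E> {
    private final Object lock = new Object();
    private final Object[] items;
    private int head = 0; // next slot to read
    private int tail = 0; // next slot to write
    private int size = 0;

    LockedRingBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        items = new Object[capacity];
    }

    // Stores e and advances the write index in one atomic step; false if full.
    boolean offer(E e) {
        if (e == null) throw new NullPointerException("null elements are not supported");
        synchronized (lock) {
            if (size == items.length) return false;
            items[tail] = e;
            tail = (tail + 1) % items.length;
            size++;
            return true;
        }
    }

    // Reads the element and advances the read index in one atomic step; null if empty.
    @SuppressWarnings("unchecked")
    E poll() {
        synchronized (lock) {
            if (size == 0) return null;
            E e = (E) items[head];
            items[head] = null;
            head = (head + 1) % items.length;
            size--;
            return e;
        }
    }

    int size() {
        synchronized (lock) {
            return size;
        }
    }

    // Several writer threads, one reader (the calling thread); returns the sum received.
    static long runDemo(int writers, int perWriter) {
        LockedRingBuffer<Integer> buf = new LockedRingBuffer<>(8);
        Thread[] threads = new Thread[writers];
        for (int w = 0; w < writers; w++) {
            threads[w] = new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    while (!buf.offer(i)) Thread.yield(); // retry while the buffer is full
                }
            });
            threads[w].start();
        }
        long sum = 0;
        int received = 0;
        int total = writers * perWriter;
        while (received < total) {
            Integer v = buf.poll();
            if (v != null) {
                sum += v;
                received++;
            } else {
                Thread.yield(); // buffer empty, let the writers run
            }
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return sum;
    }
}
```

Calling LockedRingBuffer.runDemo(4, 1000) should return 1998000, the sum of 0 through 999 taken four times. The retry loops are kept only to make the sketch short; a blocking design would more likely wait and notify on the lock, or use java.util.concurrent.ArrayBlockingQueue directly.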