I am trying to write a simple queue, similar to ArrayBlockingQueue, in which the head of the queue is removed when an element is added to a full queue. The class should have only the following public methods:
- To get the size of the Queue
- To get an element from the head of the queue
- To add an element at the tail of the queue
Can someone review the below code and let me know if there is a better way of doing this?
    import java.util.concurrent.ArrayBlockingQueue;

    public class CircularArrayNonBlockingQueue<E> {
        private ArrayBlockingQueue<E> blockingQueue;

        public CircularArrayNonBlockingQueue(int size) {
            blockingQueue = new ArrayBlockingQueue<>(size);
        }

        public synchronized int size() {
            return blockingQueue.size();
        }

        public synchronized void add(E element) {
            // Drop the head when the queue is full, then append at the tail.
            if (blockingQueue.remainingCapacity() <= 0) {
                blockingQueue.poll();
            }
            blockingQueue.add(element);
        }

        public synchronized E poll() {
            return blockingQueue.poll();
        }
    }
- Don't you want to use EvictingQueue from Google Guava? stackoverflow.com/a/25059883/3314834 – komarik, Dec 19, 2017 at 19:52
- I am planning to use this in a producer-consumer type pattern in which the consumer will be blocked until a new element is available but the producer will never be blocked; if there is no space, then the head will be removed to make space for the new element. – tuk, Dec 20, 2017 at 7:13
1 Answer
This has been discussed on Stack Overflow:

Having a thread-safe backend collection does not necessarily make a correct program. When only your `add` method is `synchronized`, the `take()` method may run concurrently with it, so it is possible that after your `if(blockingQueue.remainingCapacity() <= 0)` test within `add`, a concurrently running `take()` removes an element, so the `poll()` within `add` may remove an element unnecessarily. There is a perceivable difference from the situation where `add()` completes before the `take()`, as the consuming thread would receive a different item. In other words, the effect would be as if `add` sometimes removed not the oldest item but the second oldest one.

On the other hand, if you use `synchronized` for all of your methods consistently, there is no need to have a thread-safe backend collection:

    import java.util.ArrayDeque;

    public class CircularNonBlockingQueue<E> {
        private final ArrayDeque<E> blockingQueue;
        private final int maxSize;

        public CircularNonBlockingQueue(int size) {
            if (size < 1) throw new IllegalArgumentException("size == " + size);
            blockingQueue = new ArrayDeque<>(size);
            maxSize = size;
        }

        public synchronized int size() {
            return blockingQueue.size();
        }

        public synchronized void add(E element) {
            // Evict the oldest element when full; the producer never blocks.
            if (blockingQueue.size() == maxSize) {
                blockingQueue.poll();
            }
            blockingQueue.add(element);
            notify(); // wake one consumer waiting in take()
        }

        public synchronized E take() throws InterruptedException {
            // Loop to guard against spurious wakeups.
            while (blockingQueue.isEmpty()) wait();
            return blockingQueue.remove();
        }
    }
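To make the interleaving described in the quoted explanation concrete, here is a minimal sketch that replays it step by step on a single thread, so the outcome is deterministic. The class and method names (`RaceReplay`, `racyInterleaving`, `serializedOrder`) are made up for illustration, and `poll()` stands in for the consumer's `take()` so that nothing blocks. It assumes a queue of capacity 2 that is already full.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class RaceReplay {

    /** Consumer runs between the producer's capacity check and its poll(). */
    public static String racyInterleaving() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("A");
        queue.add("B");                                 // queue is full: [A, B]

        boolean full = queue.remainingCapacity() <= 0;  // producer: check says "full"
        String consumed = queue.poll();                 // consumer: removes "A" -> [B]
        String evicted = full ? queue.poll() : null;    // producer: stale check, removes "B"
        queue.add("C");                                 // queue: [C]

        return "consumed=" + consumed + " evicted=" + evicted + " queue=" + queue;
    }

    /** The producer's whole add() completes before the consumer runs. */
    public static String serializedOrder() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.add("A");
        queue.add("B");                                 // queue is full: [A, B]

        String evicted = queue.remainingCapacity() <= 0 ? queue.poll() : null; // removes "A"
        queue.add("C");                                 // queue: [B, C]
        String consumed = queue.poll();                 // consumer: removes "B" -> [C]

        return "consumed=" + consumed + " evicted=" + evicted + " queue=" + queue;
    }

    public static void main(String[] args) {
        System.out.println(racyInterleaving()); // consumed=A evicted=B queue=[C]
        System.out.println(serializedOrder());  // consumed=B evicted=A queue=[C]
    }
}
```

In the racy replay, "B" is discarded even though the consumer had already freed a slot, which is the "second oldest item removed" effect. With all methods `synchronized` on the same monitor, only orderings like the serialized one can occur.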
However, if you can live with weaker guarantees regarding the oldest element, you can use a `BlockingQueue` and don't need any `synchronized`:

    import java.util.concurrent.ArrayBlockingQueue;

    public class CircularNonBlockingQueue<E> {
        private final ArrayBlockingQueue<E> blockingQueue;

        public CircularNonBlockingQueue(int size) {
            blockingQueue = new ArrayBlockingQueue<>(size);
        }

        public int size() {
            return blockingQueue.size();
        }

        public void add(E element) {
            // Retry until offer() succeeds, evicting the head each time the queue is full.
            while (!blockingQueue.offer(element)) {
                blockingQueue.poll();
            }
        }

        public E take() throws InterruptedException {
            return blockingQueue.take();
        }
    }
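As a rough usage sketch of the `offer`/`poll` loop, the following applies the same idea directly to an `ArrayBlockingQueue`. The helper `addEvicting` and the class `EvictingOfferDemo` are hypothetical names introduced only for this example; they are not part of the answer's class. It first shows the eviction order with a single thread, then a consumer blocked in `take()` being served by a producer that never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EvictingOfferDemo {

    /** Hypothetical helper: same loop as add() above, on any BlockingQueue. */
    public static <E> void addEvicting(BlockingQueue<E> queue, E element) {
        while (!queue.offer(element)) {
            queue.poll(); // queue was full: drop the current head and retry
        }
    }

    /** Adds 1..count to a queue of the given capacity and returns what is left. */
    public static List<Integer> fillAndDrain(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 1; i <= count; i++) {
            addEvicting(queue, i);
        }
        List<Integer> remaining = new ArrayList<>();
        queue.drainTo(remaining);
        return remaining;
    }

    public static void main(String[] args) throws InterruptedException {
        // Single-threaded: the two oldest elements are evicted.
        System.out.println(fillAndDrain(3, 5)); // [3, 4, 5]

        // One consumer blocks in take() until the producer adds an element.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        addEvicting(queue, "hello");
        consumer.join();
    }
}
```

With several producers and consumers running at once, the element that gets evicted is only "approximately" the oldest, which is the weaker guarantee mentioned above.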
It must be noted that neither of these solutions provides "fairness". So if the number of producer and consumer threads is large compared to the queue's capacity, there is the risk that producers repeatedly remove items without reactivating threads blocked in `take()`. So you should always make sure the capacity is sufficiently large.