Can someone please review this code for me? I have not implemented all the methods for simplicity.
/**
 * Implements a blocking bounded queue from a given non-blocking unbounded queue implementation
 */
abstract class DerivedQueue<E> implements Queue<E>
{
    DerivedQueue(Queue<E> queue, int size)
    {
        if(queue==null | size <0)
        {
            throw new RuntimeException("Bad Input");
        }
        fQueue = queue;
        fSize = size;
    }
    Queue<E> fQueue;
    int fSize;
    int fCnt;
    @Override
    public boolean addAll(Collection<? extends E> arg0)
    {
        return false;
    }
    @Override
    public boolean isEmpty()
    {
        return (fCnt==0);
    }
    public E remove()
    {
        if(fCnt==0)
        {
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException("Waiting thread was interrupted during remove with msg:",e);
            }
        }
        E elem = fQueue.remove();
        fCnt--;
        notifyAll();
        return elem;
    }
    @Override
    public boolean add(E elem)
    {
        if(fCnt == fSize)
        {
            try
            {
                wait();
            }
            catch (InterruptedException e)
            {
                throw new RuntimeException("Waiting thread was interrupted during remove with msg:",e);
            }
        }
        return fQueue.add(elem);
    }
}
Open questions:
- Would calling notifyAll() lead to multiple waiting threads checking the while() condition at the same time, so that two threads could get past the loop before the condition is re-evaluated, causing an out-of-bounds exception?
- Do the methods need to be marked synchronized at all? Could I instead use a synchronized block for reading or writing part of the array, while the constraint check and the waiting remain unsynchronized?
- Is there a way to notify precisely only the consumer threads (waiting in take) or only the producer threads (waiting in put) in this example? notifyAll notifies all of them.
5 Answers
This problem screams "semaphore".
Have a semaphore whose count represents the number of slots available in the queue. Enqueuers decrement the semaphore. Dequeuers increment it. This effectively caps the number of nodes in the queue at some maximum, namely the initial value of the semaphore.
Likewise you can have a different semaphore represent the number of items currently enqueued. In this case dequeuers decrement it and enqueuers increment. For this one, the initial value is zero. This allows dequeuers to block until an item is available.
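A minimal sketch of the two-semaphore idea described above, assuming Java's java.util.concurrent.Semaphore. The class name SemaphoreBoundedQueue and the method names put and take are hypothetical and not from the original post. The semaphores only do the counting, so the backing queue still needs its own lock.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical class for illustration; not part of the code under review.
class SemaphoreBoundedQueue<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private final Semaphore freeSlots;                  // starts at the capacity
    private final Semaphore items = new Semaphore(0);   // starts at zero

    SemaphoreBoundedQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        freeSlots = new Semaphore(capacity);
    }

    void put(E elem) throws InterruptedException {
        freeSlots.acquire();          // blocks while the queue is full
        synchronized (queue) {        // the semaphores do not protect the queue itself
            queue.add(elem);
        }
        items.release();              // signals that one more item is available
    }

    E take() throws InterruptedException {
        items.acquire();              // blocks while the queue is empty
        E elem;
        synchronized (queue) {
            elem = queue.remove();
        }
        freeSlots.release();          // signals that one more slot is free
        return elem;
    }
}
```

With this layout producers only ever wait on freeSlots and consumers only on items, so a release wakes a thread of the right kind.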
Your implementation is not threadsafe: Imagine the queue is empty, and several threads that want to remove an element are waiting. Then an element is added and all waiting threads are notified. They all then try to remove an element without re-checking the condition. This works only for the first thread; the next one causes an underflow. The same scenario applies to several threads waiting to add.
So either fix the problem or add a comment with a "not threadsafe" warning.
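One possible fix, sketched under assumptions: the methods are synchronized and each wait sits in a while loop, so every woken thread re-checks the condition before it touches the queue. The class name MonitorBoundedQueue and the method names put and take are hypothetical. The sketch also lets InterruptedException propagate instead of wrapping it, which is a design choice and not something the original code does.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical class for illustration; not the poster's DerivedQueue.
class MonitorBoundedQueue<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private final int maxSize;

    MonitorBoundedQueue(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    synchronized void put(E elem) throws InterruptedException {
        while (queue.size() == maxSize) {   // re-checked after every wake-up
            wait();
        }
        queue.add(elem);
        notifyAll();                        // wake consumers waiting for an item
    }

    synchronized E take() throws InterruptedException {
        while (queue.isEmpty()) {           // only one woken thread finds an element
            wait();
        }
        E elem = queue.remove();
        notifyAll();                        // wake producers waiting for a free slot
        return elem;
    }

    synchronized boolean isEmpty() {        // reads share the same lock as writes
        return queue.isEmpty();
    }
}
```

Because wait() releases the monitor and reacquires it before returning, only one thread at a time re-evaluates the loop condition, so a second consumer sees the queue empty again and goes back to waiting.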
In my opinion class variables should always be defined first, even before the constructor. Another issue: why are your variables prefixed with f? Rename them to something less confusing. In particular, fSize and fCnt are quite close to each other (I suggest maxSize and currentSize).
- Thanks, I have made some changes. I use f to denote class fields and distinguish them from local/method-level variables. – p101, Jun 30, 2011 at 5:39
- @p101 I use this. to refer to class fields ;) I know this is not required in Java... but as you probably noticed, this doesn't increase readability. – Fge, Jun 30, 2011 at 8:20
I can see two more issues with the above code:
- The increment fCurrrentCnt++ is missing in the add method.
- This will result in deadlock if you call remove first and then add. Both will wait for each other.
You don't need to implement this yourself. ArrayBlockingQueue is already in the JRE. You can use ArrayBlockingQueue.take instead of the remove method and ArrayBlockingQueue.put instead of the add method.
See also: Effective Java, 2nd edition, Item 47: Know and use the libraries
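A short usage sketch of the library class, assuming a capacity of 2 and Integer elements. The class name ArrayBlockingQueueDemo and the method produceAndConsume are made up for illustration. put blocks while the queue is full and take blocks while it is empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; only ArrayBlockingQueue, put and take come from the JDK.
class ArrayBlockingQueueDemo {
    // Produces 1..count on one thread, consumes on the caller's thread,
    // and returns the sum of everything that was taken.
    static int produceAndConsume(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i);           // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < count; i++) {
            sum += queue.take();            // blocks while the queue is empty
        }
        producer.join();
        return sum;
    }
}
```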
Anyway, it does not seem completely thread-safe. The fCurrrentCnt field in the isEmpty method is read without any synchronized block.
[...] synchronization has no effect unless both read and write operations are synchronized.
From Effective Java, 2nd Edition, Item 66: Synchronize access to shared mutable data.
[...] ||. A single pipe | is for bitwise OR.