I saw a tutorial from a site where it showed the use of ArrayBlockingQueue
as a thread-safe concurrent data-structure . Just for learning purposes , i tried to build something similar using synchronized and wait-notify methods in java. I'm really very new to java.util.concurrent
and i'm hoping someone could guide me to some improvements and point out the bad parts of this code i wrote :
import java.util.ArrayList;
import java.util.Random;
public class LowLevelProducerConsumer {
ArrayList<Integer> list = new ArrayList<Integer>();
private final int size = 10;
private int elems = 0;
Object lock = new Object();
Random r = new Random(System.currentTimeMillis());
public void producer() {
while (true) {
synchronized (lock) {
try {
while (elems != size) {
list.add(r.nextInt(100));
elems++;
}
} finally {
// allows consumer to remove an entry
lock.notify();
}
}
}
}
public void consumer() throws InterruptedException {
Thread.sleep(100); // any better way to ensure producer runs first ?
int ran = 0;
while (true) {
// consumer tries to acquire lock here , but it can do so only after
// producer has called notify
synchronized (lock) {
// do i need a lock.wait() here somewhere ?
ran = r.nextInt(10);
if (ran == 7) { // just an arbitrary condition
int loc = r.nextInt(list.size());
int data = list.remove(loc);
System.out.format(
"%d removed from index %d , size : %d\n",
data, loc, list.size());
// decrementing elems to let the producer work again
elems--;
Thread.sleep(300);
}
// release the lock so that producer can fill things up again
lock.notify();
}
}
}
public static void main(String[] args) {
final LowLevelProducerConsumer low = new LowLevelProducerConsumer();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
low.producer();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
low.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ps : i'v kept my understanding of how the code works in the comments. Any correction to them would be much appreciated.
-
\$\begingroup\$ For learning purposes, read more code written by qualified programmers. Sources of main java libraries, including ArrayBlockingQueue, are available. But very often that code is overoptimized, so reader can get habits of bad style. For beginners, style is more important than execution speed. \$\endgroup\$Alexei Kaigorodov– Alexei Kaigorodov2013年10月15日 18:41:46 +00:00Commented Oct 15, 2013 at 18:41
2 Answers 2
Miscellaneous Remarks
- Your
elems
variable is justlist.size()
, so why not just uselist.size()
everywhere and eliminateelems
? - Your other
size
is a constant, and should be calledSIZE
by convention. - Instead of spreading out
int ran = 0;
ran = r.nextInt(10);
, andif (ran == 7) { ... }
over three lines, you should just doif (r.nextInt(10) == 7) { ... }
. Putting it all on one line takes less mental effort to follow the code.
Threading
- Both the producer and consumer threads are infinite loops. Neither thread will end normally, and therefore trying to
.join()
them is superfluous. - Rather than ensuring that the producer runs first, you should just design the consumer to correctly handle the case when there are no more values to consume. As @DanielR pointed out, when
list.size()
is zero,r.nextInt(list.size())
throws anIllegalArgumentException
. Your use of
lock.notify()
right now is superfluous, as there is no correspondinglock.wait()
that expects a notification. A good place to insert alock.wait()
would be where you suspected. (Without thelock.wait()
, the consumer would keep trying frantically to fetch a value, which could work, but is inefficient. Insertinglock.wait()
makes the consumer yield the processor to the producer until the producer notifies the consumer that the values have been replenished.)public void consumer() throws InterruptedException { synchronized (lock) { while (list.isEmpty()) { // Nothing to consume for now; wait for more. lock.wait(); } ... } }
I think that the vocabulary is slightly off. To me, a "producer" and a "consumer" are both threads. I would say,
Thread producer = new Thread(new Runnable() { @Override public void run() { low.produce(); } });
Therefore, I would rename your methods as verbs
produce()
andconsume()
.In Java, every Object can act as a lock. There is no need to make a separate
lock
object; you could just synchronize onlist
orthis
instead. (It doesn't matter which object you choose as the lock, as long as the producer and consumer both agree on which object it is.) As a special case,public void method() { synchronized (this) { ... } }
can be written as
public synchronized void method() { ... }
which is slightly more readable and also compiles to slightly tighter bytecode.
I would take move the while-loops from
produce()
andconsume()
into therun()
method. That would have the triple benefit of- keeping
produce()
andconsume()
simpler - making it more obvious that each
Runnable
is an infinite loop - moving the
synchronized
keyword into the method signatures, making it easier to see thatproduce()
andconsume()
run mutually exclusively, and generating slightly tighter bytecode.
- keeping
Proposed Solution
import java.util.ArrayList;
import java.util.Random;
public class LowLevelProducerConsumer {
ArrayList<Integer> list = new ArrayList<Integer>();
private final int SIZE = 10;
Random r = new Random(System.currentTimeMillis());
public synchronized void produce() {
try {
while (list.size() != SIZE) {
list.add(r.nextInt(100));
}
} finally {
// Notifies consumer that entries have been generated
this.notify();
}
}
public synchronized void consume() throws InterruptedException {
while (list.isEmpty()) {
// Nothing to consume for now; wait for more.
// System.err.println("Consumer waiting...");
this.wait();
}
if (r.nextInt(10) == 7) { // just an arbitrary condition
int loc = r.nextInt(list.size());
int data = list.remove(loc);
System.out.format(
"%d removed from index %d , size : %d\n",
data, loc, list.size());
Thread.sleep(300);
}
}
public static void main(String[] args) {
final LowLevelProducerConsumer low = new LowLevelProducerConsumer();
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
low.produce();
}
}
});
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
low.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
producer.start();
consumer.start();
}
}
This line is unsafe:
int loc = r.nextInt(list.size());
an error is raised if list.size()
is 0
.
Your code implements some very strange producer consumer behaviour, especially the random value at the consumer without any waiting time. Please describe in words the intended behaviour.