I have a synchronized list that I use as a task queue. The queue holds long values that need to be processed, and processing each one causes heavy I/O and delay, so I thought of using threads to get things done faster.
Is there a better way to wait for all 50 threads to finish than calling t.join()? Is there anything wrong with the way I access the queue in my run() implementation?
Implementation:
public class Start {
private static List<Long> queue = Collections.synchronizedList(new ArrayList<Long>());
private static HashSet<MyThread> threads = new HashSet<MyThread>();
public static void main(String[] args) throws InterruptedException {
// add elements to workqueue
for(int i = 0; i < 100000; i++) {
queue.add(new Date().getTime());
}
// create 50 new threads
for (int i = 0; i < 50; i++) {
MyThread t = new MyThread(queue);
t.start();
threads.add(t);
}
// wait for all 50 threads to finish
for (MyThread t : threads) {
t.join();
}
System.out.println("Done");
}
}
Thread:
public class MyThread extends Thread {
private List<Long> queue = null;
public MyThread(List<Long> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean _break = false;
int size = 0;
long task = -1;
while(true) {
synchronized (queue) {
if(!queue.isEmpty()) {
size = queue.size();
// get task for worker
task = queue.remove(0);
} else {
_break = true;
}
}
if(_break == true) {
System.out.println("Thread-" + this.getId() + " Size: 0");
break;
} else {
// do some stuff with the task
// ...
// ...
System.out.println("Thread-" + this.getId() + " Size: " + size);
}
}
}
}
1 Answer
Use an Executor
Maintaining a work queue and a thread pool are problems that an executor tackles:
This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.
The use of Futures also helps you impose time constraints and handle exceptions thrown during execution.
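As a rough illustration of that point, here is a minimal sketch of a timed Future.get() call. The class name FutureSketch, the helper runWithTimeout and the timeout values are made up for this example. The only library calls used are the standard java.util.concurrent ones.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureSketch {

    // Hypothetical helper: runs one task and reports how it ended.
    public static String runWithTimeout(Callable<Long> task, long timeoutMillis) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<Long> future = service.submit(task);
        try {
            // Waits at most timeoutMillis for the result.
            return "result: " + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker if it is still running
            return "timed out";
        } catch (ExecutionException e) {
            // Whatever call() threw is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> 42L, 1000));
        System.out.println(runWithTimeout(() -> { Thread.sleep(5000); return 1L; }, 100));
        System.out.println(runWithTimeout(() -> { throw new IllegalStateException("boom"); }, 1000));
    }
}
```

With hand-rolled threads you would have to build both the timeout and the exception hand-off yourself. Here they come from Future.get().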
(Note that you'll typically want to use ExecutorService or CompletionService, as Executor is a minimalistic interface.)
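If you would rather handle each result as soon as it is ready instead of waiting for the whole batch, a CompletionService is one option. The following is only a sketch: the class name CompletionSketch, the method processAll and the doubling "work" are placeholders, not part of your code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionSketch {

    // Submits one task per input and collects the results in completion order.
    public static List<Long> processAll(List<Long> inputs, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CompletionService<Long> completion = new ExecutorCompletionService<>(pool);
        try {
            for (Long input : inputs) {
                completion.submit(() -> input * 2); // placeholder for the real work
            }
            List<Long> results = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                // take() blocks until the next task, whichever it is, has finished.
                results.add(completion.take().get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The returned list is in completion order, which need not match the input order.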
To paraphrase your example:
class IncrediblyLongRunningTask implements Callable<Long> {
@Override
public Long call() throws Exception {
// ...
}
}
public void compute() throws InterruptedException {
final ExecutorService service = Executors.newFixedThreadPool(50);
final List<IncrediblyLongRunningTask> tasks = new ArrayList<>();
for ( int i = 0; i < /*...*/; i++ ) {
tasks.add(new IncrediblyLongRunningTask());
}
final List<Future<Long>> futures = service.invokeAll(tasks); // returns when all done
service.shutdown();
for ( final Future<Long> future : futures ) {
// ...
}
}
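For completeness, here is a self-contained variant that can be compiled and run as is, adapted to a list of Long values like the one in your queue. It is a sketch under assumptions: the class name InvokeAllSketch and the process method are hypothetical stand-ins for your actual I/O work, and the pool size is just a parameter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Hypothetical stand-in for the slow, I/O-heavy work done per value.
    static Long process(long value) {
        return value + 1;
    }

    public static List<Long> compute(List<Long> values, int poolSize) {
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        try {
            // One Callable per value; the executor owns the queue and the threads.
            List<Callable<Long>> tasks = new ArrayList<>();
            for (long value : values) {
                tasks.add(() -> process(value));
            }
            List<Long> results = new ArrayList<>();
            // invokeAll blocks until every task is done and keeps the task order.
            for (Future<Long> future : service.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compute(Arrays.asList(10L, 20L, 30L), 4)); // prints [11, 21, 31]
    }
}
```

No synchronized block, shared list or join() loop is needed here, because the executor hands the tasks to its worker threads and invokeAll does the waiting.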