Source code

001/*
002 * BioJava development code
003 *
004 * This code may be freely distributed and modified under the
005 * terms of the GNU Lesser General Public Licence. This should
006 * be distributed with the code. If you do not have a copy,
007 * see:
008 *
009 * http://www.gnu.org/copyleft/lesser.html
010 *
011 * Copyright for this code is held jointly by the individual
012 * authors. These should be listed in @author doc comments.
013 *
014 * For more information on the BioJava project and its aims,
015 * or to join the biojava-l mailing list, visit the home page
016 * at:
017 *
018 * http://www.biojava.org/
019 *
020 */
021
022package org.biojava.utils;
023
024import java.util.LinkedList;
025
026/**
027 * <p><code>SimpleThreadPool</code> is a basic implementation of
028 * <code>ThreadPool</code> for use where we don't wish to introduce a
029 * dependency on a 3rd-party pool. In general, objects which require a
030 * pool should only use the interface and parameterize such that other
031 * implementations may be dropped in in place of this one, possibly
032 * using this one as a fallback.</p>
033 *
034 * <p>This class offers a service for running <code>Runnable</code>s
035 * using multiple threads, the number of which is specified in the
036 * constructor. <code>Runnable</code>s are queued in a simple FIFO
037 * queue. The worker threads wait on the queue when it is empty and
038 * are notified when a new <code>Runnable</code> is submitted.</p>
039 *
040 * <p>This implementation will prevent an application from exiting
041 * until <code>stopThreads()</code> is called unless the pool contains
042 * daemon threads.</p>
043 *
044 * @author Keith James
045 * @since 1.3
046 */
047public class SimpleThreadPool implements ThreadPool
048{
049 protected PooledThread [] threads;
050 protected int priority;
051
052 private LinkedList queue;
053 private boolean daemon;
054 private boolean waiting;
055 private boolean stopped;
056
057 /**
058 * Creates a new <code>SimpleThreadPool</code> containing 4
059 * non-daemon threads and starts them. The threads have priority
060 * Thread.NORM_PRIORITY. Because threads are non-deamon you will need
061 * to call stopThreads() to terminate them.
062 */
063 public SimpleThreadPool()
064 {
065 this(4, false);
066 }
067
068 /**
069 * Creates a new <code>SimpleThreadPool</code> containing the
070 * specified number of threads and starts them. The threads have
071 * priority Thread.NORM_PRIORITY.
072 *
073 * @param threadCount an <code>int</code> thread count.
074 * @param daemon a <code>boolean</code> indicating whether the
075 * threads should be daemons. If threads are non-deamon you will need
076 * to call stopThreads() to terminate them.
077 */
078 public SimpleThreadPool(int threadCount, boolean daemon)
079 {
080 this(threadCount, daemon, Thread.NORM_PRIORITY);
081 }
082
083 /**
084 * Creates a new <code>SimpleThreadPool</code> containing the
085 * specified number of threads and starts them.
086 *
087 * @param threadCount an <code>int</code> thread count.
088 * @param daemon a <code>boolean</code> indicating whether the
089 * threads should be daemons. If threads are non-deamon you will need
090 * to call stopThreads() to terminate them.
091 * @param priority an <code>int</code> priority for the threads.
092 */
093 public SimpleThreadPool(int threadCount, boolean daemon, int priority)
094 {
095 this.daemon = daemon;
096 this.priority = priority;
097 queue = new LinkedList();
098 threads = new PooledThread[threadCount];
099 stopped = true;
100 waiting = false;
101 startThreads();
102 }
103
104 public void addRequest(Runnable task)
105 {
106 if (waiting || stopped)
107 throw new IllegalStateException("Thread pool has been closed to new requests");
108
109 synchronized(queue)
110 {
111 queue.add(task);
112 // Notify threads blocked in nextRequest()
113 queue.notifyAll();
114 }
115 }
116
117 public void startThreads()
118 {
119 if (! stopped)
120 throw new IllegalStateException("Thread pool is already started");
121
122 stopped = false;
123
124 synchronized(threads)
125 {
126 for (int i = 0; i < threads.length; i++)
127 {
128 threads[i] = new PooledThread();
129 if (daemon)
130 threads[i].setDaemon(true);
131 threads[i].setPriority(priority);
132 threads[i].start();
133 }
134 }
135 }
136
137 /**
138 * Waits for all working threads to return and then stops them. If the
139 * thread pool contains non-daemon threads you will have to call this method
140 * to make your program return.
141 * @throws IllegalStateException if the pool is already stopped.
142 */
143 public void stopThreads()
144 {
145 if (stopped)
146 throw new IllegalStateException("Thread pool has already been stopped");
147
148 stopped = true;
149
150 synchronized(queue)
151 {
152 // Ensure working threads return and die
153 while (threadsAlive() > 0)
154 {
155 try
156 {
157 queue.wait(500);
158 queue.notifyAll();
159 }
160 catch (InterruptedException ie) { }
161 }
162 }
163 }
164
165 public void waitForThreads()
166 {
167 if (stopped)
168 throw new IllegalStateException("Thread pool has been stopped");
169
170 waiting = true;
171
172 synchronized(threads)
173 {
174 // Ensure queue gets emptied and all work is done
175 while (! queue.isEmpty() || threadsWorking() > 0)
176 {
177 try
178 {
179 threads.wait();
180 }
181 catch (InterruptedException ie) { }
182 }
183 }
184
185 waiting = false;
186 }
187
188 /**
189 * <code>threadsWorking</code> returns the number of threads
190 * currently performing work.
191 *
192 * @return an <code>int</code>.
193 */
194 public int threadsWorking()
195 {
196 int workingCount = 0;
197
198 synchronized(threads)
199 {
200 for (int i = 0; i < threads.length; i++)
201 if (threads[i].working)
202 workingCount++;
203 }
204
205 return workingCount;
206 }
207
208 /**
209 * <code>threadsIdle</code> returns the number of threads
210 * currently waiting for work.
211 *
212 * @return an <code>int</code>.
213 */
214 public int threadsIdle()
215 {
216 return threads.length - threadsWorking();
217 }
218
219 /**
220 * <code>requestsQueued</code> returns the number of
221 * <code>Runnable</code>s currently queued.
222 *
223 * @return an <code>int</code>.
224 */
225 public int requestsQueued()
226 {
227 return queue.size();
228 }
229
230 /**
231 * <code>threadsAlive</code> returns the number of threads
232 * currently alive.
233 *
234 * @return an <code>int</code>.
235 */
236 protected int threadsAlive()
237 {
238 int aliveCount = 0;
239
240 synchronized(threads)
241 {
242 for (int i = 0; i < threads.length; i++)
243 if (threads[i].isAlive())
244 aliveCount++;
245 }
246
247 return aliveCount;
248 }
249
250 /**
251 * <code>nextRequest</code> gets the next <code>Runnable</code>
252 * from the queue. This method blocks if the queue is empty and
253 * the pool has not stopped. If the pool has stopped it returns
254 * null.
255 *
256 * @return a <code>Runnable</code> or null if the pool has been
257 * stopped.
258 */
259 protected Runnable nextRequest()
260 {
261 synchronized(queue)
262 {
263 try
264 {
265 while (! stopped && queue.isEmpty())
266 queue.wait();
267 }
268 catch (InterruptedException ie) { }
269
270 if (stopped)
271 return null;
272 else
273 return (Runnable) queue.removeFirst();
274 }
275 }
276
277 /**
278 * <code>PooledThread</code> is a thread class which works within
279 * the pool. It sets its boolean flag true when working,
280 * synchronizing this on the array which contains all the
281 * <code>PooledThread</code>s.
282 */
283 private class PooledThread extends Thread
284 {
285 boolean working = false;
286
287 public void run()
288 {
289 while (true)
290 {
291 Runnable task = nextRequest();
292
293 // If the pool is stopped the queue returns null and
294 // the thread dies
295 if (task == null)
296 break;
297
298 // Synchronize on thread array to update state
299 synchronized(threads)
300 {
301 working = true;
302 }
303
304 task.run();
305
306 synchronized(threads)
307 {
308 working = false;
309 threads.notify();
310 }
311 }
312 }
313 }
314}

AltStyle によって変換されたページ (->オリジナル) /