My project has a central concept of a Callback<T>, which is defined very simply:
public interface Callback<T> {
    void call(T item) throws Exception;
}
This is used to enforce correct resource management (e.g. database connections, HTTP sessions) while still allowing the data to be streamed through.
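Here is a minimal sketch of that resource-management pattern. It assumes Java 8+ (lambdas, try-with-resources). FakeConnection and queryAll are hypothetical stand-ins, not part of the project. The producer owns the resource, pushes each row into the callback and closes the resource in all cases, so the consumer never has to manage the connection.

```java
import java.util.Arrays;
import java.util.List;

public class CallbackResourceDemo {
    // Same shape as the project's Callback<T>.
    public interface Callback<T> {
        void call(T item) throws Exception;
    }

    // Hypothetical stand-in for a database connection or HTTP session.
    public static final class FakeConnection implements AutoCloseable {
        public boolean closed;

        List<String> fetchRows() {
            return Arrays.asList("alice", "bob", "carol");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Exposed only so the demo can check that the resource was released.
    public static FakeConnection lastConnection;

    // Hypothetical producer: it owns the resource, streams rows into the
    // callback one at a time, and always closes the resource, even if the
    // callback throws.
    public static void queryAll(Callback<String> callback) throws Exception {
        try (FakeConnection conn = new FakeConnection()) {
            lastConnection = conn;
            for (String row : conn.fetchRows()) {
                callback.call(row);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        queryAll(row -> System.out.println("got " + row));
        System.out.println("closed: " + lastConnection.closed);
    }
}
```

The producer controls the try block, so the connection is released whether the consumer finishes normally or throws.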
This works very well for code that is written with it in mind, but occasionally we'll come to a point where we want to hook it into an external API that takes an Iterator rather than a Callback. This seems roughly analogous to an in-process UNIX pipe (with objects rather than bytes), so I'm referring to the operation as piping.
Unfortunately, the code required to implement it isn't very simple:
/**
 * Limited adapter from a {@link Callback} to an {@link Iterator}. Performs roughly the inverse operation of
 * {@link #stream(Callback, Iterable)}. Requires two callbacks. The first is passed a Callback
 * which is the 'source' end of the pipe. The second is passed an Iterator, the 'consuming' end of the pipe.
 * Every element pushed into the source Callback will come out at the consuming Iterator. Due to the push-pull
 * nature of this operation, the use of an auxiliary thread is required.
 *
 * <p>While in general we encourage writing all API producers and consumers to use Callbacks directly,
 * it is occasionally desirable to interface with an Iterator-based API that is either mandated externally
 * or is inconvenient to change.
 *
 * <p>The somewhat inconvenient method call signature (with everything wrapped again in Callbacks) is
 * required to ensure correct resource management.
 *
 * <p>If either thread is interrupted, the main thread terminates with an exception
 * and the worker thread dies.
 *
 * @param fromSource invoked with the source Callback
 * @param intoCallback invoked with the destination Iterator
 * @param pipeName the name of the auxiliary thread
 */
@Beta
public static <T> void pipeToIterator(
        final Callback<Callback<T>> fromSource,
        final Callback<Iterator<T>> intoCallback,
        final String pipeName)
    throws InterruptedException
{
    final SynchronousQueue<T> queue = new SynchronousQueue<T>();
    @SuppressWarnings("unchecked") // Used only for == checks
    final T poisonPill = (T) new Object();
    final ExecutorService pipe = com.mogwee.executors.Executors.newSingleThreadExecutor(pipeName);
    // Set when the child thread exits
    final AtomicBoolean childThreadAborted = new AtomicBoolean();
    // Thrown as a marker from the callback if the child thread unexpectedly exits
    final Exception childTerminated = new Exception();
    try {
        pipe.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception
            {
                try {
                    // Read items off of the queue until we see the poison pill
                    QueueIterator<T> iterator = new QueueIterator<T>(queue, poisonPill);
                    intoCallback.call(iterator);
                    Iterators.getLast(iterator); // Consume the rest of the iterator, in case the callback returns early
                    return null;
                } finally
                {
                    childThreadAborted.set(true);
                }
            }
        });
    } finally { // Ensure we always shutdown the service so that we never leak threads
        pipe.shutdown();
    }
    try {
        // Offer items from the callback into the queue
        fromSource.call(new Callback<T>() {
            @Override
            public void call(T item) throws Exception
            {
                // Check periodically if the child thread is dead, and give up
                while (!queue.offer(item, CHILD_DEATH_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS))
                {
                    if (childThreadAborted.get())
                    {
                        throw childTerminated;
                    }
                }
            }
        });
    } catch (InterruptedException e) {
        // Give up, let the finally block kill the worker
        Thread.currentThread().interrupt();
        throw e;
    } catch (Exception e) {
        if (e == childTerminated) { // The child thread died unexpectedly
            throw new IllegalStateException("worker thread interrupted");
        }
        throw Throwables.propagate(e);
    } finally {
        try
        {
            if (!childThreadAborted.get())
            {
                queue.offer(poisonPill, POISON_PILL_GIVEUP_MS, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } finally
        {
            pipe.shutdownNow();
        }
    }
}
/**
 * Consume elements from a queue until a given poison pill is found.
 */
private static class QueueIterator<T> implements Iterator<T>
{
    private SynchronousQueue<T> queue;
    private T element;
    private T poisonPill;
    QueueIterator(SynchronousQueue<T> queue, T poisonPill)
    {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }
    @Override
    public boolean hasNext()
    {
        ensureNext();
        return element != poisonPill;
    }
    @Override
    public T next()
    {
        ensureNext();
        if (element == poisonPill)
        {
            throw new NoSuchElementException();
        }
        try {
            return element;
        }
        finally {
            element = null;
        }
    }
    private void ensureNext()
    {
        if (element == null)
        {
            try
            {
                element = queue.take();
            } catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
        }
    }
    @Override
    public void remove()
    {
        throw new UnsupportedOperationException();
    }
}
(the code, along with some unit tests, is also on GitHub, in case that's easier)
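The poison-pill hand-off that QueueIterator relies on can be tried in isolation. Below is a minimal standalone sketch, assuming Java 8+. PoisonPillDemo, SentinelIterator and pipe are illustrative names, not from the project. A producer thread put()s items into a SynchronousQueue, and an iterator take()s them until it sees the sentinel. The consumer then drains whatever it did not read, so the producer is never left blocked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class PoisonPillDemo {
    // Unique end-of-stream marker, compared by identity only.
    static final Object POISON = new Object();

    // Pull-side view of the queue, in the spirit of QueueIterator.
    static final class SentinelIterator implements Iterator<String> {
        private final BlockingQueue<Object> queue;
        private Object next; // null = not fetched yet; POISON = stream ended

        SentinelIterator(BlockingQueue<Object> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take(); // blocks until the producer hands over an element
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
            return next != POISON;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String result = (String) next;
            next = null; // force a fresh take() on the following call
            return result;
        }
    }

    // Pushes `items` from a producer thread; the consumer reads at most `limit`
    // of them and then drains the rest so the producer is never left blocked.
    public static List<String> pipe(List<String> items, int limit) throws InterruptedException {
        BlockingQueue<Object> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (String s : items) {
                    queue.put(s); // SynchronousQueue: each put waits for a matching take
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Iterator<String> it = new SentinelIterator(queue);
        List<String> consumed = new ArrayList<>();
        while (consumed.size() < limit && it.hasNext()) {
            consumed.add(it.next());
        }
        while (it.hasNext()) {
            it.next(); // drain; safe even when the iterator is already exhausted
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pipe(Arrays.asList("a", "b", "c"), 10)); // [a, b, c]
        System.out.println(pipe(Arrays.asList("a", "b", "c"), 1));  // [a]
    }
}
```

Draining with a hasNext() loop also works when the consumer has already read everything. Guava's Iterators.getLast, by contrast, throws NoSuchElementException on an empty iterator.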
This is a lot of intricate threaded code. I'm generally pretty wary of writing code like this, but I've been unable to come up with much of a better approach. My question is twofold: Is there a better way to handle this, or perhaps a nicer way of writing the code? Failing that, I'm pretty sure that there are at least a few race conditions / errors hiding in here, so some code review would be much appreciated :-)
In case an example of how this is used helps:
Callbacks.pipeToIterator(new Callback<Callback<UserAccount>>() {
    @Override
    public void call(Callback<UserAccount> callback) throws Exception
    {
        userClient.doQuery(Queries.<UserAccount>allItems(), callback);
    }
}, new Callback<Iterator<UserAccount>>() {
    @Override
    public void call(Iterator<UserAccount> iter) throws Exception
    {
        searchService.updateIndex(iter);
    }
}, "user-search-index-updater");
Answer
The code looks fine. Here are some small notes and questions which you may find useful.
The code looks like a custom implementation of actor-based concurrency. There are existing implementations of that model (Akka, for example), which you should check out if you haven't already. (Venkat Subramaniam's book Programming Concurrency on the JVM: Mastering Synchronization, STM, and Actors has a good introduction to Akka.)
Have you considered using a queue other than a SynchronousQueue? A buffering queue such as ArrayBlockingQueue might improve throughput, since the producer would not have to wait for the consumer on every single element.
I'd rename childThreadAborted to childThreadFinished, since it will also be true when the child thread simply finishes.
Do you instantiate the childTerminated exception early only for the e == childTerminated check? If that's the only reason, I'd check the childThreadFinished boolean in the catch block and create the exception in the while loop instead, so it carries a proper stack trace. A real stack trace would help debugging.
Instead of the childThreadFinished flag, you could check the isDone() method of the Future returned by the pipe.submit call.
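Here is a minimal sketch of how these suggestions could fit together. It assumes Java 8+, and PipeSketch is an illustrative name, not the original code.

- An ArrayBlockingQueue replaces the SynchronousQueue. Its capacity of 16 is an arbitrary choice, as is the 100 ms poll interval.
- The Future returned by submit replaces the flag. Its get() also surfaces the worker's own exception instead of discarding it.
- The "consumer died" exception is created at the point of failure, so it carries a real stack trace.
- The worker drains leftover items with a hasNext() loop.

The thread is unnamed because the sketch uses the JDK's Executors rather than com.mogwee.executors.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PipeSketch {
    public interface Callback<T> {
        void call(T item) throws Exception;
    }

    private static final Object END = new Object(); // end-of-stream marker, identity-compared
    private static final int CAPACITY = 16;          // arbitrary buffer size
    private static final long POLL_MS = 100;         // how often a blocked producer re-checks the consumer

    // Items must be non-null: ArrayBlockingQueue rejects null elements.
    public static <T> void pipeToIterator(Callback<Callback<T>> fromSource,
                                          Callback<Iterator<T>> intoCallback) throws Exception {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(CAPACITY);
        ExecutorService pipe = Executors.newSingleThreadExecutor();
        try {
            // Keep the Future instead of maintaining a separate "finished" flag.
            Future<Void> consumer = pipe.submit(() -> {
                QueueIterator<T> it = new QueueIterator<>(queue);
                intoCallback.call(it);
                while (it.hasNext()) it.next(); // drain whatever the callback left unread
                return null;
            });
            fromSource.call(item -> handOff(queue, item, consumer));
            handOff(queue, END, consumer);
            consumer.get(); // rethrows a consumer failure wrapped in ExecutionException
        } finally {
            pipe.shutdownNow(); // interrupts the worker if it is still blocked in take()
        }
    }

    private static void handOff(BlockingQueue<Object> queue, Object item, Future<Void> consumer)
            throws Exception {
        while (!queue.offer(item, POLL_MS, TimeUnit.MILLISECONDS)) {
            if (consumer.isDone()) {
                consumer.get(); // if the consumer threw, this rethrows its exception
                // Created here, so its stack trace shows where the hand-off failed.
                throw new IllegalStateException("consumer finished before the stream ended");
            }
        }
    }

    private static final class QueueIterator<T> implements Iterator<T> {
        private final BlockingQueue<Object> queue;
        private Object next; // null = not fetched yet; END = stream finished

        QueueIterator(BlockingQueue<Object> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("pipe interrupted", e);
                }
            }
            return next != END;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T result = (T) next;
            next = null;
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> seen = new ArrayList<>();
        pipeToIterator((Callback<Integer> sink) -> { for (int i = 1; i <= 5; i++) sink.call(i); },
                (Iterator<Integer> it) -> { while (it.hasNext()) seen.add(it.next()); });
        System.out.println(seen); // [1, 2, 3, 4, 5]
    }
}
```

With a buffer, the source can run up to CAPACITY items ahead of the consumer before it blocks. A SynchronousQueue keeps the two threads in lockstep.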
Thanks for the pointer to Akka, that looks very interesting! I'll probably not adopt it for this, but might use it in my next project... – Steven Schlansker, Jul 27, 2012 at 18:43