I recently wrote a simple echo server in Java 7 using the NIO APIs so it was asynchronous and non-blocking. Then I decided, as a learning experience, to redo it with Java 8, hoping to use a more functional style and not have nested callbacks. I'm struggling to understand how to use the new CompletableFuture class with the Futures returned by AsynchronousSocketChannel.
This code currently works but is slower than the Java 7 version. If anyone can point out ways to improve it (or maybe that I'm going about it completely wrong), it would be appreciated. The main problem, to me at least, is that I now have to call get() on three Futures, whereas before I didn't have any blocking operations.
    try (final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open()) {
        listener.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        listener.bind(new InetSocketAddress("localhost", 8080));
        while (true) {
            AsynchronousSocketChannel connection = listener.accept().get();
            CompletableFuture<AsynchronousSocketChannel> connectionPromise =
                    CompletableFuture.completedFuture(connection);
            CompletableFuture<ByteBuffer> readerPromise = CompletableFuture.supplyAsync(() -> {
                ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                try {
                    connection.read(buffer).get();
                    return (ByteBuffer) buffer.flip();
                } catch (InterruptedException | ExecutionException e) {
                    connectionPromise.completeExceptionally(e);
                }
                return null;
            });
            readerPromise.thenAcceptAsync((buffer) -> {
                if (buffer != null) {
                    try {
                        connection.write(buffer).get();
                        connection.close();
                    } catch (InterruptedException | ExecutionException | IOException e) {
                        readerPromise.completeExceptionally(e);
                    }
                }
            });
        }
    } catch (IOException | InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
2 Answers
What to do with checked Exceptions thrown within a CompletableFuture?
Let's look at what CompletableFuture does with unchecked exceptions.
As @palacsint noted:
In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause.
That is to say: if completableFuture.get() would throw, say, an ExecutionException caused by a NullPointerException, then so would future.thenApply(funcOne).get() and future.thenApply(funcOne).thenApply(funcTwo).get(), and so on. At each stage the cause travels in a single CompletionException layer instead of being wrapped again. get() peels that layer off and wraps the cause in an ExecutionException, so that the cause of the ExecutionException is not drowned in layers and layers of CompletionExceptions.
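A minimal sketch of this behavior, assuming nothing beyond the JDK: the class name PropagationDemo and the helper causeAfter are hypothetical names chosen for illustration. It fails the first stage with a NullPointerException and checks what get() reports after zero, one, or two chained thenApply stages.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class, not part of the original server code.
final class PropagationDemo {

    /** Returns the cause that get() reports after `depth` chained thenApply stages. */
    static Throwable causeAfter(int depth) {
        CompletableFuture<String> stage = CompletableFuture.supplyAsync(() -> {
            throw new NullPointerException("simulated failure");
        });
        for (int i = 0; i < depth; i++) {
            stage = stage.thenApply(s -> s + "!"); // never runs: the source stage failed
        }
        try {
            stage.get();
            return null; // not reached in this demo
        } catch (ExecutionException e) {
            return e.getCause(); // the original exception, not a nested wrapper
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        for (int depth = 0; depth <= 2; depth++) {
            System.out.println(depth + " stage(s): " + causeAfter(depth));
        }
    }
}
```

At every depth the reported cause should be the same NullPointerException, which is what makes it practical to inspect the cause at the end of a long chain.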
So what happens if you wrap a checked exception in a CompletionException? It percolates through the chain and you get an ExecutionException whose cause is the IOException, as if you could throw a checked exception from a Supplier<T>. You can then handle it differently than if you had just wrapped it in a RuntimeException. As far as I can understand, this is what you are trying to do with connectionPromise.completeExceptionally(e); and it is what would happen if someone else called completeExceptionally on the enclosing CompletableFuture. I am proposing this because you might want RuntimeExceptions to signal unrecoverable program errors as far as is prudent (and no further). If I am ignoring fifty RuntimeExceptions because some clients have a sketchy wireless connection, then I might also ignore some other RuntimeException that is indicative of a grave programming error. To be fair, this may be considered as depending on undocumented behavior. If you are troubled by this, you can define your own unchecked wrapper for checked exceptions.
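As a rough illustration of the wrapping idea, here is a sketch that simulates a failing read with a plain IOException instead of a real socket. The class CheckedWrapDemo and its methods read and classify are hypothetical names, and the classification strings are only there to make the two code paths visible.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; the "read" is simulated, no network is involved.
final class CheckedWrapDemo {

    /** Simulates a read that may fail with a checked IOException. */
    static CompletableFuture<String> read(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (fail) {
                    throw new IOException("connection reset");
                }
                return "hello";
            } catch (IOException e) {
                // Wrap the checked exception so it can leave the Supplier.
                throw new CompletionException(e);
            }
        });
    }

    /** Tells expected I/O trouble apart from anything else at the end of the chain. */
    static String classify(CompletableFuture<String> source) {
        try {
            return source.thenApply(String::toUpperCase).get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                return "io: " + e.getCause().getMessage(); // expected, e.g. a flaky client
            }
            return "bug: " + e.getCause(); // anything else is treated as a program error
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(read(false)));
        System.out.println(classify(read(true)));
    }
}
```

The design choice here is that only the IOException branch is treated as routine; every other cause stays loud, which is the distinction argued for above.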
Finally, on this point: looking at the API, CompletableFutures are clearly meant to be chained.
Accepting Connections Synchronously
You do not really need to do this. You can start the CompletableFuture chain with a call to supplyAsync, giving it a suitable Executor such as a fixed thread pool, a cached pool, or a custom ThreadPoolExecutor. Or, similarly, you can accept connections in a callback using an AsynchronousChannelGroup. Since I do not know NIO.2 well, I am copying that part from an online source.
    public static void serverLoop() throws IOException {
        // These can be injected and thus configured from outside the application
        ExecutorService connectPool = Executors.newFixedThreadPool(10);
        Executor readPool = Executors.newCachedThreadPool();
        Executor writePool = Executors.newCachedThreadPool();
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(connectPool);
        try (AsynchronousServerSocketChannel listener =
                AsynchronousServerSocketChannel.open(group)) {
            listener.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            listener.bind(new InetSocketAddress("localhost", 8080));
            // Caveat: accept() throws AcceptPendingException if it is called again
            // while a previous accept is still pending, so this loop has to wait for
            // each accept to complete (or re-arm from inside the handler, as the
            // EDIT further down does).
            while (true) {
                listener.accept(null, handlerFrom(
                    (AsynchronousSocketChannel connection, Object attachment) -> {
                        assert attachment == null;
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                                connection.read(buffer).get();
                                return (ByteBuffer) buffer.flip();
                            } catch (InterruptedException | ExecutionException ex) {
                                throw new CompletionException(ex);
                            }
                        }, readPool)
                        .thenAcceptAsync((buffer) -> {
                            try {
                                if (buffer == null) return;
                                connection.write(buffer).get();
                            } catch (InterruptedException | ExecutionException ex) {
                                throw new CompletionException(ex);
                            }
                        }, writePool)
                        .exceptionally(ex -> {
                            handleException(ex);
                            return null;
                        });
                    }));
            }
        }
    }

    // Builds a CompletionHandler from a success callback, with a default failure callback.
    private static <V, A> CompletionHandler<V, A> handlerFrom(BiConsumer<V, A> completed) {
        return handlerFrom(completed, (Throwable exc, A attachment) -> {
            assert attachment == null;
            handleException(exc);
        });
    }

    // Builds a CompletionHandler from separate success and failure callbacks.
    private static <V, A> CompletionHandler<V, A> handlerFrom(
            BiConsumer<V, A> completed,
            BiConsumer<Throwable, A> failed) {
        return new CompletionHandler<V, A>() {
            @Override
            public void completed(V result, A attachment) {
                completed.accept(result, attachment);
            }

            @Override
            public void failed(Throwable exc, A attachment) {
                failed.accept(exc, attachment);
            }
        };
    }
Escaping from Callback Hell
This is more of a comment than a review item. As far as I have looked, there is no fluent interface yet for general Futures. You could roll your own, but you would have to implement dozens of methods yourself. Because of the peculiarities of Java's type system, one cannot have real monads, traits, extension methods, and the other facilities that other languages provide to help library developers with this burden.
EDIT
I realized that it is possible to do this:
hoping to use a more functional style and not have nested callbacks.
But I do not claim for one moment this is better than the usual callback hell.
    CompletableFuture.completedFuture(
        (TriConsumer<CompletionHandler<Integer, ByteBuffer>, ByteBuffer, AsynchronousSocketChannel>)
        (self, bbAttachment, scAttachment) -> {
            // Keep writing until the buffer is drained, then reset it.
            if (bbAttachment.hasRemaining()) {
                scAttachment.write(bbAttachment, bbAttachment, self);
            } else {
                bbAttachment.clear();
            }
        })
    .thenApply((consumer) -> (
        (TriConsumer<Integer, ByteBuffer, AsynchronousSocketChannel>)
        (result, buffer, scAttachment) -> {
            if (result == -1) {
                try {
                    scAttachment.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return; // end of stream: nothing to echo on a closed channel
            }
            scAttachment.write((ByteBuffer) buffer.flip(), buffer, handlerFrom(
                (Integer result2, ByteBuffer bbAttachment, CompletionHandler<Integer, ByteBuffer> self) -> {
                    consumer.accept(self, bbAttachment, scAttachment);
                }));
        }))
    .thenApply((consumer) -> (Consumer<AsynchronousSocketChannel>) connection -> {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        connection.read(buffer, connection, handlerFrom(
            (Integer result, final AsynchronousSocketChannel scAttachment) -> {
                consumer.accept(result, buffer, scAttachment);
            }));
    })
    .thenAccept((consumer) ->
        listener.accept(null, handlerFrom(
            (AsynchronousSocketChannel connection, Void v, CompletionHandler<AsynchronousSocketChannel, Void> self) -> {
                listener.accept(null, self); // get ready for next connection
                consumer.accept(connection);
            })));
Where:
    @FunctionalInterface
    interface TriConsumer<T1, T2, T3> {
        void accept(T1 t1, T2 t2, T3 t3);
    }

    private static <V, A> CompletionHandler<V, A> handlerFrom(TriConsumer<V, A, CompletionHandler<V, A>> completed) {
        return handlerFrom(completed, (Throwable exc, A attachment) -> {
            assert attachment == null;
            handleException(exc);
        });
    }

    private static <V, A> CompletionHandler<V, A> handlerFrom(
            TriConsumer<V, A, CompletionHandler<V, A>> completed,
            BiConsumer<Throwable, A> failed) {
        return new CompletionHandler<V, A>() {
            @Override
            public void completed(V result, A attachment) {
                completed.accept(result, attachment, this);
            }

            @Override
            public void failed(Throwable exc, A attachment) {
                failed.accept(exc, attachment);
            }
        };
    }
Remarks
- Since the NIO.2 methods are already asynchronous, we stick with the synchronous methods of CompletableFuture.
- Although the callbacks are not nested anymore, they are upside down.
I like the original better as it's easier to follow, but thanks for proving it's possible with the edit. Also good point about the chaining. I really missed out on the chance to do that in the original, not sure what I was thinking. – opiethehokie, Apr 23, 2014 at 22:10
As far as I can see, supplyAsync and thenAcceptAsync use ForkJoinPool.commonPool(), which does not use more than three threads on my machine (it depends on the number of available processors). It could be a bottleneck if you have more than three clients at the same time. Both methods can take an Executor argument, which would be used instead of commonPool().

The code calls completeExceptionally in the catch block here:

    readerPromise.thenAcceptAsync((buffer) -> {
        if (buffer != null) {
            try {
                connection.write(buffer).get();
                connection.close();
            } catch (InterruptedException | ExecutionException | IOException e) {
                readerPromise.completeExceptionally(e);
            }
        }
    });
It seems to me that it does not have any effect. The Javadoc of completeExceptionally says the following:

If not already completed, causes invocations of get() and related methods to throw the given exception.

The lambda passed to thenAcceptAsync is called when the CompletableFuture (readerPromise in this case) is already completed, so it doesn't change anything. Consider the following PoC:

    @Test
    public void test3() throws Exception {
        final CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
            return "data";
        });
        reader.thenAcceptAsync(x -> {
            System.out.println("reader.thenAcceptAsync: " + x);
            boolean transitionToCompleted = reader.completeExceptionally(new RuntimeException(
                    "overridden Future result"));
            System.out.println(transitionToCompleted); // prints "false"
        });
        // Uninterruptibles comes from Guava
        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
        System.out.println("reader result: " + reader.get()); // prints "data"
    }
(Note that there is an obtrudeException method, but I guess you don't need that either.)

The same is true for connectionPromise.completeExceptionally(e). Furthermore, connectionPromise seems completely unused in the code above. I'd remove it.

thenAcceptAsync will run only when the previous stage completes normally. The following could also be useful:

In all other cases, if a stage's computation terminates abruptly with an (unchecked) exception or error, then all dependent stages requiring its completion complete exceptionally as well, with a CompletionException holding the exception as its cause.
Source: CompletionStage javadoc
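To see the quoted rule in action without any sockets, here is a small sketch. SkippedStageDemo and run are hypothetical names, and the failing "reader" is simulated with an IllegalStateException. It checks that the dependent stage's action is skipped and that the dependent stage completes with a CompletionException holding the original exception as its cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the reader failure is simulated.
final class SkippedStageDemo {

    static String run() {
        AtomicBoolean writerRan = new AtomicBoolean(false);

        // The "reader" stage terminates abruptly with an unchecked exception.
        CompletableFuture<String> reader = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("read failed");
        });

        // The "writer" action depends on the reader's normal completion.
        CompletableFuture<Void> writer = reader.thenAccept(data -> writerRan.set(true));

        try {
            writer.join();
            return "completed normally";
        } catch (CompletionException e) {
            return "writerRan=" + writerRan.get()
                    + ", cause=" + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "writerRan=false, cause=IllegalStateException"
    }
}
```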
According to that, you might be able to change the reader lambda to the following:

    final CompletableFuture<ByteBuffer> readerPromise = CompletableFuture.supplyAsync(() -> {
        try {
            final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            connection.read(buffer).get();
            buffer.flip();
            return buffer;
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    });

It never returns null, so you can remove the null check from the writer lambda:

    final CompletableFuture<Void> writerPromise = readerPromise.thenAcceptAsync((buffer) -> {
        try {
            connection.write(buffer).get();
            connection.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

Finally, if there's an exception, you could log it with the following:

    writerPromise.exceptionally(x -> {
        x.printStackTrace(); // TODO: use logging instead
        return null;
    });

(I'm not familiar with these APIs, but this looks right to me.)
Note that I've removed the ugly cast by changing return (ByteBuffer) buffer.flip(); to buffer.flip(); return buffer;
Calling Future.get() in the lambdas does not seem asynchronous to me either (although they're run by other threads), but I have no idea how it should be handled. The two APIs do not seem compatible. (I guess you could get better answers on Stack Overflow with a more specific question.)
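One possible way to bridge the two APIs without calling Future.get() is to complete a CompletableFuture from inside a CompletionHandler. The following is only a sketch under that assumption: HandlerBridge and toFuture are hypothetical names, not part of the JDK, and the attachment type is fixed to Void to keep the example short.

```java
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper, not a JDK class.
final class HandlerBridge {

    /**
     * Starts a callback-style operation and exposes its outcome as a CompletableFuture.
     * The operation receives a handler and is expected to pass it to an NIO.2 call.
     */
    static <V> CompletableFuture<V> toFuture(Consumer<CompletionHandler<V, Void>> operation) {
        CompletableFuture<V> promise = new CompletableFuture<>();
        operation.accept(new CompletionHandler<V, Void>() {
            @Override
            public void completed(V result, Void attachment) {
                promise.complete(result);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                promise.completeExceptionally(exc);
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        // Stand-in for a real channel call: the "operation" completes immediately.
        CompletableFuture<Integer> bytesRead = toFuture(handler -> handler.completed(42, null));
        bytesRead.thenAccept(n -> System.out.println("read " + n + " bytes"));
    }
}
```

With a real channel, the intended usage would be something like HandlerBridge.<Integer>toFuture(h -> connection.read(buffer, null, h)), followed by thenCompose for the write; I have not measured whether this closes the performance gap with the Java 7 version.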