\$\begingroup\$

I want to implement a reactive write to a file. At the low level it uses NIO and Futures. After reading this section I wrote the code below, but maybe there is a cleaner or shorter solution.

Thanks.

// Imports assumed from usage: Lombok for val/@NonNull, Commons IO for IOUtils.
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import lombok.NonNull;
import lombok.val;
import org.apache.commons.io.IOUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public class LocalImageService implements IImageService {
    private String imageDir = "/tmp";

    @Override
    public Mono<String> storeImage(@NonNull InputStream imageStream, @NonNull String ext) {
        val processor = new ImageProcessor();
        processor.fileName = UUID.randomUUID() + "." + ext;
        val targetFile = new File(imageDir, processor.fileName);
        // Each emitted future represents one pending asynchronous write.
        final Flux<CompletableFuture<Integer>> bridge = Flux.push(sink -> {
            try {
                processor.sink = sink;
                processor.inputChannel = Channels.newChannel(imageStream);
                processor.fileChannel = AsynchronousFileChannel.open(targetFile.toPath(),
                        StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                processor.processNextChunk();
            } catch (IOException e) {
                processor.onError(new RuntimeException("Cannot write image file: " + processor.fileName, e));
            }
        });
        return bridge.flatMap(Mono::fromFuture)
                .reduce(0, (acc, num) -> {
                    // Keep any bytes the last write left behind, then start the next chunk.
                    processor.buffer.compact();
                    processor.processNextChunk();
                    return acc + num;
                })
                .then(Mono.just(targetFile.getPath()));
    }

    private static final class ImageProcessor {
        long position = 0;
        ReadableByteChannel inputChannel;
        AsynchronousFileChannel fileChannel;
        FluxSink<CompletableFuture<Integer>> sink;
        final ByteBuffer buffer = ByteBuffer.allocate(4096);
        String fileName;

        void onComplete() {
            closeStreams();
            if (null != sink) {
                sink.complete();
            }
        }

        private void closeStreams() {
            IOUtils.closeQuietly(inputChannel);
            IOUtils.closeQuietly(fileChannel);
        }

        void onError(Throwable e) {
            closeStreams();
            if (null != sink) {
                sink.error(e);
            }
        }

        void processNextChunk() {
            try {
                // Continue while there is new input or leftover bytes still in the buffer.
                if (inputChannel.read(buffer) >= 0 || buffer.position() > 0) {
                    buffer.flip();
                    val lastWrite = fileChannel.write(buffer, position);
                    sink.next(CompletableFuture.supplyAsync(() -> {
                        try {
                            int written = lastWrite.get();
                            // Advance by the bytes actually written; the read count
                            // is -1 at end of stream and can differ on a partial write.
                            position += written;
                            return written;
                        } catch (InterruptedException | ExecutionException e) {
                            onError(new RuntimeException("Cannot write image file: " + fileName, e));
                            return -1;
                        }
                    }));
                } else {
                    onComplete();
                }
            } catch (IOException e) {
                onError(new RuntimeException("Cannot write image file: " + fileName, e));
            }
        }
    }
}
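
For comparison, here is a minimal sketch of just the low-level part (NIO plus futures) using only the JDK. It assumes the `CompletionHandler` overload of `AsynchronousFileChannel.write` is acceptable in place of blocking on `Future.get()`. The class name `AsyncFileWriter` and its `write` method are hypothetical and not part of the code above, and this is one possible shape rather than a definitive implementation.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper for illustration only.
public final class AsyncFileWriter {

    private AsyncFileWriter() {
    }

    /** Copies the stream into target; the future completes with the total bytes written. */
    public static CompletableFuture<Long> write(InputStream in, Path target) {
        CompletableFuture<Long> copied = new CompletableFuture<>();
        ReadableByteChannel source = Channels.newChannel(in);
        AsynchronousFileChannel file;
        try {
            file = AsynchronousFileChannel.open(target,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (IOException e) {
            closeQuietly(source);
            copied.completeExceptionally(e);
            return copied;
        }
        writeNextChunk(source, file, ByteBuffer.allocate(4096), 0L, copied);
        // The returned stage completes only after both channels are closed.
        return copied.whenComplete((total, error) -> {
            closeQuietly(source);
            closeQuietly(file);
        });
    }

    private static void writeNextChunk(ReadableByteChannel source, AsynchronousFileChannel file,
                                       ByteBuffer buffer, long position,
                                       CompletableFuture<Long> copied) {
        try {
            // The read is still blocking: an InputStream has no asynchronous API.
            int read = source.read(buffer);
            if (read < 0 && buffer.position() == 0) {
                copied.complete(position); // end of stream, nothing left to flush
                return;
            }
            buffer.flip();
            file.write(buffer, position, null, new CompletionHandler<Integer, Void>() {
                @Override
                public void completed(Integer written, Void attachment) {
                    buffer.compact(); // keep bytes a partial write left behind
                    writeNextChunk(source, file, buffer, position + written, copied);
                }

                @Override
                public void failed(Throwable error, Void attachment) {
                    copied.completeExceptionally(error);
                }
            });
        } catch (IOException | RuntimeException e) {
            copied.completeExceptionally(e);
        }
    }

    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing fails.
        }
    }
}
```

If a `Mono` is needed, the returned future can be adapted with `Mono.fromFuture`, the same call the code above already uses for its per-chunk futures.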
asked Nov 10, 2017 at 13:15
\$\endgroup\$