I just started learning reactive streams with RxJava. After reading a couple of books and a lot of articles out there, I am still having trouble understanding how to coordinate multiple threads.
I would appreciate a critique of the following code, particularly if anyone knows a better way to do it.
I basically want to execute multiple tasks in the background, and as results come back I want to be notified of them immediately. I want the background tasks to run in individual threads, but I want all notifications to be delivered on a single thread.
My idea is that the notification code should not need to worry about synchronization, and I can achieve that by making sure the notification code only ever runs on one thread.
This was my best attempt at that:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

import rx.Observable;
import rx.Observer;
import rx.observers.Observers;
import rx.schedulers.Schedulers;

// Written against RxJava 1.x
public class Question {
    public static void main(String[] args) {
        //A hypothetical list of tasks to run asynchronously
        List<Callable<String>> tasks = Arrays.asList(
            () -> "One: " + Thread.currentThread().getName(),
            () -> "Two: " + Thread.currentThread().getName(),
            () -> "Three: " + Thread.currentThread().getName()
        );
        //A blocking queue to hold the results when ready
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        //when a task is done, this observer puts it in the queue.
        //observer code then will run in the currently processing thread
        Observer<String> observer = Observers.create(queue::offer);
        tasks.stream().map(Question::async).forEach(result -> result.subscribe(observer));
        //as tasks get resolved and enter the queue, this other observer processes the results
        //in my current thread, not in any of the task threads.
        consumer(queue, 3).forEach(item -> {
            System.out.println("Received " + item + " at " + Thread.currentThread().getName());
        });
    }

    // Wraps a Callable in an Observable that runs on the computation scheduler
    static <T> Observable<T> async(Callable<T> supplier) {
        return Observable.<T>create(subscriber -> {
            try {
                subscriber.onNext(supplier.call());
                subscriber.onCompleted();
            }
            catch (Exception ex) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(ex);
                }
            }
        }).subscribeOn(Schedulers.computation());
    }

    // Emits `count` items taken from the queue, blocking on each take()
    static <T> Observable<T> consumer(BlockingQueue<T> queue, int count) {
        return Observable.<T>create(subscriber -> {
            for (int i = 0; i < count; i++) {
                try {
                    T text = queue.take();
                    subscriber.onNext(text);
                }
                catch (InterruptedException ex) {
                    subscriber.onError(ex);
                }
            }
            subscriber.onCompleted();
        }).observeOn(Schedulers.immediate());
    }
}
And I get:
Received One: RxComputationThreadPool-1 at main
Received Two: RxComputationThreadPool-2 at main
Received Three: RxComputationThreadPool-3 at main
So, this works like a charm, but I have the feeling it is still too verbose and that maybe there is a way to make it simpler with RxJava somehow.
-
It looks like your output is from a different version of the program than the source posted in your question? (VoiceOfUnreason, commented Oct 17, 2015 at 4:46)
-
@VoiceOfUnreason It looks like it was the result of some editing of the code that I posted, but I just fixed it. (Edwin Dalorzo, commented Oct 17, 2015 at 8:41)
1 Answer
You are jumping through a lot of hoops to get your queue, consumer, and observer strategies aligned. I looked at your code, and I don't really follow all of it. Part of the reason is that I don't know RxJava at all, but normally I can follow these things without much research.
So, your code streams the tasks and, for each one, creates an Observable that is subscribed on RxJava's computation scheduler. The Observer for each of those Observables adds the result to the queue.
Then, for the queue, you create an Observable that emits one item per expected result, taking each from the queue. You then iterate over those items and print each result.
I can understand why you may want to use RxJava in some situations, but that last part makes no sense to me. Let me show you your code:
//as tasks get resolved and enter the queue, this other observer processes the results
//in my current thread, not in any of the task threads.
consumer(queue, 3).forEach(item -> {
    System.out.println("Received " + item + " at " + Thread.currentThread().getName());
});
where consumer is:
static <T> Observable<T> consumer(BlockingQueue<T> queue, int count) {
    return Observable.<T>create(subscriber -> {
        for (int i = 0; i < count; i++) {
            try {
                T text = queue.take();
                subscriber.onNext(text);
            }
            catch (InterruptedException ex) {
                subscriber.onError(ex);
            }
        }
        subscriber.onCompleted();
    }).observeOn(Schedulers.immediate());
}
What does that code do? It takes three strings from the queue and prints them. That is the same as:
for (int i = 0; i < 3; i++) {
    System.out.println("Received " + queue.take() + " at " + Thread.currentThread().getName());
}
What's wrong with that (apart from the hard-coded 3)?
Having figured out all the hoops you are jumping through, I struggled to see what advantage RxJava is giving you, especially when Java already has the "right tool" in the java.util.concurrent.* toolbox.
Let me explain. The CompletionService concept says: submit a bunch of jobs to the service, and the service will tell you when they complete. It has a submit(...) method to add jobs to the service, and a take() method which blocks until the next task completes and then returns that task's Future.
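To see the "tell you when they complete" part in isolation, here is a minimal sketch. It is not from the original post: the class `CompletionOrderDemo`, the helper `sleepThen`, and the sleep durations are made up for illustration. Three tasks are submitted slowest-first, and `take()` hands them back in the order they finish rather than the order they were submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical helper: sleeps for the given time, then returns the label.
    static String sleepThen(String label, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return label;
    }

    // Submits the tasks slowest-first and records the order in which take() returns them.
    public static List<String> run() throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3); // one thread per task
        try {
            CompletionService<String> service = new ExecutorCompletionService<>(pool);
            service.submit(() -> sleepThen("slow", 600));
            service.submit(() -> sleepThen("medium", 300));
            service.submit(() -> sleepThen("fast", 0));

            List<String> received = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // take() blocks until some task finishes and returns its Future
                received.add(service.take().get());
            }
            return received;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The exact order is timing-dependent, but with gaps this large it should normally print the fast result first and the slow one last, even though the slow task was submitted first.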
After quickly writing a thread factory whose thread names match the ones RxJava produced, I have this code:
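// Members of a class named Reactive; needs java.util.concurrent.* and
// java.util.concurrent.atomic.AtomicInteger imports.
private static final AtomicInteger threadId = new AtomicInteger(0);

// Names threads like RxJava's computation pool and marks them as daemons
private static Thread threadFactory(Runnable r) {
    Thread t = new Thread(r, "RxComputationThreadPool-" + threadId.incrementAndGet());
    t.setDaemon(true);
    return t;
}

private static ExecutorService pool = Executors.newCachedThreadPool(Reactive::threadFactory);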
That code creates an ExecutorService whose threads are named like the ones in your output.
Now, with that in place, the problem itself becomes really simple:
public static void main(String[] args) throws InterruptedException, ExecutionException {
    List<Callable<String>> tasks = Arrays.asList(
        () -> "One: " + Thread.currentThread().getName(),
        () -> "Two: " + Thread.currentThread().getName(),
        () -> "Three: " + Thread.currentThread().getName()
    );
    int count = tasks.size();
    CompletionService<String> completor = new ExecutorCompletionService<>(pool);
    tasks.stream().forEach(completor::submit);
    for (int i = 0; i < count; i++) {
        System.out.println("Received: " + completor.take().get() + " at " + Thread.currentThread().getName());
    }
}
Note that the CompletionService returns a completed Future<String>, not the actual String, so you have to get() the string from the Future<String>.
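One consequence of getting a Future back is error handling: if a task throws, get() rethrows that exception wrapped in an ExecutionException, which plays roughly the role of the onError branch in the RxJava version. A minimal sketch, assuming a hypothetical second task that fails (the class name `FailureDemo` and the fixed-size pool are illustrative choices, not part of the answer):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FailureDemo {

    // Drains every task and returns {succeeded, failed}.
    public static int[] run() throws InterruptedException {
        List<Callable<String>> tasks = Arrays.asList(
            () -> "One",
            () -> { throw new IllegalStateException("Two failed"); }, // hypothetical failing task
            () -> "Three"
        );
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            CompletionService<String> completor = new ExecutorCompletionService<>(pool);
            tasks.forEach(completor::submit);

            int succeeded = 0;
            int failed = 0;
            for (int i = 0; i < tasks.size(); i++) {
                try {
                    // get() rethrows the task's exception wrapped in ExecutionException
                    System.out.println("Received: " + completor.take().get());
                    succeeded++;
                } catch (ExecutionException ex) {
                    System.out.println("Task failed: " + ex.getCause());
                    failed++;
                }
            }
            return new int[] {succeeded, failed};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = run();
        System.out.println(counts[0] + " succeeded, " + counts[1] + " failed"); // 2 succeeded, 1 failed
    }
}
```

Because the failure is caught per task, the loop still drains every result, and the consuming thread decides what a failure means.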
I have put this code up on ideone so you can see it in action.
Note that the solution I provided has no need for RxJava, and that's because if you want to execute a bunch of things asynchronously and retrieve the results in a single thread, then perhaps the observable pattern is not the right one.
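If push-style callbacks are still wanted without RxJava, Java 8's CompletableFuture can keep the question's goal of running all notification code on one thread: run each task on a worker pool and attach the callback with thenAcceptAsync(..., notifier), where notifier is a single-threaded executor. This is a sketch of a different JDK technique, not the answer's code; the class name and pool sizes are assumptions. Because every callback runs on the notifier thread, the plain ArrayList they append to is only ever mutated by that one thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class SingleThreadNotifications {

    public static List<String> run() {
        List<Supplier<String>> tasks = Arrays.asList(
            () -> "One: " + Thread.currentThread().getName(),
            () -> "Two: " + Thread.currentThread().getName(),
            () -> "Three: " + Thread.currentThread().getName()
        );
        ExecutorService workers = Executors.newFixedThreadPool(tasks.size());
        // Every notification callback runs on this single thread.
        ExecutorService notifier = Executors.newSingleThreadExecutor();
        // Deliberately not thread-safe: only the notifier thread mutates it.
        List<String> received = new ArrayList<>();
        try {
            CompletableFuture<?>[] callbacks = tasks.stream()
                .map(task -> CompletableFuture.supplyAsync(task, workers)
                    .thenAcceptAsync(result -> {
                        String line = result + " at " + Thread.currentThread().getName();
                        received.add(line);
                        System.out.println("Received " + line);
                    }, notifier))
                .toArray(CompletableFuture[]::new);
            // join() returns only after every callback has run.
            CompletableFuture.allOf(callbacks).join();
        } finally {
            workers.shutdown();
            notifier.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        run();
    }
}
```

Unlike the CompletionService loop, the main thread does not consume results itself here; the callbacks run on the notifier's thread (not main), and main only waits for them to finish.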
-
Thanks for the feedback. As a matter of fact, I had also already implemented exactly this, precisely using ExecutorCompletionService. However, I've been reading about the power of reactive streams and I wanted to see whether I could do it better that way. So far I haven't been able to make it simpler, and this question was about finding out whether that is down to my lack of skill with reactive streams or whether it indeed cannot be made simpler this way. So, your suggestion is great, and it's compatible with what I had in the past, but my interest is in figuring out the RxJava way. (Edwin Dalorzo, commented Oct 17, 2015 at 18:41)