I wrote the following code snippet on the Cumulocity platform.
runForTenant
must run in a separate thread. What is your opinion of my code? Is there any need for optimization? Thanks.
LinkedList<CompletableFuture<Void>> asyncThreads = new LinkedList<CompletableFuture<Void>>();
CompletableFuture<Iterable<ManagedObjectRepresentation>> response = new CompletableFuture<>();
subscriptions.runForEachTenant(() -> {
    String tenant = subscriptions.getTenant();
    asyncThreads.add(CompletableFuture.runAsync(() -> {
        subscriptions.runForTenant(tenant, () -> {
            Iterable<ManagedObjectRepresentation> objects = inventoryApi
                    .getManagedObjectsByFilter(customInventoryFilter).get().allPages();
            if (!ObjectUtils.isEmpty(objects)) {
                response.complete(objects);
            }
        });
    })); // end of runAsync()
});
boolean allThreadsAreDone = true;
do { // until all threads are done ...
    try {
        return response.get(50, TimeUnit.MILLISECONDS); // wait for first result
    } catch (TimeoutException e) {
        // device with UUID not found in time
        // check all threads are done/closed/stopped in all fashions
        allThreadsAreDone = true;
        for (int i = 0; allThreadsAreDone && i < asyncThreads.size(); i++) {
            allThreadsAreDone = asyncThreads.get(i).isDone();
        }
    } catch (ExecutionException execExc) {
        logger.error("Error on lookup: " + execExc.getLocalizedMessage());
    } catch (InterruptedException interExc) {
        logger.error("Error on lookup: " + interExc.getLocalizedMessage());
    }
} while (!allThreadsAreDone);
I want to optimize the last part of the code:
boolean allThreadsAreDone = true;
do { // until all threads are done ...
    try {
        return response.get(50, TimeUnit.MILLISECONDS); // wait for first result
    } catch (TimeoutException e) {
        // device with UUID not found in time
        // check all threads are done/closed/stopped in all fashions
        allThreadsAreDone = true;
        for (int i = 0; allThreadsAreDone && i < asyncThreads.size(); i++) {
            allThreadsAreDone = asyncThreads.get(i).isDone();
        }
    } catch (ExecutionException execExc) {
        logger.error("Error on lookup: " + execExc.getLocalizedMessage());
    } catch (InterruptedException interExc) {
        logger.error("Error on lookup: " + interExc.getLocalizedMessage());
    }
} while (!allThreadsAreDone);
2 Answers
I have one piece of advice, but since the code does not compile, it's a bit hard to give a proper review.
Instead of using a java.util.LinkedList, I suggest that you use a java.util.concurrent.ExecutorService to manage the threads and to check when they have finished.
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
CompletableFuture<Iterable<ManagedObjectRepresentation>> response = new CompletableFuture<>();
subscriptions.runForEachTenant(() -> {
    String tenant = subscriptions.getTenant();
    taskExecutor.execute(CompletableFuture.runAsync(() -> {
        subscriptions.runForTenant(tenant, () -> {
            Iterable<ManagedObjectRepresentation> objects = inventoryApi
                    .getManagedObjectsByFilter(customInventoryFilter).get().allPages();
            if (!ObjectUtils.isEmpty(objects)) {
                response.complete(objects);
            }
        });
    }));
});
taskExecutor.shutdown();
try {
    taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
}
// The threads are finished.
- If the code does not compile, it's off-topic and shouldn't be answered. – dustytrash, Jan 17, 2020 at 16:19
- It is not possible to show you the whole code ;) – Fortran, Jan 20, 2020 at 6:28
- Calling taskExecutor.execute(CompletableFuture.runAsync( does not make much sense, since CompletableFuture.runAsync in this case will use ForkJoin internally. Instead, pass an executor as the second parameter of CompletableFuture.runAsync itself. The number of threads in the executor is also an open question. Also, in case of multiple parallel calls to this block, it may quickly end up in resource starvation, due to multiple executors competing for the processor. Therefore it is better to pass an executor in as a parameter from outside the block, and not to shut it down. – Andrey Lebedenko, Feb 7, 2020 at 13:27
- Also, taskExecutor.execute requires a Runnable: docs.oracle.com/javase/7/docs/api/java/util/concurrent/… – Fortran, Feb 11, 2020 at 13:06
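The suggestion from the comments, passing an executor as the second argument of CompletableFuture.runAsync and leaving its lifecycle to the caller, could look roughly like the sketch below. The class name SharedExecutorExample and the method runAll are hypothetical and not part of the Cumulocity SDK; the tenant lookups are stood in for by plain Runnables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, for illustration only.
public class SharedExecutorExample {

    /** Runs each task on the caller-supplied executor and waits for all of them. */
    public static void runAll(List<Runnable> tasks, Executor executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            // Two-argument overload: the task runs on `executor`
            // instead of the common ForkJoinPool.
            futures.add(CompletableFuture.runAsync(task, executor));
        }
        // Blocks until every future has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // The executor is deliberately NOT shut down here; its owner decides when.
    }

    public static void main(String[] args) {
        // Pool size 4 is an arbitrary assumption; size it for the real workload.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Runnable> tasks = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                int id = i;
                tasks.add(() -> System.out.println("lookup " + id + " on "
                        + Thread.currentThread().getName()));
            }
            runAll(tasks, pool);
        } finally {
            pool.shutdown(); // shut down once, by the owner of the pool
        }
    }
}
```

Because the pool is created once and shared, concurrent calls to runAll compete for the same bounded set of threads rather than each creating its own.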
The second loop (the do...while) can be replaced by
asyncThreads.forEach(cf -> cf.join()); // blocks until all CompletableFutures have completed, in any order
Though those are not threads, per se; they are CompletableFutures.
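Note that joining every future waits for all lookups, whereas the original loop returns as soon as the first result arrives. One way to keep that early return without polling is to combine CompletableFuture.anyOf with CompletableFuture.allOf. This is only a sketch under assumptions: the class FirstResultLookup and the method firstResult are hypothetical, and each tenant lookup is modelled as a Supplier returning an Optional rather than a real inventory call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only.
public class FirstResultLookup {

    /**
     * Runs every lookup asynchronously and returns the first non-empty result,
     * or Optional.empty() once all lookups have finished without one.
     */
    public static <T> Optional<T> firstResult(List<Supplier<Optional<T>>> lookups) {
        CompletableFuture<T> response = new CompletableFuture<>();
        List<CompletableFuture<Void>> tasks = new ArrayList<>();

        for (Supplier<Optional<T>> lookup : lookups) {
            // Only the first complete() call takes effect; later ones are ignored.
            tasks.add(CompletableFuture.runAsync(
                    () -> lookup.get().ifPresent(response::complete)));
        }

        // Completes once every task is done. handle() turns a failed task into
        // a normal completion so that join() below does not throw.
        CompletableFuture<Void> allDone = CompletableFuture
                .allOf(tasks.toArray(new CompletableFuture[0]))
                .handle((ignored, failure) -> null);

        // Blocks until the first result arrives or all tasks have finished.
        CompletableFuture.anyOf(response, allDone).join();

        // If allDone won the race, every task has already had its chance to
        // complete `response`, so getNow() sees the value if there is one.
        return Optional.ofNullable(response.getNow(null));
    }

    public static void main(String[] args) {
        List<Supplier<Optional<String>>> lookups = new ArrayList<>();
        lookups.add(() -> Optional.empty());
        lookups.add(() -> Optional.of("device-42"));
        System.out.println(firstResult(lookups));
    }
}
```

This sketch silently swallows lookup failures; real code would probably log them inside handle(), as the original loop does for ExecutionException.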
- What does subscriptions::runForEachTenant do? Execute a Runnable? The naming suggests it should execute a Function or a Consumer of tenant.