I don't get to write multithreaded applications very often, so be gentle with my threads :-)
I have an API input which contains a "score" and then a bunch of child nodes which also have scores, and perhaps child nodes of their own. I need to quickly sum up the total score of the entire tree.
I've chosen newWorkStealingPool, as I have read that it is a good fit for applications where workers may spawn other workers.
My handler is awfully static. I don't always work in Java, but this felt like the appropriate solution. Correct me if I'm wrong? :-)
I think I've made the methods that could be troublesome thread-safe. I'd obviously like to remove the synchronization from any methods that don't need it.
I'm very concerned about how I monitor the running threads to decide when my work is done. I think there's a race condition here that I'm having trouble understanding, even if, for now, this project works.
Don't worry too much about my throwing Exception, please :-). It's just for now, so as not to clutter up the other code.
Thank you!!
Oh! Java 8 is pretty new to me, too. I've only just started considering where I could use some of its new functionality.
Runner
import java.net.URL;
public class Runner {
    public static void main(String[] args) throws Exception {
        try {
            NodeHandler.addNode(new Node(new URL(args[0])));
        } catch (java.net.MalformedURLException e) {
            System.out.println("Malformed URL Exception: " + e.getMessage());
        }
        System.out.println(Double.toString(NodeHandler.getTotal()));
    }
}
Node Handler
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class NodeHandler {
    private static int runningThreads = 0;
    private static double scoreTotal = 0;
    private static ExecutorService es = Executors.newWorkStealingPool();
    public static synchronized void addNode(Node node) {
        es.execute(node);
    }
    public static synchronized void incrementThreadCount() {
        runningThreads++;
    }
    public static synchronized void decrementThreadCount() {
        runningThreads--;
    }
    public static synchronized void updateScore(double score) {
        scoreTotal += score;
    }
    public static int getRunningThreads() {
        return runningThreads;
    }
    public static double getTotal() {
        while (getRunningThreads() > 0) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return scoreTotal;
    }
}
Node
import java.util.*;
import java.net.URL;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
public class Node implements Runnable {
    private URL url;
    private StringBuilder rawJson = new StringBuilder();
    Node(URL url) {
        NodeHandler.incrementThreadCount();
        this.url = url;
    }
    private void processNode() throws Exception {
        Scanner sc = new Scanner(url.openStream());
        while (sc.hasNext()) {
            rawJson.append(sc.nextLine());
        }
        sc.close();
        JSONParser jParser = new JSONParser();
        JSONObject jObj = (JSONObject) jParser.parse(rawJson.toString());
        JSONArray children = (JSONArray) jObj.get("children");
        NodeHandler.updateScore(Double.parseDouble(jObj.get("score").toString()));
        if (children != null && children.size() > 0) {
            for (Object o : children) {
                URL url = new URL(o.toString());
                NodeHandler.addNode(new Node(url));
            }
        }
        NodeHandler.decrementThreadCount();
    }
    @Override
    public void run() {
        try {
            processNode();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Answer
A few things that I noticed, not necessarily in order:
The globally locked total is an unnecessary bottleneck. The aggregation you're performing is well suited to combining subresults: instead of updateScore, which only one thread can ever enter at a time, you should keep a running aggregate as you traverse the node hierarchy.

Node should not know about NodeHandler. You're tightly coupling the two through static methods, which makes your code hard to test. Instead of a static class, use dependency injection.

Your termination condition isn't perfectly safe. getRunningThreads() and getTotal() read runningThreads and scoreTotal without synchronization, so the main thread isn't guaranteed to see the latest values, and if processNode() throws, decrementThreadCount() is never called and getTotal() never returns. getTotal is also basically busy-waiting: because it polls once per second, the wait always lasts a whole number of seconds, which doesn't bode well for smaller node structures.

Declaring rawJson as a member variable is not a good solution. It is only used inside processNode, so it should be a local variable there; as a field, the StringBuilder stays reachable for as long as the Node object does, instead of becoming garbage as soon as parsing is done.

Java EE has included a JSON Processing API (JSR 353, javax.json) since Java EE 7. Crucially, its JsonParser lets you stream the JSON directly from the input, as long as you only need forward access, which keeps the memory footprint minimal. Look into Json.createParser(InputStream).
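A minimal sequential sketch of the first two points, assuming a hypothetical NodeSource interface that hides how a node's score and children are obtained (the real program would download and parse JSON from a URL instead). The source is passed in through the constructor rather than reached via static methods, and each node's total is its own score plus its children's totals, so no global counter or lock is needed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TreeTotalSketch {

    // Hypothetical abstraction over "fetch this node's JSON": the real program
    // would download and parse it from a URL instead.
    interface NodeSource {
        double score(String id);
        List<String> children(String id);
    }

    private final NodeSource source;

    // The dependency is injected through the constructor, so this class never
    // touches static, global state such as NodeHandler.
    TreeTotalSketch(NodeSource source) {
        this.source = source;
    }

    // Running aggregate: a node's total is its own score plus its children's totals.
    double total(String id) {
        double sum = source.score(id);
        for (String child : source.children(id)) {
            sum += total(child);
        }
        return sum;
    }

    // Hypothetical in-memory NodeSource, e.g. for unit tests without network access.
    static NodeSource inMemory(Map<String, Double> scoreMap, Map<String, List<String>> childMap) {
        return new NodeSource() {
            @Override
            public double score(String id) {
                return scoreMap.get(id);
            }

            @Override
            public List<String> children(String id) {
                return childMap.getOrDefault(id, Collections.emptyList());
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new HashMap<>();
        scores.put("root", 1.0);
        scores.put("a", 2.0);
        scores.put("b", 3.0);
        scores.put("c", 4.0);
        Map<String, List<String>> children = new HashMap<>();
        children.put("root", Arrays.asList("a", "b"));
        children.put("a", Arrays.asList("c"));
        NodeSource source = inMemory(scores, children);
        System.out.println(new TreeTotalSketch(source).total("root")); // 10.0
    }
}
```

Because the source is injected, a test can hand in an in-memory tree like the one in main and never touch the network.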
Since you're creating tasks from your initial task, ForkJoinPool and ForkJoinTasks are the "correct" model to use here. With this knowledge, we can rewrite your program to be significantly more idiomatic and probably quite a bit faster :) This also means you no longer need to think about the termination condition of your operation.
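A minimal sketch of what that rewrite could look like with Java 8's RecursiveTask. The node data is a hypothetical in-memory stand-in (maps from a node id to its score and to its child ids); in the real program, compute() would fetch and parse the node's JSON where this sketch reads from the maps. Each task returns its own score plus its children's totals, so there is no shared state to lock, and invoke() only returns once the whole tree has been summed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;

public class ScoreTask extends RecursiveTask<Double> {

    // Hypothetical stand-ins for the fetched JSON: node id -> score, node id -> child ids.
    private final Map<String, Double> scores;
    private final Map<String, List<String>> children;
    private final String id;

    ScoreTask(Map<String, Double> scores, Map<String, List<String>> children, String id) {
        this.scores = scores;
        this.children = children;
        this.id = id;
    }

    @Override
    protected Double compute() {
        // Fork one subtask per child; idle worker threads can steal them.
        List<ScoreTask> subtasks = new ArrayList<>();
        for (String child : children.getOrDefault(id, Collections.emptyList())) {
            ScoreTask subtask = new ScoreTask(scores, children, child);
            subtask.fork();
            subtasks.add(subtask);
        }
        // This node's total: its own score plus its children's totals, joined in list order.
        double total = scores.get(id);
        for (ScoreTask subtask : subtasks) {
            total += subtask.join();
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new HashMap<>();
        scores.put("root", 1.0);
        scores.put("a", 2.0);
        scores.put("b", 3.0);
        scores.put("c", 4.0);
        Map<String, List<String>> children = new HashMap<>();
        children.put("root", Arrays.asList("a", "b"));
        children.put("a", Arrays.asList("c"));
        // invoke() blocks until the whole tree is summed; the forked subtasks
        // run in ForkJoinPool.commonPool(), so no pool is declared explicitly.
        double total = new ScoreTask(scores, children, "root").invoke();
        System.out.println(total); // 10.0
    }
}
```

Because the children's totals are joined in a fixed order, the floating-point result does not depend on which worker thread finishes first.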
-
Do note that, because of floating-point errors, you may get different results depending on the order of calculation. – Vogel612, Aug 3, 2018 at 21:10
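To illustrate that comment: floating-point addition isn't associative, so adding the same scores with a different grouping (as parallel aggregation may do) can change the last bits of the result. The three literal values are chosen for illustration only.

```java
public class SumOrder {
    public static void main(String[] args) {
        // Same three numbers, same additions, different grouping.
        double leftFirst = (0.1 + 0.2) + 0.3;
        double rightFirst = 0.1 + (0.2 + 0.3);
        System.out.println(leftFirst);
        System.out.println(rightFirst);
        System.out.println(leftFirst == rightFirst); // false
    }
}
```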
-
This is all great stuff! I wanted to ask for clarification on your first point, about the running aggregate. Would this be like an aggregation thread holding a collection of our scores, which we update concurrently as we traverse the nodes? Then, when all tasks are finished, we can sum the stored scores? And on your last point: it looks like newWorkStealingPool is backed by a ForkJoinPool; in fact, it's just an abstraction over it. I guess I'm missing what I should change about the thread pool implementation here. – charstar, Aug 3, 2018 at 23:56
-
@charstar re 1: I was suggesting having a "total" inside a Node that incorporates the totals of all its children plus its own score. That way you don't even need global state, and the static total disappears. That even allows you to calculate two of these tree structures in parallel. The thread pool itself is ... okay. It's a good solution for the abstraction of a Node being a thread. If your node were a ForkJoinTask instead (or, even more idiomatically, a RecursiveTask), that abstraction is no longer needed. You won't even need an explicitly declared ForkJoinPool then. – Vogel612, Aug 4, 2018 at 9:20