This is my ProcessDataFileParallel.java file. I'm reading numbers from a .txt file and putting them into an array in Java. While it works, I would like to improve the code and possibly replace some algorithms with better alternatives.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Scanner;

public class ProcessDataFileParallel {
    public static void main(String[] args) throws IOException {
        int[] array = new int[100000];
        int index = 0;
        Scanner inputFile = null;
        Scanner scan = new Scanner(System.in);
        boolean running = false;
        String emptyString;
        FileReader file = new FileReader("dataset529.txt");
        BufferedReader br = new BufferedReader(file);
        emptyString = br.readLine();
        try {
            while ((emptyString = br.readLine()) != null) {
                array[index++] = Integer.parseInt(emptyString);
            }
        } finally {
            if (br.readLine() == null)
                br.close();
        }
        inputFile = new Scanner(file);
        // Read File from Dataset529.txt
        if (inputFile != null) {
            System.out.println("Number of integers in: " + file);
            try {
                while (inputFile.hasNext() && inputFile.hasNextInt()) {
                    array[index] = inputFile.nextInt();
                    index++;
                    inputFile.next();
                }
            } finally {
                inputFile.close();
            }
            // Print dataset529.txt to Array in Java with For Loop below.
            for (int ai = 0; ai < index; ai++) {
                System.out.println("Array Index: " + "(" + ai + ")" + "== >" + " " + array[ai]);
            }
            System.out.println("How many Threads?"); // Scanner - Ask user how many threads
            int n = scan.nextInt(); // Create variable 'n' to handle whatever integer the user specifies. nextInt() is used for the scanner to expect an int.
            Thread[] myThreads = new Thread[n]; // Variable n placed here to determine how many Threads user wanted
            Worker[] workers = new Worker[n]; // Variable N placed here - the number user specified will determine # of workers.
            int range = array.length / n; // divides the array into n threads (what scanner specifies)
            for (index = 0; index < n; index++) {
                int start = index * range;
                int end = start + range;
                workers[index] = new Worker(start, end, array);
            }
            for (index = 0; index < n; index++) {
                myThreads[index] = new Thread(workers[index]);
                myThreads[index].start();
            }
            if (running) {
                for (Thread t : myThreads) {
                    if (t.isAlive()) {
                        running = true;
                        break;
                    }
                }
            } while (running);
            for (Worker worker : workers) {
                System.out.println("Max = " + worker.getMax());
            }
        }
    }
}
Worker class:

public class Worker implements Runnable {
    private int start;
    private int end;
    private int randomNumbers[];
    int max = Integer.MIN_VALUE;

    public Worker(int start, int end, int[] randomNumbers) {
        this.start = start;
        this.end = end;
        this.randomNumbers = randomNumbers;
    }

    public void run() {
        for (int index = start; index < end; index++) {
            if (randomNumbers[index] > max)
                max = randomNumbers[index];
        }
    }

    public int getMax() {
        return max;
    }
}
Is there any way I can improve or condense my code? Also, are there any better alternative approaches?
Comment by h.j.k. (Oct 30, 2015 at 3:06): Is this meant to be more like an academic exercise on multi-threading? Also, are you on Java 8?
1 Answer
Bug
If the array length is not a multiple of the number of threads, you'll miss the remainder! Quick example:
public static void printRange(int length, int n) {
    int range = length / n;
    for (int index = 0; index < n; index++) {
        int start = index * range;
        int end = start + range;
        System.out.printf("[%s, %s)%n", start, end);
    }
}
// Output for printRange(10, 3):
[0, 3)
[3, 6)
[6, 9)
// Output for printRange(10, 6):
[0, 1)
[1, 2)
[2, 3)
[3, 4)
[4, 5)
[5, 6)
As you can see, you'll miss the 10th element for printRange(10, 3), and up to half the array in cases like printRange(10, 6), which skips the last four elements.
edit: A simplistic approach is to spin up a new Thread to handle the remaining elements. However, as the second example shows, that thread would end up with a relatively high number of elements (four), whereas each of the first six threads processes (and returns) only one element. An alternative approach is to give one extra element to each of the first (length % n) threads, so that several threads do slightly more work instead of a single thread doing most of it.
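Here is a minimal sketch of that alternative. It assumes a hypothetical helper named BalancedRanges.split, which is not part of the original code. Each of the first length % n ranges gets one extra element, so the ranges cover the whole array and their sizes differ by at most one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from the original code): splits [0, length) into n
// contiguous half-open ranges whose sizes differ by at most one.
class BalancedRanges {
    static List<int[]> split(int length, int n) {
        List<int[]> ranges = new ArrayList<>();
        int base = length / n;      // every range gets at least this many elements
        int remainder = length % n; // the first `remainder` ranges get one extra
        int start = 0;
        for (int i = 0; i < n; i++) {
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new int[] {start, start + size});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(10, 3)) {
            System.out.printf("[%s, %s)%n", r[0], r[1]);
        }
    }
}
```

For split(10, 3) this prints [0, 4), [4, 7) and [7, 10). For split(10, 6), the range sizes are 2, 2, 2, 2, 1, 1, so index 9 is still covered.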
Multi-threading
The preferred way of doing multi-threading since Java 5 is to use an ExecutorService to help you manage the lifecycle of threads. You should read up more on Oracle's tutorial to understand how they are used.
In addition, since you want each thread to compute and return a result for you, you should be looking at the Callable interface. Implementations override the call() method to return an appropriate result.
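To make this concrete, here is a rough sketch that combines an ExecutorService with one Callable<Integer> per range. The names ExecutorMax and max are hypothetical, and the fixed-size thread pool is just one possible choice. The sketch assumes a non-empty array; an empty range reports Integer.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorMax {
    // Hypothetical helper: splits array into nThreads contiguous ranges (remainder
    // spread over the first ranges), submits one Callable per range, then reduces
    // the partial maxima into a single result.
    static int max(int[] array, int nThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<Integer>> partials = new ArrayList<>();
            int base = array.length / nThreads;
            int remainder = array.length % nThreads;
            int start = 0;
            for (int i = 0; i < nThreads; i++) {
                final int from = start;
                final int to = start + base + (i < remainder ? 1 : 0);
                Callable<Integer> task = () -> {
                    int m = Integer.MIN_VALUE;
                    for (int j = from; j < to; j++) {
                        m = Math.max(m, array[j]);
                    }
                    return m;
                };
                partials.add(pool.submit(task));
                start = to;
            }
            int result = Integer.MIN_VALUE;
            for (Future<Integer> partial : partials) {
                result = Math.max(result, partial.get()); // blocks until that task is done
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // lets the pool's threads exit once all tasks are done
        }
    }
}
```

Each Future.get() blocks until its task has finished. This means no manual isAlive() polling is needed, and the reduction happens in one place.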
try-with-resources
If you are on Java 7 or above, you should use try-with-resources so that the underlying I/O resource is always closed, even when an exception is thrown:
public static void main(String[] args) {
    try (Scanner scanner = new Scanner(System.in)) {
        // ...
    }
}
Hard-coding array length
int[] array = new int[100000];
What happens if your file has more than this number of elements? Actually, you also seem to be processing your file twice: once using the BufferedReader approach, and again using a Scanner. Is this intended?
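One possible single-pass replacement for both reading loops is sketched below. It assumes the file holds one integer per line with no header line. Note that the original code's first br.readLine() call silently discards line one. ReadInts.readInts is a hypothetical name, and BufferedReader.lines() requires Java 8.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class ReadInts {
    // Hypothetical helper: reads one integer per line from `path` in a single pass.
    // The array is sized from the data, so there is no hard-coded 100000 limit.
    // Blank lines are skipped; any other non-integer line throws NumberFormatException.
    static int[] readInts(Path path) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
            return reader.lines()
                    .map(String::trim)
                    .filter(line -> !line.isEmpty())
                    .mapToInt(Integer::parseInt)
                    .toArray();
        } // reader is closed here, even if parseInt throws
    }
}
```

Usage would then be something like int[] array = ReadInts.readInts(Paths.get("dataset529.txt")); with java.nio.file.Paths imported.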
edit:
Getting the max value
Your current implementation never compares the maxima of the partitions with each other to arrive at the overall maximum; it only prints each worker's max.
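Here is a sketch of the missing reduction step that keeps the question's thread-plus-Worker design. The Worker below is a trimmed copy of the question's class, and CombineMax.max is a hypothetical name. The ranges spread the remainder as described under Bug. The sketch also uses join() in place of the running loop, since join() both waits for each thread and guarantees that the worker's max is visible to the main thread afterwards.

```java
class CombineMax {
    // Trimmed copy of the question's Worker: finds the max of numbers[start, end).
    static class Worker implements Runnable {
        private final int start;
        private final int end;
        private final int[] numbers;
        private int max = Integer.MIN_VALUE;

        Worker(int start, int end, int[] numbers) {
            this.start = start;
            this.end = end;
            this.numbers = numbers;
        }

        @Override
        public void run() {
            for (int i = start; i < end; i++) {
                max = Math.max(max, numbers[i]);
            }
        }

        int getMax() {
            return max;
        }
    }

    // Hypothetical helper: starts n Worker threads, waits for all, reduces their maxima.
    static int max(int[] array, int n) {
        Worker[] workers = new Worker[n];
        Thread[] threads = new Thread[n];
        int base = array.length / n;
        int remainder = array.length % n;
        int start = 0;
        for (int i = 0; i < n; i++) {
            int end = start + base + (i < remainder ? 1 : 0); // spread the remainder
            workers[i] = new Worker(start, end, array);
            threads[i] = new Thread(workers[i]);
            threads[i].start();
            start = end;
        }
        // join() waits for each thread; its writes to `max` are visible after join returns.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // The missing step: compare the per-partition maxima with each other.
        int overall = Integer.MIN_VALUE;
        for (Worker w : workers) {
            overall = Math.max(overall, w.getMax());
        }
        return overall;
    }
}
```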
Is this meant to be more like an academic exercise on multi-threading? - myself
The reason I'm asking is that while it looks like you're taking a map-reduce approach to your problem, it may not even be necessary in the first place. Processing 100,000 integers (based on your int[] declaration) or fewer in a single thread is fast enough on any modern hardware capable of running the JVM. If, however, you are trying to compare billions of numbers and/or are given a (strangely low) memory constraint, then that's when multi-threading will help. That said, your question will be quite different at that stage.
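For comparison, here is a sketch of the single-threaded baseline alongside a Java 8 parallel stream, which handles the splitting and combining for you. StreamMax and its method names are hypothetical.

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

class StreamMax {
    // Sequential: a plain single-threaded scan, usually plenty for ~100,000 ints.
    static OptionalInt sequentialMax(int[] array) {
        return IntStream.of(array).max();
    }

    // Parallel: the same reduction, split across the common ForkJoinPool.
    static OptionalInt parallelMax(int[] array) {
        return IntStream.of(array).parallel().max();
    }
}
```

Both methods return an empty OptionalInt for an empty array instead of a sentinel like Integer.MIN_VALUE.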
Getting the max value (part 2)
From Java 8 onwards, there are two approaches to running tasks asynchronously:
- Using an ExecutorService to create Futures, and then getting the results from each of them.
- Chaining CompletableFutures (Java 8 onwards).
The code below demonstrates both:
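import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AsyncMax {
    private static final int MAX = 10;

    public static void main(String[] args) {
        System.out.println("Using ExecutorService:");
        ExecutorService service = Executors.newWorkStealingPool();
        List<Future<Integer>> esResults = IntStream.range(0, MAX).parallel()
                .mapToObj(i -> service.submit(() -> produce(i).get()))
                .collect(Collectors.toList());
        esResults.stream()
                .map(f -> get(f))
                .max(Comparator.naturalOrder())
                .ifPresent(System.out::println);
        service.shutdown();

        System.out.println("Using CompletableFuture:");
        CompletableFuture<List<Integer>> cfResults = IntStream.range(0, MAX).parallel()
                .mapToObj(i -> produce(i))
                .map(CompletableFuture::supplyAsync)
                .collect(allOf());
        cfResults.thenApply(results -> results.stream().max(Comparator.naturalOrder()))
                .join()
                .ifPresent(System.out::println);
    }

    // produce(int), get(Future) and allOf() are the supporting methods shown below;
    // they belong inside this class.
}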
For both approaches, IntStream.range(0, MAX).parallel() is used to parallelize the production of MAX integers via the produce(int) method (shown below). The intermediary esResults and cfResults variables are created solely to highlight where the asynchronous tasks will 'end', followed by how the results are collected. One can certainly daisy-chain the method calls completely.
ExecutorService:
- Start an ExecutorService implementation; the Java 8 newWorkStealingPool() is used as an example here.
- submit() a Callable<Integer> that returns an Integer upon completion.
- Collect the results to a List<Future<Integer>>, i.e. a List of Futures that return Integers.
- Stream the List by get()-ting (method shown below) each resulting Integer.
- From the Stream<Integer>, call max(Comparator.naturalOrder()) to get the maximum value, and then print it via ifPresent(Consumer).
- Last, shutdown() the ExecutorService.

CompletableFuture:
- Pass each Supplier<Integer> from produce(int) to CompletableFuture.supplyAsync(Supplier).
- Collect each CompletableFuture<Integer> into a resulting CompletableFuture<List<Integer>> via a custom allOf() Collector (method shown below).
- From that CompletableFuture, thenApply() a Function<List<Integer>, Optional<Integer>> that derives the maximum value from the List<Integer>.
- join() to get the result and print it via ifPresent(Consumer).
Supporting methods:

This produces the given int after sleep()-ing for some time:

private static Supplier<Integer> produce(int i) {
    return () -> {
        try {
            Thread.sleep((int) ((1 + Math.random()) * (MAX - i) * 500));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        System.out.printf("Returning [%s] from Thread[%s]%n", i, Thread.currentThread().getName());
        return i;
    };
}
This is a Stream-friendly way of get()-ing from a Future, by catching checked Exceptions first:

private static <T> T get(Future<T> future) {
    try {
        return future.get();
    } catch (InterruptedException e) {
        // Only an interruption should restore the thread's interrupt flag.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
}
This is a Collector implementation for waiting on all CompletableFutures to finish before returning the List of results (inspired by this article):

private static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> allOf() {
    return Collector.of(
            ArrayList<CompletableFuture<T>>::new,
            (a, t) -> a.add(t),
            (a, b) -> {
                a.addAll(b);
                return a;
            },
            a -> CompletableFuture.allOf(a.toArray(new CompletableFuture[a.size()]))
                    .thenApply(v -> a.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList())));
}
Conclusion:
Instead of thinking solely about having to synchronize write access on a resulting array, both the ExecutorService and CompletableFuture approaches are more declarative, and IMO the latter more so than the former. They let us express how to generate our asynchronous tasks, followed by what needs to be done after gathering the results. :)
Comment by user3577397 (Oct 30, 2015 at 4:05): I'm not very good at this stuff, but I'm basically trying to take what I've learned in the past and incorporate multi-threading. It wasn't my intention to process the file twice. I just wanted to find the max in the file, but split the work into threads to help speed up the process. Are there any good examples that you know of using ExecutorService with arrays?
Comment by h.j.k. (Oct 30, 2015 at 6:19): @user3577397 see updated answer.
Comment by user3577397 (Oct 30, 2015 at 13:59): Yes, it's an academic exercise. I read that this is more practical for dealing with billions of numbers, but it's just meant to teach me the concept. I guess I could change the size of the array to billions and generate a text file with billions of numbers. Prior to attempting threading, I was able to find the max of 100k numbers with just standard code. Then I attempted to take the code I had already written and add threading functionality to it, and that's what I came up with above. Thanks for helping me. I'll try to take your tips and improve it.
Comment by h.j.k. (Nov 1, 2015 at 13:15): @user3577397 see updated answer. In short, don't think about placing the results into an array where you need to worry about synchronizing write access. Use either of the methods described to better express the steps required.
Comment by user3577397 (Nov 2, 2015 at 17:47): Thank you, you've been really helpful. I'll try to incorporate these.