I have a piece of Java code that uses Hadoop to calculate min, max, average and variance on a large dataset made of (index, value) pairs, one per line:
0 11839923.64831265
1 5710431.90800272
It's compiled locally and run on a remote distributed HDFS instance via a shell script.
My main concerns are:

- If the output is collected in a different order, the whole thing just stops working: instead of returning one result for each key, it prints the same key over and over, filling the terminal.
- It creates a new Text instance every time, which looks really inefficient, but when I tried to use a single shared constant, it stopped working. Maybe using an enum would do fine, but I don't feel like changing it until I've fixed the previous point.
- It uses a Scanner inside the mapper to treat multi-line inputs properly, but only single-line input ever shows up. Does Hadoop guarantee that each mapper only receives one-line inputs, or is it the remote setup that makes it so?
- It uses the "old" approach of extending the MapReduceBase class and implementing the Mapper/Reducer interfaces. I've read that with the new 2.0 APIs it's sufficient to extend a Mapper or Reducer class, yet I can't find a simple migration doc, and the official WordCount example tutorial is stuck at r1.2.1. EDIT: found a reference for this. And another one here; I've sketched what I think the migrated mapper looks like below.
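Based on those references, here's my rough sketch of what the mapper below might look like against the new org.apache.hadoop.mapreduce API (untested, just to check my understanding):

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// the new API has a single Mapper base class instead of
// MapReduceBase + the Mapper interface
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // same per-line parsing as the old-API mapper below
        String[] tokens = value.toString().split("\\s+");
        context.write(new Text("values"),
                new DoubleWritable(Double.parseDouble(tokens[1])));
    }
}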
Here's the code:
package org.myorg;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class calcAll {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> {
public void map(LongWritable key, Text value, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
// this will work even if we receive more than 1 line
Scanner scanner = new Scanner(value.toString());
String line;
String[] tokens;
double observation;
while (scanner.hasNext()) {
line = scanner.nextLine();
tokens = line.split("\\s+");
observation = Double.parseDouble(tokens[1]);
output.collect(new Text("values"), new DoubleWritable(observation));
}
}
}
public static class Combine extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
double count = 0d; // should be an int, but anyway...
double min = Double.MAX_VALUE;
double max = Double.MIN_VALUE;
double sum = 0d;
double sumSquared = 0d;
double value;
while (values.hasNext()) {
++count;
value = values.next().get();
min = Math.min(min, value);
max = Math.max(max, value);
sum += value;
sumSquared += value * value;
}
// keep in alphabetical order or KABOOM!
output.collect(new Text("count"), new DoubleWritable(count));
output.collect(new Text("max"), new DoubleWritable(max));
output.collect(new Text("min"), new DoubleWritable(min));
output.collect(new Text("sum"), new DoubleWritable(sum));
output.collect(new Text("sumSquared"), new DoubleWritable(sumSquared));
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
if (key.equals(new Text("count"))) {
double count = 0d;
double value;
while (values.hasNext()) {
value = values.next().get();
count += value;
}
output.collect(new Text("count"), new DoubleWritable(count));
}
if (key.equals(new Text("max"))) {
double max = Double.MIN_VALUE;
double value;
while (values.hasNext()) {
value = values.next().get();
max = Math.max(max, value);
}
output.collect(new Text("max"), new DoubleWritable(max));
}
if (key.equals(new Text("min"))) {
double min = Double.MAX_VALUE;
double value;
while (values.hasNext()) {
value = values.next().get();
min = Math.min(min, value);
}
output.collect(new Text("min"), new DoubleWritable(min));
}
if (key.equals(new Text("sum"))) {
double sum = 0d;
double value;
while (values.hasNext()) {
value = values.next().get();
sum += value;
}
output.collect(new Text("sum"), new DoubleWritable(sum));
}
if (key.equals(new Text("sumSquared"))) {
double sumSquared = 0d;
double value;
while (values.hasNext()) {
value = values.next().get();
sumSquared += value;
}
output.collect(new Text("sumSquared"), new DoubleWritable(sumSquared));
}
}
}
public static boolean applySecondPass(Path in, Path out) {
double count = 0d, max = 0d, min = 0d, sum = 0d, sumSquared = 0d;
try (FileSystem fs = FileSystem.get(new Configuration());
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(in)));) {
String line;
String[] words;
line = br.readLine();
while (line != null) {
words = line.split("\\s+");
switch (words[0]) {
case "count":
count = Double.parseDouble(words[1]);
break;
case "max":
max = Double.parseDouble(words[1]);
break;
case "min":
min = Double.parseDouble(words[1]);
break;
case "sum":
sum = Double.parseDouble(words[1]);
break;
case "sumSquared":
sumSquared = Double.parseDouble(words[1]);
break;
}
line = br.readLine();
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
double avg = sum / count;
// (Sum_sqr - (Sum*Sum)/n)/(n - 1)
double variance = (sumSquared - (sum * sum) / count) / (count - 1);
try (FileSystem fs = FileSystem.get(new Configuration());
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(out, true)));) {
String line;
line = "avg\t" + String.valueOf(avg) + System.lineSeparator();
bw.write(line);
line = "variance\t" + String.valueOf(variance) + System.lineSeparator();
bw.write(line);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(calcAll.class);
conf.setJobName("calcAll");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(DoubleWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Combine.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
Path out1 = new Path(args[1]);
FileOutputFormat.setOutputPath(conf, out1);
JobClient.runJob(conf); // blocking call
// the output is a set of files, merge them before continuing
Path out1Merged = new Path(args[2]);
Configuration config = new Configuration();
try {
FileSystem hdfs = FileSystem.get(config);
FileUtil.copyMerge(hdfs, out1, hdfs, out1Merged, false, config, null);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
// calculate on job output
boolean success = applySecondPass(out1Merged, new Path(args[3]));
System.out.println("Second pass successful? " + success);
System.exit(success ? 0 : 1); // exit code 0 signals success by convention
}
}
- How large is your dataset? Unless it can't fit on a typical hard-drive, I would re-examine whether you even need to MR. – Pat, Mar 2, 2014 at 0:10
- Several GBs. These are just base calculations that could be done with multi-threading, but I'll need to do more complex stuff later, so I'd better learn MR now. – Agostino, Mar 3, 2014 at 13:16
1 Answer
I'm not familiar with Hadoop, so take my advice with care.
Are you sure that the following is precise enough?
double count = 0d; // should be an int, but anyway...
Consider the following:
final double original = 123456789123456789.0;
double d = original;
d++;
System.out.println(d == original);    // true
System.out.printf("%f%n", original);  // 123456789123456784,000000
System.out.printf("%f%n", d);         // 123456789123456784,000000
I'd use a long there. I guess it's not a problem here, but if I'm right the reducer could suffer from this. See also:
- Why not use Double or Float to represent currency?
- Effective Java, 2nd Edition, Item 48: Avoid float and double if exact answers are required
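If you want to keep DoubleWritable as the only value type, one middle ground (just a sketch against the combiner as it stands) is to count with a long and widen to double only when emitting:

long count = 0L;                     // exact up to 2^63 - 1
while (values.hasNext()) {
    double value = values.next().get();
    ++count;
    // min/max/sum/sumSquared updates as before
}
// a long converts to double exactly as long as it stays below 2^53
output.collect(new Text("count"), new DoubleWritable(count));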
tokens = line.split("\\s+");
The implementation of split was simply this in Java 6:

public String[] split(String regex, int limit) {
    return Pattern.compile(regex).split(this, limit);
}
Java 7 contains some fast paths, but in the end it still calls:
return Pattern.compile(regex).split(this, limit);
In your case (with the \\s+ pattern) it does not use any fast path, so it might be worth storing the compiled Pattern instance and calling split on that. (I guess it would be faster, but the JVM might cache that for you.)
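A minimal sketch of what I mean (the field name is my own):

// compiled once when the mapper class is loaded,
// instead of on every call to String.split()
private static final Pattern WHITESPACE = Pattern.compile("\\s+");

// then, inside map():
String[] tokens = WHITESPACE.split(line);

(Pattern lives in java.util.regex, which the wildcard java.util.* import does not cover, so it needs its own import.)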
double count = 0d, max = 0d, min = 0d, sum = 0d, sumSquared = 0d;

I'd put the variable declarations on separate lines. From Code Complete, 2nd Edition, p. 759:
With statements on their own lines, the code reads from top to bottom, instead of top to bottom and left to right. When you’re looking for a specific line of code, your eye should be able to follow the left margin of the code. It shouldn’t have to dip into each and every line just because a single line might contain two statements.
Additionally, max and min are not used (they are only written); you could safely remove them (or might want to print them to the output).

@Override
public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
    if (key.equals(new Text("count"))) {
If Text is thread-safe/immutable, you could store new Text("count") (and the other ones) in fields instead of constructing them on every call, for equals as well as for output.collect():

output.collect(new Text("sumSquared"), new DoubleWritable(sumSquared));
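As far as I can tell, the old API's collect() serializes the key and value immediately, so reusing one mutable instance of each and calling set() before collecting should also work; a sketch (the emit helper is my own naming):

private final Text outKey = new Text();
private final DoubleWritable outValue = new DoubleWritable();

// collect() serializes the current contents, so mutating the
// shared instances afterwards cannot corrupt earlier output
private void emit(OutputCollector<Text, DoubleWritable> output,
        String name, double value) throws IOException {
    outKey.set(name);
    outValue.set(value);
    output.collect(outKey, outValue);
}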
It looks like sum and sumSquared have the same implementation in the reducer. Is that a bug? If not, you could create a method to eliminate the duplicated logic.

The keys "sum", "sumSquared", "min" and "max" should be constants instead of magic strings; they are used multiple times.

Declaring variables before their usage, with a bigger scope, looks like micro-optimization:
String line;
String[] tokens;
double observation;
while (scanner.hasNext()) {
    line = scanner.nextLine();
    tokens = line.split("\\s+");
    observation = Double.parseDouble(tokens[1]);
It would be more readable to declare them inside the loop. (Effective Java, 2nd Edition, Item 45: Minimize the scope of local variables)
String line;
String[] words;
line = br.readLine();
while (line != null) {
    words = line.split("\\s+");
could also be:

String line = br.readLine();
while (line != null) {
    final String[] words = line.split("\\s+");
The reduce method contains a lot of similar structures. I'd consider using a small functional interface with some implementations:
public interface AggregateFunction {
    void addValue(double value);
    double getResult();
}

public class MaxFunction implements AggregateFunction {
    private double max = Double.NEGATIVE_INFINITY; // below any possible input

    @Override
    public void addValue(final double value) {
        max = Math.max(value, max);
    }

    @Override
    public double getResult() {
        return max;
    }
}

public class SumFunction implements AggregateFunction {
    private double sum = 0.0;

    @Override
    public void addValue(final double value) {
        sum += value;
    }

    @Override
    public double getResult() {
        return sum;
    }
}
This will reduce the Reducer:

public class Reduce extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    // assuming that Text is thread-safe/immutable
    private final Text countKey = new Text("count");
    private final Text maxKey = new Text("max");
    private final Text minKey = new Text("min");
    private final Text sumKey = new Text("sum");
    private final Text sumSquaredKey = new Text("sumSquared");

    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException {
        // aggregate(...) is a helper still to be written: it would match the
        // incoming key against the given name, drain the values through the
        // function, and collect the result under the reusable key
        aggregate("count", countKey, values, output, new SumFunction());
        aggregate("max", maxKey, values, output, new MaxFunction());
        aggregate("min", minKey, values, output, new MinFunction());
        aggregate("sum", sumKey, values, output, new SumFunction());
        aggregate("sumSquared", sumSquaredKey, values, output, new SumSquaredFunction());
    }
}
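For completeness, the two remaining implementations referenced above could look like this. Note that the reducer only sees partial sums of squares coming from the combiner, so SumSquaredFunction combines them by plain addition and ends up identical to SumFunction (which is the duplication I asked about):

public class MinFunction implements AggregateFunction {
    private double min = Double.POSITIVE_INFINITY; // above any possible input

    @Override
    public void addValue(final double value) {
        min = Math.min(value, min);
    }

    @Override
    public double getResult() {
        return min;
    }
}

public class SumSquaredFunction implements AggregateFunction {
    // the combiner already squared the values; partials add up directly
    private double sumSquared = 0.0;

    @Override
    public void addValue(final double value) {
        sumSquared += value;
    }

    @Override
    public double getResult() {
        return sumSquared;
    }
}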
line = br.readLine();
while (line != null) {
    words = line.split("\\s+");
    switch (words[0]) {
        ...
    }
    line = br.readLine();
}
I'd restructure the loop for better readability:
while (true) {
    final String line = br.readLine();
    if (line == null) {
        break;
    }
    final String[] words = line.split("\\s+");
    ...
}
String line;
line = "avg\t" + String.valueOf(avg) + System.lineSeparator();
bw.write(line);
line = "variance\t" + String.valueOf(variance) + System.lineSeparator();
bw.write(line);
The line variable is used for multiple purposes. Using separate variables would make the code easier to read:

final String averageLine = "avg\t" + String.valueOf(avg) + System.lineSeparator();
bw.write(averageLine);
final String varianceLine = "variance\t" + String.valueOf(variance) + System.lineSeparator();
bw.write(varianceLine);
Anyway, using a PrintWriter would be the best:

pw.printf("avg\t%f%n", avg);
pw.printf("variance\t%f%n", variance);
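Wired into applySecondPass, that could look like this (a sketch; PrintWriter needs its own java.io import):

try (FileSystem fs = FileSystem.get(new Configuration());
     PrintWriter pw = new PrintWriter(new OutputStreamWriter(fs.create(out, true)))) {
    pw.printf("avg\t%f%n", avg);
    pw.printf("variance\t%f%n", variance);
} catch (Exception e) {
    e.printStackTrace();
    return false;
}

One caveat: PrintWriter swallows IOExceptions, so you would have to poll checkError() if you care about write failures.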
Instead of commenting, write a unit test for it:
// keep in alphabetical order or KABOOM!
It is much safer, especially if you use continuous integration or run unit tests automatically on every build.
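I'm not familiar with the Hadoop test tooling (MRUnit seems to be the usual choice), but even a plain JUnit test with a recording collector would pin the ordering down; a sketch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class CombineTest {

    @Test
    public void emitsAggregateKeysInAlphabeticalOrder() throws IOException {
        // records the keys in the order the combiner emits them
        final List<String> emittedKeys = new ArrayList<String>();
        OutputCollector<Text, DoubleWritable> recorder =
                new OutputCollector<Text, DoubleWritable>() {
                    @Override
                    public void collect(Text key, DoubleWritable value) {
                        emittedKeys.add(key.toString());
                    }
                };

        List<DoubleWritable> input = Arrays.asList(
                new DoubleWritable(1.0), new DoubleWritable(2.0));
        new calcAll.Combine().reduce(
                new Text("values"), input.iterator(), recorder, null);

        assertEquals(
                Arrays.asList("count", "max", "min", "sum", "sumSquared"),
                emittedKeys);
    }
}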
There are unused imports:
import org.apache.hadoop.util.*;
It's cleaner to omit them. Eclipse can do that for you with a keypress (Ctrl+Shift+O).
public class calcAll {
According to the Code Conventions for the Java Programming Language, class names should start with an uppercase letter.
I'd try to give it a more descriptive name, SomethingStatistics or similar.

- Code Conventions for the Java Programming Language, 9 - Naming Conventions
- Effective Java, 2nd Edition, Item 56: Adhere to generally accepted naming conventions
Calling the mapper Map is confusing (since there is a java.util.Map too). I'd choose something more descriptive. The same is true for Reduce. Class names should be nouns, and Map and Reduce read as verbs.
- Your answer has lots of good points, I'll upvote it, but I cannot accept it until I figure out the points I have expressed in the question. My main concern is the first point: the collection of the output is extremely fragile now. I'll comment on your points, though. 1. This can be taken care of when everything else works; it requires changing all DoubleWritable types to a more generic ObjectWritable, and then casting to either DoubleWritable or IntWritable. 2. I'll probably drop the splitting altogether if I can be sure I always receive one-liners as input. – Agostino, Mar 3, 2014 at 18:59
- 3. Already tried that, but it breaks; it's probably related to how Hadoop stores the keys. 4. It's fine, min and max are used by an external sh file. 5. It's fine, I could put them in the same if. 6. OK. 7. I got the habit in my C++ days; I prefer not to trust the Java compiler for optimizations. 8. I don't get it, especially the "aggregate" call: I see no "aggregate" function in the reducer. 9. I prefer not to use break statements and instead make the exit condition clear. 10. OK, good one. 11. This is my biggest problem; I need to solve it, not just better advertise it. 12. OK. 13. OK. 14. OK, MyMapper. – Agostino, Mar 3, 2014 at 18:59