I'm currently learning how to use Apache Spark. In order to do so, I implemented a simple wordcount (not really original, I know). There is already an example in the documentation providing the needed code. However, I tried to go one step further and produce the result as a list sorted by frequency, using the Spark modules.
A use case would be sortedWordcount("I fish a fish") -> [(fish, 2), (I, 1), (a, 1)]
Here is how I proceed:
public List<String> sortedWordcount() {
    SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.textFile("myFile.txt");
    // classical wordcount, nothing new there
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });
    // to enable sorting by value (count) and not key -> value-to-key conversion pattern
    JavaPairRDD<Tuple2<Integer, String>, Integer> countInKey =
            counts.mapToPair(a -> new Tuple2<>(new Tuple2<>(a._2, a._1), null)); // setting value to null, since it won't be used anymore
    List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
            countInKey.sortByKey(new TupleComparator(), false).collect();
    List<String> result = new ArrayList<>();
    // wantedSize (the number of top words to return) is a field defined elsewhere in the class
    IntStream.range(0, wantedSize).forEach(i -> result.add(wordSortedByCount.get(i)._1._2));
    return result;
}
private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
To sum up how this code works:
- it stores the text as a list of words
- it changes this list of words into a list of tuples, each tuple being (word, 1), with word as the key
- it combines tuples with the same key and counts them; tuples are now (word, freqWord)
- it changes the shape of the tuples, which now become ((freqWord, word), null)
- the tuples are sorted by key (see the Comparator implementation)
My main concern is performance, but I'll gladly welcome critiques on any aspect of this code. Is there any way to achieve the same goal in fewer / more efficient steps?
2 Answers
I'll review this code keeping in mind that you want to keep Apache Spark in the picture, since you intend to use it later for what it is actually designed for.
I strongly believe that your current code could be written in pure Java 8, though that is not the point of the question.
Use lambdas wherever you can!
This will greatly enhance the readability of your code.
As an example:
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
    }
});
could be rewritten as the following (I checked, and FlatMapFunction is indeed an interface that qualifies as a functional interface):
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
Similarly, this:
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
    }
});
can be rewritten as:
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
If type inference is not strong enough, then you need to write new Tuple2<String, Integer>(s, 1).
Lastly, the same holds for this:
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer a, Integer b) {
        return a + b;
    }
});
can be written as:
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
or even better:
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum);
Use the new way to write comparators
Comparators are much easier to write in Java 8; they work via key extraction. You can turn this code:
private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
into this code:
Comparator<Tuple2<Integer, String>> tupleComparator = Comparator.comparing(tuple2 -> tuple2._1);
This will automatically (and automagically) create a working comparator that compares the _1 field of tuple2, which is inferred to be a Tuple2<Integer, String>.
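As a side note (a sketch of my own, not something from the original code), key extraction also chains nicely if you ever want a deterministic tie-break for words with equal counts; this assumes scala.Tuple2 on the classpath:
Comparator<Tuple2<Integer, String>> tupleComparator =
        Comparator.comparing((Tuple2<Integer, String> t) -> t._1) // primary key: the count
                  .thenComparing(t -> t._2);                      // tie-break: the word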
Now this line:
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount = countInKey.sortByKey(new TupleComparator(), false).collect();
can become:
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(Comparator.comparing(tuple2 -> tuple2._1), false).collect();
Normally one would use a method reference with Comparator.comparing; if Tuple2 had a get1() method, you would write it as Comparator.comparing(Tuple2::get1).
Upon further inspection of your code I see that you have, possibly intentionally, made your comparator serializable. If you need that for Apache Spark, then you can do that as well, though the solution does get a fair bit uglier:
List<Tuple2<Tuple2<Integer, String>, Integer>> wordSortedByCount =
        countInKey.sortByKey(
                (Comparator<Tuple2<Integer, String>> & Serializable) Comparator.comparing(tuple2 -> tuple2._1),
                false).collect();
Via this mechanism (an intersection cast) it is possible to make a type implement a marker interface without writing boilerplate. It is more common in the form of (Runnable & Serializable) someRunnable than in this form, though.
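As a self-contained illustration (my own sketch, plain Java with no Spark involved), the intersection cast applied directly to a lambda yields an object that really is Serializable:
import java.io.Serializable;
import java.util.Comparator;

public class SerializableLambdaDemo {
    public static void main(String[] args) {
        // The intersection cast makes the compiler generate a serializable lambda.
        Comparator<String> byLength =
                (Comparator<String> & Serializable) (a, b) -> Integer.compare(a.length(), b.length());
        System.out.println(byLength.compare("hi", "hello") < 0); // true
        System.out.println(byLength instanceof Serializable);    // true
    }
}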
Final words
Lastly, your code formatting is in need of some improvement with respect to indenting the anonymous classes, but that may be an oversight, and the rest of the code looks great.
When I have some time I'd be eager to show you a pure Java 8 solution as I strongly believe that Apache Spark is making a mess of your code currently. I do understand however that you need to operate with Apache Spark at some point and for that you may be forced to use Apache Spark's classes.
My method using Java 8
As an addendum, I'll restate the problem as I understand it and show how I would solve it.
Input: a text file consisting of words. Output: a list of the words, sorted by the frequency with which they occur.
Map<String, Long> occurrenceMap = Files.readAllLines(Paths.get("myFile.txt"))
        .stream()
        .flatMap(line -> Arrays.stream(line.split(" ")))
        .collect(Collectors.groupingBy(i -> i, Collectors.counting()));
List<String> sortedWords = occurrenceMap.entrySet()
        .stream()
        .sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed())
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
This will do the following steps:
- Read all lines into a List<String> (careful with large files!).
- Turn it into a Stream<String>.
- Turn that into another Stream<String> by flat-mapping every String to a Stream<String>, splitting on the blanks.
- Collect all elements into a Map<String, Long>, grouping by the identity (i -> i) and using Collectors.counting() as the downstream collector, such that the map value will be the count.
- Get a Set<Map.Entry<String, Long>> from the map.
- Turn it into a Stream<Map.Entry<String, Long>>.
- Sort by the reverse order of the value of the entry.
- Map the results to a Stream<String>; you lose the frequency information here.
- Collect the stream into a List<String>.
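For the question's example input, "I fish a fish", sortedWords would come out as [fish, I, a]: fish first since it occurs twice, with the order of the count-1 ties unspecified.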
Beware that the line .sorted(Comparator.comparing((Map.Entry<String, Long> entry) -> entry.getValue()).reversed()) should really be .sorted(Comparator.comparing(Map.Entry::getValue).reversed()), but type inference is having issues with that, and for some reason it will not compile.
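If you'd rather not spell out the lambda's parameter type, a workaround (my suggestion; it uses Java 8's Map.Entry.comparingByValue, which sidesteps the inference problem) would be:
List<String> sortedWords = occurrenceMap.entrySet()
        .stream()
        .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());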
I hope the Java 8 way can give you interesting insights.
- Thanks a lot! I'm not quite used to lambdas yet, so it's always good to see how to use them. Same for the comparator, it's indeed much easier (however, I still need the Serializable thing, it's indeed a Spark requirement). – merours, Jul 10, 2014 at 12:49
- @fxm Just added how I would do it with pure Java 8, though I'm still (as said) unsure of where you need to interface with Apache Spark ultimately. – skiwi, Jul 10, 2014 at 12:55
- The idea of Spark is to distribute these calculations over a cluster, not to be used on a single machine, hence its use in my case. Anyway, thanks for the full Java 8 sample! – merours, Jul 10, 2014 at 13:05
- On a side note, I'm afraid (Comparator<Tuple2<Integer, String>> & Serializable) doesn't work for me, could it be a version problem? I'm using Java 1.8.0_05. – merours, Jul 10, 2014 at 13:24
- @fxm I don't know, it does work in pure Java. I have now found out though that a lot of core Apache Spark seems to be written in Scala; I'm not exactly sure how it works together with Java, and neither am I sure where the Serializable constraint comes from. – skiwi, Jul 10, 2014 at 13:33
Your comparator is broken:
private class TupleComparator implements Comparator<Tuple2<Integer, String>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
        return tuple1._1 < tuple2._1 ? 0 : 1;
    }
}
Comparators are required to follow a contract: if compare(x, y) is positive, then compare(y, x) must be negative, and if compare(x, y) is 0, then compare(y, x) must also be 0.
Without that contract in place, the sort operations will produce unreliable orderings. The TimSort routine (used since Java 7) will throw an exception if the Comparator is broken (and it notices).
In your case, if you have two tuples with the same count, your compare will return 1 in both directions:
TupleA -> (hello, 1)
TupleB -> (world, 1)
will have:
compare(TupleA, TupleB) -> 1
compare(TupleB, TupleA) -> 1
This will cause the sort to fail.
It appears that your test data simply does not contain words with the same count.
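To see the failure mode outside of Spark, here is a small self-contained sketch (mine, with made-up data) that hands a comparator with the same flaw to Java's TimSort:
import java.util.Arrays;
import java.util.Comparator;

public class BrokenComparatorDemo {
    public static void main(String[] args) {
        // Same flaw as above: never returns a negative value, so
        // compare(x, y) and compare(y, x) can both be 1.
        Comparator<Integer> broken = (a, b) -> a < b ? 0 : 1;
        Integer[] data = new Integer[64];
        Arrays.setAll(data, i -> i % 4); // plenty of equal elements
        // TimSort may detect the contract violation and throw
        // "Comparison method violates its general contract!"
        Arrays.sort(data, broken);
        System.out.println(Arrays.toString(data));
    }
}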
Your compare method should be:
public int compare(Tuple2<Integer, String> tuple1, Tuple2<Integer, String> tuple2) {
    return Integer.compare(tuple1._1, tuple2._1);
}
- That's strange, I have words with the same count and got no exception for that. I'm compiling this code with Maven, might it be a configuration problem? Anyway, I didn't know about this contract notion, so thanks for the information! – merours, Jul 10, 2014 at 13:00
- Regarding new FlatMapFunction<String, String>(): I don't think the <String, String> here is necessary, it could as well be <> (diamond inference, Java 7), and it may possibly be replaced by a lambda, though I'll investigate that option when providing an answer.
- The String, String is necessary; I'm getting "cannot use '<>' with anonymous inner classes" without it. I'll see if I can find a way to express it as a lambda.