I am new to Spark and Scala, and I have solved the following problem. I have a table in a database with the following structure:
id  name   eid  color
1   John   S1   green
2   Shaun  S2   red
3   Shaun  S2   green
4   Shaun  S2   green
5   John   S1   yellow
Now I want to know how many times each person is red, green, or yellow, so the result should look like this:
name   red  yellow  green
John   0    1       1
Shaun  1    0       2
I have written the code below and it solves the problem, but I am not sure this is the best way to do it. I think my code is too long for such a small problem, and that it could be written more concisely and with better practices. I need some guidance.
import org.apache.spark.sql.Row

val rdd = df.rdd.map {
  case Row(id: Int, name: String, eid: String, color: String) =>
    (eid, List((id, name, eid, color)))
}.reduceByKey(_ ++ _)

val result = rdd.map {
  case (key, list) =>
    val red = list.count(p => p._4.equals("red"))
    val yellow = list.count(p => p._4.equals("yellow"))
    val green = list.count(p => p._4.equals("green"))
    val newList = list.map(x => (x._2, red, yellow, green))
    (key, newList.take(1))
}.flatMap {
  case (eid, list) =>
    list.map {
      case (name, red, yellow, green) =>
        (eid, name, red, yellow, green)
    }
}

import SparkConfig.sc.sqlContext.implicits._

val rDf = result.toDF("eid", "name", "red", "yellow", "green")
rDf.show()
1 Answer
Let me start with the out-of-the-box solution I would use if I were you, and after that we'll discuss your code. I assume df is equivalent to the following structure:
val df = Seq(
(1, "John", "S1", "green"), (2, "Shaun", "S2", "red"),
(3, "Shaun", "S2", "green"), (4, "Shaun", "S2", "green"),
(5, "John", "S1", "yellow")
).toDF("id", "name", "eid", "color")
All you really need to achieve the desired output is pivot:
df.groupBy("name", "eid").pivot("color").count().na.fill(0).show
// +-----+---+-----+---+------+
// | name|eid|green|red|yellow|
// +-----+---+-----+---+------+
// |Shaun| S2| 2| 1| 0|
// | John| S1| 1| 0| 1|
// +-----+---+-----+---+------+
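As an aside (not part of the original answer): if you know the set of colors up front, pivot also accepts the values explicitly, which spares Spark an extra pass over the data to discover them and pins the column order. A minimal sketch, assuming the three colors from the sample data:

df.groupBy("name", "eid")
  .pivot("color", Seq("red", "yellow", "green"))
  .count()
  .na.fill(0)
  .show()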
About your code:
Don't fetch more data from a DataFrame than you really need. Once you convert a DataFrame to an RDD you no longer benefit from early projections, selections and other Catalyst optimizations. It means you have to fetch all the data from the external source (like a database) or off-heap storage even if downstream processing requires only a small fraction of it. In other words, project early:
df.select($"name", $"eid", $"color").rdd.map { ... }
Avoid dealing with RDD[Row]. It is like a Seq[Any] - nothing you really want in your code. Instead you can use Dataset encoders:

df.select($"name", $"eid", $"color").as[(String, String, String)].rdd.map { ... }
Never use list concatenation for reduction. Since List.++ is an O(N) operation and you apply it in a loop, the overall complexity is roughly O(N²). If you really want to group data, use groupByKey. But don't group if the operation can be expressed using reduceByKey with a truly reducing (requiring roughly constant memory) function; a sketch of such a reduction follows.
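Purely for illustration (this sketch is mine, not part of the answer above): assuming the same (name, eid, color) projection as before, each row can be mapped to a (red, yellow, green) count triple and the triples added pairwise, so the reduce function needs only constant memory:

val counts = df
  .select($"name", $"eid", $"color")
  .as[(String, String, String)]
  .rdd
  .map { case (name, eid, color) =>
    // One (red, yellow, green) count triple per row.
    ((name, eid), (
      if (color == "red") 1L else 0L,
      if (color == "yellow") 1L else 0L,
      if (color == "green") 1L else 0L))
  }
  .reduceByKey { case ((r1, y1, g1), (r2, y2, g2)) =>
    // Pairwise addition: associative and constant-memory.
    (r1 + r2, y1 + y2, g1 + g2)
  }
  .map { case ((name, eid), (red, yellow, green)) =>
    (name, eid, red, yellow, green)
  }
  .toDF("name", "eid", "red", "yellow", "green")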
There are many ways you can approach this, for example with aggregateByKey (note that I intentionally use a mutable buffer):

import scala.collection.mutable.Map

val pairs = df
  .select($"name", $"eid", $"color")
  .as[(String, String, String)]
  .rdd.map { case (name, eid, color) => ((name, eid), color) }

def seqOp(acc: Map[String, Long], x: String) = {
  acc(x) = 1L + acc.getOrElse(x, 0L)
  acc
}

def mergeOp(acc1: Map[String, Long], acc2: Map[String, Long]) = {
  acc2.foreach { case (k, v) => acc1(k) = v + acc1.getOrElse(k, 0L) }
  acc1
}

pairs
  .aggregateByKey(Map.empty[String, Long])(seqOp, mergeOp)
  .map { case ((name, eid), vs) =>
    (name, eid, vs.get("red"), vs.get("green"), vs.get("yellow"))
  }.toDF("name", "eid", "red", "green", "yellow")
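One thing worth noting about this version: vs.get returns an Option, so colors that never occur for a given key come out as nulls rather than zeros; chaining .na.fill(0) as in the pivot solution above would normalize them to 0.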