
I am new to Spark and Scala, and I have solved the following problem. I have a table in a database with the following structure:

 id  name   eid  color
 1   John   S1   green
 2   Shaun  S2   red
 3   Shaun  S2   green
 4   Shaun  S2   green
 5   John   S1   yellow

Now I want to count how many times each person has each color (red, green, or yellow). The result should look like this:

 name   red  yellow  green
 John   0    1       1
 Shaun  1    0       2

I have written the following code and it solves the problem, but I am not sure whether this is the best way to do it. I think my code is too long for such a small problem, and that it could be written more concisely and according to best practices. I need some guidance.

import org.apache.spark.sql.Row

val rdd = df.rdd.map {
  case Row(id: Int, name: String, eid: String, color: String) =>
    (eid, List((id, name, eid, color)))
}.reduceByKey(_ ++ _)
val result = rdd.map {
  case (key, list) =>
    val red = list.count(p => p._4.equals("red"))
    val yellow = list.count(p => p._4.equals("yellow"))
    val green = list.count(p => p._4.equals("green"))
    val newList = list.map(x => (x._2, red, yellow, green))
    (key, newList.take(1))
}.flatMap {
  case (eid, list) =>
    list.map {
      case (name, red, yellow, green) =>
        (eid, name, red, yellow, green)
    }
}
import SparkConfig.sc.sqlContext.implicits._
val rDf = result.toDF("eid", "name", "red", "yellow", "green")
rDf.show()
200_success
asked Sep 7, 2016 at 9:39

1 Answer


Let me start with the out-of-the-box solution I would use if I were you, and after that we'll discuss your code. I assume df is equivalent to the following structure:

val df = Seq(
 (1, "John", "S1", "green"), (2, "Shaun", "S2", "red"),
 (3, "Shaun", "S2", "green"), (4, "Shaun", "S2", "green"), 
 (5, "John", "S1", "yellow")
).toDF("id", "name", "eid", "color")

All you really need to achieve the desired output is pivot:

df.groupBy("name", "eid").pivot("color").count().na.fill(0).show
// +-----+---+-----+---+------+ 
// | name|eid|green|red|yellow|
// +-----+---+-----+---+------+
// |Shaun| S2| 2| 1| 0|
// | John| S1| 1| 0| 1|
// +-----+---+-----+---+------+
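To make the semantics of that one-liner concrete, here is a minimal sketch of what groupBy("name", "eid").pivot("color").count().na.fill(0) computes, written with plain Scala collections instead of Spark so it runs without a cluster. The object name PivotSketch and the function pivotCount are illustrative only, not part of any Spark API; the colors are sorted to mirror the column order in the output above.

```scala
// Spark-free sketch of groupBy("name", "eid").pivot("color").count().na.fill(0)
// on local collections. Names here are illustrative, not a Spark API.
object PivotSketch {
  // Same sample data as the df above: (id, name, eid, color)
  val rows: Seq[(Int, String, String, String)] = Seq(
    (1, "John", "S1", "green"), (2, "Shaun", "S2", "red"),
    (3, "Shaun", "S2", "green"), (4, "Shaun", "S2", "green"),
    (5, "John", "S1", "yellow")
  )

  def pivotCount(rows: Seq[(Int, String, String, String)]): Map[(String, String), Map[String, Long]] = {
    // pivot without explicit values first needs the distinct colors (the future columns)
    val colors = rows.map(_._4).distinct.sorted
    rows
      .groupBy { case (_, name, eid, _) => (name, eid) } // groupBy("name", "eid")
      .map { case (key, group) =>
        // count() per (name, eid, color) cell
        val counts = group.groupBy(_._4).map { case (c, g) => c -> g.size.toLong }
        // na.fill(0): colors never seen for this key become 0 instead of null
        key -> colors.map(c => c -> counts.getOrElse(c, 0L)).toMap
      }
  }

  def main(args: Array[String]): Unit =
    pivotCount(rows).foreach { case ((name, eid), counts) =>
      val cells = counts.toSeq.sortBy(_._1).map { case (c, n) => s"$c=$n" }
      println(s"$name $eid " + cells.mkString(" "))
    }
}
```

The na.fill(0) step matters because a pivoted cell with no matching rows is null rather than 0.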

About your code:

  • Don't fetch more data from a DataFrame than you really need. Once you convert a DataFrame to an RDD, you no longer benefit from early projections, selections, and other Catalyst optimizations.

    This means you have to fetch all the data from the external source (like a database) or off-heap storage, even if downstream processing requires only a small fraction of it. In other words, project early:

    df.select($"name", $"eid", $"color").rdd.map { ... }
    
  • Avoid dealing with RDD[Row]. It is like a Seq[Any]: nothing you really want in your code. Instead, you can use Dataset encoders:

    df.select($"name", $"eid", $"color").as[(String, String, String)].rdd.map {
     ... 
    }
    
  • Never use list concatenation for reduction. Since List.++ is an O(N) operation and you apply it in a loop, the overall complexity is roughly O(N^2). If you really want to group data, use groupByKey.

  • But don't group at all if the operation can be expressed using reduceByKey with a truly reducing function (one requiring roughly constant memory).

    There are many ways to approach this, for example with aggregateByKey (note that I intentionally use a mutable Map as the accumulator):

    import scala.collection.mutable.Map

    val pairs = df
      .select($"name", $"eid", $"color")
      .as[(String, String, String)]
      .rdd.map { case (name, eid, color) => ((name, eid), color) }

    // Add one occurrence of color x to the per-key accumulator (mutated in place)
    def seqOp(acc: Map[String, Long], x: String) = {
      acc(x) = 1L + acc.getOrElse(x, 0L)
      acc
    }

    // Merge two partial accumulators, e.g. ones built on different partitions
    def mergeOp(acc1: Map[String, Long], acc2: Map[String, Long]) = {
      acc2.foreach { case (k, v) => acc1(k) = v + acc1.getOrElse(k, 0L) }
      acc1
    }

    pairs
      .aggregateByKey(Map.empty[String, Long])(seqOp, mergeOp)
      .map { case ((name, eid), vs) =>
        // getOrElse(_, 0L) yields 0 for missing colors, matching the desired output
        (name, eid, vs.getOrElse("red", 0L), vs.getOrElse("green", 0L), vs.getOrElse("yellow", 0L))
      }
      .toDF("name", "eid", "red", "green", "yellow")
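A Spark-free sketch of the "truly reducing" alternative to grouping, assuming each record can be turned into a fixed-size (red, yellow, green) count tuple. The add function below is the kind of associative, commutative function you could pass to reduceByKey; reduceByKeyLocal is a hypothetical local stand-in that reduces each partition first and then merges the partial results, and the two partitions are made up for illustration.

```scala
// Spark-free sketch of a constant-memory per-key count. toCounts, add and
// reduceByKeyLocal are hypothetical helpers, not Spark APIs.
object ReduceSketch {
  type Counts = (Long, Long, Long) // (red, yellow, green)

  // Map one color to a one-hot count tuple; unknown colors contribute nothing.
  def toCounts(color: String): Counts = color match {
    case "red"    => (1L, 0L, 0L)
    case "yellow" => (0L, 1L, 0L)
    case "green"  => (0L, 0L, 1L)
    case _        => (0L, 0L, 0L)
  }

  // Associative and commutative, so partial sums can be combined in any order.
  def add(a: Counts, b: Counts): Counts = (a._1 + b._1, a._2 + b._2, a._3 + b._3)

  // Local stand-in for reduceByKey: reduce inside each partition, then merge
  // the per-partition results. State per key is always three Longs.
  def reduceByKeyLocal[K](partitions: Seq[Seq[(K, Counts)]]): Map[K, Counts] = {
    def combine(acc: Map[K, Counts], kv: (K, Counts)): Map[K, Counts] = kv match {
      case (k, v) => acc.updated(k, acc.get(k).map(add(_, v)).getOrElse(v))
    }
    val partials = partitions.map(_.foldLeft(Map.empty[K, Counts])(combine))
    partials.flatMap(_.toSeq).foldLeft(Map.empty[K, Counts])(combine)
  }

  // The sample rows as (name, eid, color), split into two hypothetical partitions.
  val partitions: Seq[Seq[(String, String, String)]] = Seq(
    Seq(("John", "S1", "green"), ("Shaun", "S2", "red"), ("Shaun", "S2", "green")),
    Seq(("Shaun", "S2", "green"), ("John", "S1", "yellow"))
  )

  def result: Map[(String, String), Counts] =
    reduceByKeyLocal(partitions.map(_.map { case (name, eid, color) => ((name, eid), toCounts(color)) }))

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

On an RDD of ((name, eid), color) pairs, the analogous Spark call would be roughly pairs.mapValues(toCounts).reduceByKey(add). Unlike the List-based version, the state kept per key stays three Longs no matter how many records share that key.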
    
answered Sep 9, 2016 at 13:50
