Deployment Zone

Spark, serialization and case classes

In Spark if you try to do anything cute [read: inheritance from an abstract class] with case classes, i.e.,

 abstract class PairReducable[A <: Keyable, B, P <: PairReducable[A, B, P]]
 (val item: A, val amount: B)
 extends Product2[A, B] 
 with Amount[B] {
 override val _1 = item
 override val _2 = amount
 def reduce(other: PairReducable[A, B, P]): P =
 this.item.toKey == other.item.toKey match {
 case false => 
 throw new UnsupportedOperationException("the key values for both items must match in order to combine amounts")
 case true => reduceWith(other.amount)
 }
 protected def reduceWith(othersAmount: B): P
 }
case class Count[A <: Keyable](override val item: A, override val amount: Long) 
 extends PairReducable[A, Long, Count[A]](item, amount) {
 override protected def reduceWith(othersAmount: Long): Count[A] =
 new Count(item, amount + othersAmount)
}

when you encounter serialization exceptions on your workers all you may need to do to resolve it is add java.io.Serializable:

abstract class PairReducable[A <: Keyable, B, P <: PairReducable[A, B, P]] 
 (val item: A, val amount: B)
 extends Product2[A, B] 
 with Amount[B] 
 with java.io.Serializable {
 /* ... */
 }
case class Count[A <: Keyable](override val item: A, override val amount: Long) 
 extends PairReducable[A, Long, Count[A]](item, amount)
 with java.io.Serializable {
 /* ... */
}

You most likely do not need to give them dummy empty parameter constructors, but if you did, it might look something like this terrible thing:

case class Count[A <: Keyable](override val item: A, override val amount: Long) 
 extends PairReducable[A, Long, Count[A]](item, amount)
 with java.io.Serializable {
 def this() = this(null.asInstanceOf[A], 0L)
 /* ... */
}
Please enable JavaScript to view the comments powered by Disqus. comments powered by Disqus

AltStyle によって変換されたページ (->オリジナル) /