Skip to content

Instantly share code, notes, and snippets.

@richwhitjr
Last active August 29, 2015 14:27
Show Gist options
  • Select an option

  • Save richwhitjr/b605da465300cc25caa2 to your computer and use it in GitHub Desktop.

Select an option

Save richwhitjr/b605da465300cc25caa2 to your computer and use it in GitHub Desktop.
An attempt at a fold function in Scalding that makes use of map-side aggregation.
/**
 * Enrichment of a `TypedPipe` that adds `foldByKey`: a per-key fold expressed
 * as a `sum` over a semigroup, so that partial results can be merged by
 * `aggPlus` instead of streaming every value through a single fold.
 */
class FoldExtension[T](pipe: TypedPipe[T]) {

  /**
   * Folds the values of each key into a `B`.
   *
   * Each raw value is wrapped in `Left`; an accumulated result is a `Right`.
   * A raw value is folded into an accumulator with `fn`, and two accumulators
   * are merged with `aggPlus`.
   *
   * Caveats visible from the implementation:
   *  - `z` seeds every accumulator that is started from raw values, so it may
   *    be folded in more than once per key. NOTE(review): results are only
   *    independent of how values get grouped if `z` is neutral with respect to
   *    `aggPlus` and `fn` is compatible with it -- confirm for your use.
   *  - Values are not guaranteed to be applied in encounter order (a raw value
   *    on the left is folded into an accumulator that sits on its right).
   *
   * @param z       initial accumulator value
   * @param fn      folds one value into an accumulator
   * @param ev      evidence that the pipe's elements are `(K, V)` pairs
   * @param ord     ordering on keys, required for grouping
   * @param aggPlus merges two partial accumulators
   * @return one folded `B` per key
   */
  def foldByKey[K, V, B](z: B)(fn: (B, V) => B)
      (implicit ev: T <:< (K, V),
       ord: Ordering[K],
       aggPlus: Semigroup[B]): UnsortedGrouped[K, B] = {
    val sg = new Semigroup[Either[V, B]] {
      def plus(left: Either[V, B], right: Either[V, B]): Either[V, B] =
        (left, right) match {
          // Two raw values: start a new accumulator from z.
          case (Left(l), Left(r)) => Right(fn(fn(z, l), r))
          // Accumulator plus raw value (either side): fold the value in.
          case (Right(l), Left(r)) => Right(fn(l, r))
          case (Left(l), Right(r)) => Right(fn(r, l))
          // Two partial accumulators: merge them.
          case (Right(l), Right(r)) => Right(aggPlus.plus(l, r))
        }
    }

    pipe
      .group[K, V]
      .mapValues(v => Left(v): Either[V, B])
      .sum(sg)
      .mapValues {
        case Right(acc) => acc
        // A key with a single value never goes through `plus`, so it is still
        // a raw `Left`; fold it into z rather than dropping the key.
        case Left(v) => fn(z, v)
      }
  }
}
/** Companion holding the implicit conversion that exposes `foldByKey` on a `TypedPipe`. */
object FoldExtension {
  /**
   * Wraps `pipe` in a [[FoldExtension]] so its methods can be called directly
   * on the pipe (bring this into scope, e.g. with `import FoldExtension._`).
   */
  implicit def extension[T](pipe: TypedPipe[T]): FoldExtension[T] =
    new FoldExtension(pipe)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment