By samthebest


2014-08-06 11:07:14 8 Comments

Why do Scala and frameworks like Spark and Scalding have both reduce and foldLeft? So then what's the difference between reduce and fold?

4 comments

@samthebest 2014-08-06 11:07:42

reduce vs foldLeft

A big big difference, not mentioned in any other stackoverflow answer relating to this topic clearly, is that reduce should be given a commutative monoid, i.e. an operation that is both commutative and associative. This means the operation can be parallelized.

This distinction is very important for Big Data / MPP / distributed computing, and the entire reason why reduce even exists. The collection can be chopped up and the reduce can operate on each chunk, then the reduce can operate on the results of each chunk - in fact the level of chunking need not stop one level deep. We could chop up each chunk too. This is why summing integers in a list is O(log N) if given an infinite number of CPUs.

If you just look at the signatures there is no reason for reduce to exist because you can achieve everything you can with reduce with a foldLeft. The functionality of foldLeft is a greater than the functionality of reduce.

But you cannot parallelize a foldLeft, so its runtime is always O(N) (even if you feed in a commutative monoid). This is because it's assumed the operation is not a commutative monoid and so the cumulated value will be computed by a series of sequential aggregations.

foldLeft does not assume commutativity nor associativity. It's associativity that gives the ability to chop up the collection, and it's commutativity that makes cumulating easy because order is not important (so it doesn't matter which order to aggregate each of the results from each of the chunks). Strictly speaking commutativity is not necessary for parallelization, for example distributed sorting algorithms, it just makes the logic easier because you don't need to give your chunks an ordering.

If you have a look at the Spark documentation for reduce it specifically says "... commutative and associative binary operator"

http://spark.apache.org/docs/1.0.0/api/scala/index.html#org.apache.spark.rdd.RDD

Here is proof that reduce is NOT just a special case of foldLeft

scala> val intParList: ParSeq[Int] = (1 to 100000).map(_ => scala.util.Random.nextInt()).par

scala> timeMany(1000, intParList.reduce(_ + _))
Took 462.395867 milli seconds

scala> timeMany(1000, intParList.foldLeft(0)(_ + _))
Took 2589.363031 milli seconds

reduce vs fold

Now this is where it gets a little closer to the FP / mathematical roots, and a little trickier to explain. Reduce is defined formally as part of the MapReduce paradigm, which deals with orderless collections (multisets), Fold is formally defined in terms of recursion (see catamorphism) and thus assumes a structure / sequence to the collections.

There is no fold method in Scalding because under the (strict) Map Reduce programming model we cannot define fold because chunks do not have an ordering and fold only requires associativity, not commutativity.

Put simply, reduce works without an order of cumulation, fold requires an order of cumulation and it is that order of cumulation that necessitates a zero value NOT the existence of the zero value that distinguishes them. Strictly speaking reduce should work on an empty collection, because its zero value can by deduced by taking an arbitrary value x and then solving x op y = x, but that doesn't work with a non-commutative operation as there can exist a left and right zero value that are distinct (i.e. x op y != y op x). Of course Scala doesn't bother to work out what this zero value is as that would require doing some mathematics (which are probably uncomputable), so just throws an exception.

It seems (as is often the case in etymology) that this original mathematical meaning has been lost, since the only obvious difference in programming is the signature. The result is that reduce has become a synonym for fold, rather than preserving it's original meaning from MapReduce. Now these terms are often used interchangeably and behave the same in most implementations (ignoring empty collections). Weirdness is exacerbated by peculiarities, like in Spark, that we shall now address.

So Spark does have a fold, but the order in which sub results (one for each partition) are combined (at the time of writing) is the same order in which tasks are completed - and thus non-deterministic. Thanks to @CafeFeed for pointing out that fold uses runJob, which after reading through the code I realised that it's non-deterministic. Further confusion is created by Spark having a treeReduce but no treeFold.

Conclusion

There is a difference between reduce and fold even when applied to non-empty sequences. The former is defined as part of the MapReduce programming paradigm on collections with arbitrary order (http://theory.stanford.edu/~sergei/papers/soda10-mrc.pdf) and one ought to assume operators are commutative in addition to being associative to give deterministic results. The latter is defined in terms of catomorphisms and requires that the collections have a notion of sequence (or are defined recursively, like linked lists), thus do not require commutative operators.

In practice due to the unmathematical nature of programming, reduce and fold tend to behave in the same way, either correctly (like in Scala) or incorrectly (like in Spark).

Extra: My Opinion On the Spark API

My opinion is that confusion would be avoided if use of the term fold was completely dropped in Spark. At least spark does have a note in their documentation:

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala.

@kiritsuku 2014-08-06 15:08:45

That is why foldLeft contains the Left in its name and why there is also a method called fold.

@samthebest 2014-08-06 15:44:36

@sschaef true, but fold doesn't exist in, say Scalding, because unlike reduce it doesn't require commutativity. I've updated my answer to explain this. Basically the point I'm trying to make is that the difference between fold* and reduce* is very much related to the roots of FP in Category Theory.

@Xiaohe Dong 2014-08-07 01:32:35

Do you have an example for distinct (i.e. x op y != y op x) by using reduce and fold to see the difference. I dont think it is correct statement for reduce. For example, I use non-commutative operation "divide" to do the reduce. (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).reduce(_ / _) For the statement "(i.e. x op y != y op x)." it should give me random result, but it gives me the same result every time, so I think reduce still keep the order of cumulation

@samthebest 2014-08-07 07:47:03

@Cloudtech That is a coincidence of it's single threaded implementation, not within it's specification. On my 4-core machine, if I try adding .par, so (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _) I get different results each time.

@Alex Dean 2014-11-24 20:56:18

Great answer. I think that "reduce should be given a commutative monoid, i.e. an operation that is both commutative and associative" should read: "reduce should be given a commutative semigroup, i.e. a data structure which supports a combining operation that is both commutative and associative". I don't believe that reduce needs an identity element.

@samthebest 2014-11-25 09:20:11

@AlexDean in the context of computer science, no it doesn't really need an identity as empty collections tend to just throw exceptions. But it's mathematically more elegant (and would be more elegant if collections did this) if the identity element is returned when the collection is empty. In mathematics "throw an exception" doesn't exist.

@Alex Dean 2014-11-26 11:06:16

@samthebest - I see what you're saying, thanks. This is why you have to work with sumOption for Semigroups in Algebird...

@Make42 2016-05-17 13:31:40

@samthebest: Are your sure about the commutativity? github.com/apache/spark/blob/… says "For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection."

@zero323 2016-06-07 10:51:56

To be fair contract of fold on RDD is exactly the same as contract of Scala ParSeq.fold. Although ParSeqs typically (always?) keep the order it is not really a part of the contract. One can easily implement fold (or not so easily foldByKey) which guarantee order of merging and the main reason to do it as it is done now is performance / flexibility of scheduling.

@samthebest 2016-06-07 20:18:26

@Make42 I've updated my answer, indeed I assumed that the order of combining was done on the order of partitions (which would be nice, and the logical way to implement it), but it turns out it's based on a more of a "first come first serve" basis, and thus non-deterministic.

@Make42 2016-06-08 11:30:26

@samthebest: So, in Spark, all of the combiner methods (reduce, reduceByKey, fold, foldByKey, aggregate, aggregateByKey, combineByKey) need to get functions passed that are both, associative and commutative afterall, right?

@samthebest 2016-06-08 13:03:57

@Make42 That's correct, one could write their own reallyFold pimp though, as: rdd.mapPartitions(it => Iterator(it.fold(zero)(f)))).collect().fold(zero)(f), this wouldn't need f to commute.

@Make42 2016-06-09 11:13:02

@samthebest: Not sure this works... The collect also gets the partitions first come, first serve. How do you know they are in the right order for the second fold? I tried this with a real example and I got my words back in the correct order with both your code and the original Spark fold. I suspect that within a partition fold in Spark will not need commutativity, but that putting the partitions together is the issue. Your solution does not improve on that, since the collect will have the same issue. What do you think?

@samthebest 2016-06-09 13:01:59

@Make42 I read the code for collect, it does preserve order of partitions. Step through it and you'll see this (index, res) => results(index) = res as the result handler - so the result handler uses the partition index to place the result in the Array

@altayseyhan 2017-02-15 22:05:34

@samthebest so at this point I have a question afaik string concat is non-commutative, when I run this code multiple times List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").p‌​ar.reduce(_+_) I guess it should give me random result but I'm getting same result everytime.

@altayseyhan 2017-02-16 06:45:47

@Cloudtech @samthebest I guess in this case of (List(1000000.0) ::: List.tabulate(100)(_ + 0.001)).par.reduce(_ / _) producing different results in each time related with division operation being not associative I mean not related with being non-commutative [docs.scala-lang.org/overviews/parallel-collections/… please look at line starting with "Note: Often, it is thought that.."

@user6022341 2016-06-05 17:21:28

fold in Apache Spark is not the same as fold on not-distributed collections. In fact it requires commutative function to produce deterministic results:

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

This has been shown by Mishael Rosenthal and suggested by Make42 in his comment.

It's been suggested that observed behavior is related to HashPartitioner when in fact parallelize doesn't shuffle and doesn't use HashPartitioner.

import org.apache.spark.sql.SparkSession

/* Note: standalone (non-local) mode */
val master = "spark://...:7077"  

val spark = SparkSession.builder.master(master).getOrCreate()

/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })

/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)

Explained:

Structure of fold for RDD

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  var jobResult: T
  val cleanOp: (T, T) => T
  val foldPartition = Iterator[T] => T
  val mergeResult: (Int, T) => Unit
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

is the same as structure of reduce for RDD:

def reduce(f: (T, T) => T): T = withScope {
  val cleanF: (T, T) => T
  val reducePartition: Iterator[T] => Option[T]
  var jobResult: Option[T]
  val mergeResult =  (Int, Option[T]) => Unit
  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

where runJob is performed with disregard of partition order and results in need of commutative function.

foldPartition and reducePartition are equivalent in terms of order of processing and effectively (by inheritance and delegation) implemented by reduceLeft and foldLeft on TraversableOnce.

Conclusion: fold on RDD cannot depend on order of chunks and needs commutativity and associativity.

@samthebest 2016-06-06 10:03:11

I have to admit that the etymology is confusing and programming literature is lacking in formal definitions. I think it's safe to say that fold on RDDs is indeed really just the same as reduce, but this doesn't respect the root mathematical differences (I've updated my answer to be even more clear). Though I disagree that we really need commutativity provided one is confident whatever their partioner is doing, it's preserving order.

@user6022341 2016-06-06 18:10:45

Undefined order of fold is not related to partitioning. It is a direct consequence of a runJob implementation.

@samthebest 2016-06-07 09:54:28

AH! Sorry I couldn't work out what your point was, but having read through the runJob code I see that indeed it does the combining according to when a task is finished, NOT the order of the partitions. It's this key detail that makes everything fall into place. I've edited my answer again and thus corrected the mistake you point out. Please could you either remove your bounty since we are now in agreement?

@user6022341 2016-06-07 18:53:08

I cannot edit or remove - there is no such option. I can award but I think you get quite a few points from a attention alone, am I wrong? If you confirm that you want me to reward I do it in the next 24 hours. Thanks for corrections and sorry for a method but it looked like you ignore all the warnings, it is a big thing, and answer has been quoted all over the place.

@samthebest 2016-06-07 21:13:49

How about you award it to @Mishael Rosenthal since he was the first to clearly state the concern. I have no interest in the points, I just like using SO for the SEO and organisation.

@Mishael Rosenthal 2015-03-16 16:24:58

If I am not mistaken, even though the Spark API does not require it, fold also requires for the f to be commutative. Because the order in which the partitions will be aggregated is not assured. For example in the following code only the first print out is sorted:

import org.apache.spark.{SparkConf, SparkContext}

object FoldExample extends App{

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Simple Application")
  implicit val sc = new SparkContext(conf)

  val range = ('a' to 'z').map(_.toString)
  val rdd = sc.parallelize(range)

  println(range.reduce(_ + _))
  println(rdd.reduce(_ + _))
  println(rdd.fold("")(_ + _))
}  

Print out:

abcdefghijklmnopqrstuvwxyz

abcghituvjklmwxyzqrsdefnop

defghinopjklmqrstuvabcwxyz

@samthebest 2016-06-07 21:16:44

After some back and forth, we believe you are correct. The order of combining is first come first serve. If you run sc.makeRDD(0 to 9, 2).mapPartitions(it => { java.lang.Thread.sleep(new java.util.Random().nextInt(1000)); it } ).map(_.toString).fold("")(_ + _) with 2+ cores several times, I think you will see it produces random (partition-wise) order. I've updated my answer accordingly.

@morazow 2014-08-07 23:53:17

One other difference for Scalding is the use of combiners in Hadoop.

Imagine your operation is commutative monoid, with reduce it will be applied on the map side also instead of shuffling/sorting all data to reducers. With foldLeft this is not the case.

pipe.groupBy('product) {
   _.reduce('price -> 'total){ (sum: Double, price: Double) => sum + price }
   // reduce is .mapReduceMap in disguise
}

pipe.groupBy('product) {
   _.foldLeft('price -> 'total)(0.0){ (sum: Double, price: Double) => sum + price }
}

It is always good practice to define your operations as monoid in Scalding.

Related Questions

Sponsored Content

16 Answered Questions

[SOLVED] What is the difference between Scala's case class and class?

14 Answered Questions

[SOLVED] Difference between object and class in Scala

  • 2009-11-18 11:21:58
  • Steve
  • 173449 View
  • 588 Score
  • 14 Answer
  • Tags:   scala class object

12 Answered Questions

[SOLVED] What is the difference between a var and val definition in Scala?

  • 2009-11-24 16:57:39
  • Derek Mahar
  • 102844 View
  • 288 Score
  • 12 Answer
  • Tags:   scala

7 Answered Questions

[SOLVED] Fold and foldLeft method difference

  • 2012-07-03 21:00:35
  • Karel Bílek
  • 27804 View
  • 54 Score
  • 7 Answer
  • Tags:   scala

7 Answered Questions

[SOLVED] difference between foldLeft and reduceLeft in Scala

4 Answered Questions

[SOLVED] Difference between fold and reduce?

5 Answered Questions

[SOLVED] Difference between fold and foldLeft or foldRight?

  • 2011-06-06 14:58:55
  • Andriy Drozdyuk
  • 23932 View
  • 63 Score
  • 5 Answer
  • Tags:   scala fold

1 Answered Questions

[SOLVED] Difference between fold and reduce revisted

6 Answered Questions

[SOLVED] Scala vs. Groovy vs. Clojure

1 Answered Questions

[SOLVED] Scala : fold vs foldLeft

  • 2013-04-19 18:40:39
  • thlim
  • 19167 View
  • 42 Score
  • 1 Answer
  • Tags:   scala reduce fold

Sponsored Content