整个过程是对每个分区进行聚合,再对聚合结果进行聚合
U:ClassTag 泛型 表明它只接受某一特定类型得输入值 zeroValue: U 初始值 一个具体的值 不是函数 seqOp: (U, T) => U表明每一个分片都由一个seqOp函数执行 combOp:聚合操作
val list = List(1,2,3,4,5,6,7,8,9) val (mul, sum, count) = sc.parallelize(list,3).aggregate((1,0,0))( (acc, num) => (acc._1 * num, acc._2 + num, acc._3 +1), (x, y) => (x._1 * y._1, x._2 + y._2, x._3 + y._3) ) println((mul, sum, count))(362880,45,9) 可以理解为 acc就是(1,0,0)初始值 num就是 rdd里的每个元素 而x,y是对不同分区的数进行相加 part1(1,0,0) + part2(1,0,0) +… 所以这个函数可以对rdd内的元素进行多方面的求值
对每个分区内的元素按照key先聚合一次,取最大值 随后对每个分区进行聚合,取得最大值的对应key 的value相加
首先会将初始化zeroValue 也就是将zeroValue的值赋予对应的key的value操作 类似于例子1 中的 zeroValue =0, 对应的是(“a”, 1+0), (“a”, 2+0),
scala> val rdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",4),("b",5),("c",6)),3) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24 scala> rdd.foldByKey(0)(_+_).collect res5: Array[(String, Int)] = Array((c,6), (a,6), (b,9)) scala> rdd.foldByKey(1)(_*_).collect res6: Array[(String, Int)] = Array((c,6), (a,6), (b,20))createCombiner: V => C, 首先对value进行一个初始化 类似于 map操作一样 mergeValue: (C, V) => C, 对每个分区内的元素进行聚合操作 C 就是上一步的C , c._1就是第一步C的一部分, c._2就是C 的二部分 mergeCombiners: (C, C) => C, 对所有分区聚合
scala> val rdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",4),("b",5),("c",6)),1) scala> val result = rdd.combineByKey( | x => (1,x), | (c1:(Int, Int), c2) => (c1._1 + 1, c1._2 + c2), // c1 => (1,x) | (x:(Int, Int), y:(Int, Int) ) =>( x._1 + y._1, x._2 + y._2) | ) scala> result.collect res9: Array[(String, (Int, Int))] = Array((a,(3,6)), (b,(2,9)), (c,(1,6)))same as combineByKey