Spark Core Functions


Table of Contents

aggregate
aggregateByKey
foldByKey
combineByKey
combineByKeyWithClassTag

aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

The overall process first aggregates the elements within each partition, then aggregates the per-partition results.

U: ClassTag is a generic type parameter; the result type U may differ from the RDD's element type T.
zeroValue: U is the initial value, a concrete value rather than a function.
seqOp: (U, T) => U is executed within each partition (slice) to fold its elements into an accumulator.
combOp: (U, U) => U merges the per-partition accumulators into the final result.

val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val (mul, sum, count) = sc.parallelize(list, 3).aggregate((1, 0, 0))(
  (acc, num) => (acc._1 * num, acc._2 + num, acc._3 + 1),
  (x, y) => (x._1 * y._1, x._2 + y._2, x._3 + y._3)
)
println((mul, sum, count))

(362880,45,9). This can be read as follows: acc starts from the initial value (1,0,0) and num is each element of the RDD, while x and y are the per-partition results being merged, i.e. partition 1's (product, sum, count) combined with partition 2's, and so on. A single aggregate call can therefore compute several quantities over the RDD at once.
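For instance, here is a minimal sketch (assuming sc is an active SparkContext, as in spark-shell) that computes an average in one pass by carrying a (sum, count) pair through the aggregation:

// A minimal sketch, assuming `sc` is an active SparkContext (e.g. in spark-shell).
// Carry (sum, count) through the aggregation, then divide once at the end.
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val (total, cnt) = nums.aggregate((0, 0))(
  (acc, n) => (acc._1 + n, acc._2 + 1),   // seqOp: fold each element into the partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge the per-partition accumulators
)
val avg = total.toDouble / cnt            // 45 / 9 = 5.0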

aggregateByKey

scala> val rdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",4),("b",5),("c",6)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.glom.collect
res0: Array[Array[(String, Int)]] = Array(Array((a,1), (a,2)), Array((a,3), (b,4)), Array((b,5), (c,6)))

scala> val res = rdd.aggregateByKey(0)(math.max(_,_), _+_)
res: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at aggregateByKey at <console>:25

scala> res.collect
res2: Array[(String, Int)] = Array((c,6), (a,5), (b,9))

Within each partition, values are first combined per key with math.max (the seqOp); the per-partition maxima for each key are then added together across partitions (the combOp). With the partition layout shown by glom, key a gives max(1,2)=2 in the first partition and 3 in the second, hence 2+3=5; key b gives 4 and 5, hence 9; key c stays 6. A common special case is sketched below.
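A minimal sketch, assuming the same rdd as above: when seqOp and combOp are both addition, aggregateByKey behaves like reduceByKey.

// A minimal sketch, assuming the same rdd: RDD[(String, Int)] built above.
// With addition as both seqOp and combOp, the result matches reduceByKey(_ + _).
val sums = rdd.aggregateByKey(0)(_ + _, _ + _)
sums.collect   // Array((c,6), (a,6), (b,9)) -- ordering may vary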

foldByKey

foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

foldByKey first folds zeroValue into each key's values, i.e. zeroValue is combined with the value using func. In the first example below, with zeroValue = 0 and addition, this corresponds to ("a", 1+0), ("a", 2+0), and so on.

scala> val rdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",4),("b",5),("c",6)),3)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd.foldByKey(0)(_+_).collect
res5: Array[(String, Int)] = Array((c,6), (a,6), (b,9))

scala> rdd.foldByKey(1)(_*_).collect
res6: Array[(String, Int)] = Array((c,6), (a,6), (b,20))
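Note that zeroValue is folded in once per key per partition on the map side, not just once overall, so a non-neutral zeroValue can be counted several times. A hedged sketch, assuming the same rdd and the partition layout shown in the aggregateByKey example:

// A minimal sketch, assuming the same rdd and the partition layout shown earlier:
//   partition 0: (a,1),(a,2)   partition 1: (a,3),(b,4)   partition 2: (b,5),(c,6)
// zeroValue 10 is applied once per key per partition:
//   a: (10+1+2) + (10+3) = 26,  b: (10+4) + (10+5) = 29,  c: 10+6 = 16
rdd.foldByKey(10)(_ + _).collect
// expected: Array((c,16), (a,26), (b,29)) -- ordering may vary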

combineByKey

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)

createCombiner: V => C initializes the combiner from a value, much like a map step.
mergeValue: (C, V) => C merges further values into the combiner within each partition; C here is the result of the previous step (in the example below, c._1 is the running count and c._2 the running sum).
mergeCombiners: (C, C) => C merges the combiners of all partitions.

scala> val rdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",4),("b",5),("c",6)),1)

scala> val result = rdd.combineByKey(
     |   x => (1, x),                                          // createCombiner: first value becomes (count = 1, sum = x)
     |   (c1: (Int, Int), c2) => (c1._1 + 1, c1._2 + c2),      // mergeValue: bump the count, add to the sum
     |   (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)   // mergeCombiners: add counts and sums
     | )

scala> result.collect
res9: Array[(String, (Int, Int))] = Array((a,(3,6)), (b,(2,9)), (c,(1,6)))
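The (count, sum) combiner built above is the classic building block for a per-key average; a minimal sketch continuing from result:

// A minimal sketch, assuming `result: RDD[(String, (Int, Int))]` from the example above,
// where the value is (count, sum) for each key.
val avgByKey = result.mapValues { case (count, sum) => sum.toDouble / count }
avgByKey.collect   // Array((a,2.0), (b,4.5), (c,6.0))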

combineByKeyWithClassTag

def combineByKeyWithClassTag[C](
    createCombiner: V => C,           // map side: turns the first value seen for a key into a combiner C
    mergeValue: (C, V) => C,          // map side: pre-aggregation within a partition
    mergeCombiners: (C, C) => C,      // reduce side: merges combiners across partitions
    partitioner: Partitioner,         // the partitioner to use
    mapSideCombine: Boolean = true,   // whether map-side combine is enabled, on by default
    serializer: Serializer = null)

Same behavior as combineByKey, with an implicit ClassTag required for the combiner type C.
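In recent Spark versions combineByKey delegates to combineByKeyWithClassTag; the only visible difference when calling it is the implicit ClassTag[C] for the combiner type. A minimal sketch of calling the three-argument overload directly, assuming the same rdd as in the combineByKey example:

// A minimal sketch, assuming the same rdd: RDD[(String, Int)] as above.
// Same logic as the combineByKey example; the ClassTag[(Int, Int)] is supplied
// implicitly by the compiler.
val result2 = rdd.combineByKeyWithClassTag(
  (v: Int) => (1, v),                                                  // createCombiner
  (c: (Int, Int), v: Int) => (c._1 + 1, c._2 + v),                     // mergeValue
  (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)   // mergeCombiners
)
result2.collect   // Array((a,(3,6)), (b,(2,9)), (c,(1,6)))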
