RDD算子详解 - Spark

tech2022-12-17  112

算子的分类:

1、transformation

2、action

把RDD转换成一个集合、值或者null。

transformation算子

map

返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的。

val f = (x: Int) => x * x val rdd2: RDD[Int] = rdd1.map(f) rdd2.foreach(x => println(x))

filter

返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的。

rdd1.filter( x => if(x%2==0) true else flase).foreach(println)

flatMap

与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)。

val list = List("hello a", "hello b", "hello c") val rdd: RDD[String] = sc.makeRdd(list) rdd.flatMap( (line :String) => line.split(" ") ).foreach( x => println(x))

mapPartitions

与map相似,但是分别在RDD的每个分区(块)上运行,因此当在类型T的RDD上运行时func必须为Iterator <T> => Iterator <U>类型。

val list = List(1,2,3,4,5,6,7,8,9,10) val rdd: RDD[Int] = sc.makeRdd(list, numSlices = 2) // 每次交给f的数据就是一个分区 val lastValue: RDD[Int] = rdd.mapPartitions( f=(x:Iterator[Int]) => {     val partition_sum: Int = x.sum     val resultList = List(partition_sum)     resultList.iterator } ) lastValue.foreach(println)

mapPartitionsWithIndex

与mapPartitions相似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func必须为(Int,Iterator <T>)=> Iterator <U>类型。

val list = List(1,2,3,4,5,6,7,8,9,10) val rdd: RDD[Int] = sc.makeRdd(list, numSlices = 2) // mapPartitionsWithIndex   每次遍历一个分区,还给一个参数就是分区编号 val resultRDD: RDD[(Int, Int)] = rdd.mapPartitions( f=(index, x) => {     val partition_sum: Int = x.sum     val resultList = List((index, partition_sum))     resultList.iterator } ) resultRDD.foreach( (x: (Int, Int)) => println( x._1, x._2))

sample

sample(withReplacement,fraction,seed)  抽样

参数说明: 有无放回     抽取比率    种子

val result = 1 to 100 val rdd: RDD[Int] = sc.makeRDD(result) //  抽取比率:1、每个元素被抽取到的概率    2、从所有元素中抽取的比率 val sample_result: RDD[Int] = rdd.sample(false, 0.1, 0) // 最后的结果元素个数不一定是10 sample_result.foreach(println)

union

返回一个新的数据集,其中包含源数据集中的元素和参数的并集

intersection

返回一个新的RDD,其中包含源数据集中的元素与参数的交集

distinct

去重

groupByKey

只分组,不聚合

在(K,V)对的数据集上调用时,返回(K,Iterable <V>)对的数据集。注意:如果要分组以便对每个键执行聚合(例如求和或平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。注意:默认情况下,输出中的并行度取决于父RDD的分区数。您可以传递一个可选numPartitions参数来设置不同数量的任务。 val list = List( ("a",1), ("a",3), ("b",2), ("b",3), ("c",1), (c",3)) val rdd: RDD[(String,Int)] = sc.makeRDD(list) val resultRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey() resultRDD.foreach(x => {     println(x._1, x._2.mkString("-")) }) // (a,1-3) (b,2-3) (c,1-3)

 

reduceByKey

分组后,再聚合

在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func(其类型必须为(V,V)=>)进行汇总V.与in一样groupByKey,reduce任务的数量可以通过可选的第二个参数进行配置。

val list = List( ("a",1), ("a",3), ("b",2), ("b",3), ("c",1), (c",3)) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // reduceByKey    ==     groupByKey  +  reduce/map val resultRDD: RDD[(String, Iterable[Int])] = rdd.reduceByKey((a, b) => a+b).foreach(x=> println(x._1, x._2)) // (a,4) (b,5) (c,4)

 

aggregateByKey

在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的Combine函数和中性的“零”值进行汇总。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。像in中一样groupByKey,reduce任务的数量可以通过可选的第二个参数进行配置。

案例:求和

val list = List( ("a",1), ("a",3), ("b",2), ("b",3), ("c",1), ("c",3)) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // rdd.aggregateByKey[U](zeroValue: U)(seqOp: (U,Int) => U, combOp: (U, U) => U) val result_key_value: RDD[(String, Int)] = rdd.aggregateByKey(0)(     (sum, number) => sum + number,     (sum1, sum2) => sum1 + sum2 ) result_key_value.foreach( x => println(x._1, x._2))

 

combineByKey

案例: 统计每一个key中,value的出现次数  (错误案例,最后的次数是key的次数,不是value的次数

val result = List(("a", "A"), ("a", "A"), ("a", "B"), ("b", "A"), ("b", "B"), ("c", "A"), ("c", "B"), ("c", "B")) val rdd: RDD[(String, String)] = sc.makeRDD(result) //  rdd.combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null) // 这里的V代表String,就是每个value元素。因为是ByKey,所以不用考虑Key。 val result_key_value_count: RDD[(String, (String, Int))] = rdd.combineByKey(     (value: String) => (value, 1),     (c: (String, Int), value: String) => (c._1, c._2 + 1),     (c: (String, Int), d: (String, Int)) => (c._1, c._2 + d._2) ) result_key_value_count.foreach( x => println(x._1, x._2._1, x._2._2))

案例: 求每个key的value值的平均值

val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]") val sc = new SparkContext(conf) val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2) // (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果) //  rdd.combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null) // 这里的V代表Int,就是每个value元素。因为是ByKey,所以不用考虑Key。 val result_key_value_count: RDD[(String, (Int, Int))] = rdd.combineByKey[(Int, Int)]( (num: Int) => (num, 1), (t: (Int, Int), num: Int) => (t._1 + num, t._2 + 1), (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2) ) result_key_value_count.collect().foreach(println) val resultRDD: RDD[(String, Int)] = result_key_value_count.map( case (key, t) => (key, t._1 * 1D / t._2) ) resultRDD.collect().foreach(println) sc.stop()

 

sortBy

可指定字段排序,默认按照字典顺序排序(数字和字符串都是从小到大)

val list = List( ("a",1), ("c",3),("a",3), ("b",2), ("b",3), ("c",1) ) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // 按照value排序(全局) rdd.sortBy(x => x._2, false).foreach(println) // 先按照key排序,再按照value排序 rdd.sortBy(x => (x._1, x._2)).foreach(println)

sortByKey

在由K实现Ordered的(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值指定,按键以升序或降序排序ascending。

val list = List( ("a",1), ("c",3),("a",3), ("b",2), ("b",3), ("c",1) ) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // 按照key排序(全局) rdd.sortByKey(false).foreach(println)

join

在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。外连接通过支持leftOuterJoin,rightOuterJoinfullOuterJoin

val result1 = List(("a", 1),("b",2)) val result2 = List(("a", 11),("b",22)) val rdd1: RDD[(String, Int)] = sc.makeRDD(result1) val rdd2: RDD[(String, Int)] = sc.makeRDD(result2) val lastResultJoin: RDD[(String, (Int,Int))] = rdd1.join(rdd2) lastResultJoin.foreach(println) // (a, (1, 11)) (b, (2, 22)) //rdd1.leftOuterJoin(rdd2) //rdd1.rightOuterJoin(rdd2)

 

cogroup

在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable <V>,Iterable <W>))元组的数据集。此操作也称为groupWith。

join的底层实现就是cogroup。cogroup就是把各个rdd的数据放到一起,然后做join。在cogroup的基础上做笛卡尔积就变成join了。

val result1 = List(("a", 1),("a", 3),("b",2)) val result2 = List(("a", 11),("a", 33),("b",22)) val rdd1: RDD[(String, Int)] = sc.makeRDD(result1) val rdd2: RDD[(String, Int)] = sc.makeRDD(result2) val lastResultCogroup: RDD[(String, (Iterable[Int],Iterable[Int]))] = rdd1.cogroup(rdd2) // (a, (Iterable(1,3)), Iterable(11,33)) // (b, (Iterable(2)), Iterable(22)) lastResultCogroup.foreach( x => { println(x._1, x._2._1.mkString("-"), x._2._2.mkString("-")) }) // (a,1-3,11-33) // (b,2,22)

 

cartesian

笛卡尔积。在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集。

val result1 = List(("a", 1),("a", 3),("b",2)) val result2 = List(("a", 11),("a", 33),("b",22)) val rdd1: RDD[(String, Int)] = sc.makeRDD(result1) val rdd2: RDD[(String, Int)] = sc.makeRDD(result2) val value: RDD[((String,Int), (String, Int))] = rdd1.cartesian(rdd2) value.foreach(println) // 结果有9条记录

 

pipe

通过调用外部的脚本来执行RDD

coalesce

减小分区数

将RDD中的分区数减少到numPartitions。filter大型数据集后,对于更有效地运行操作很有用。

repartition

重新分区,但是会负载均衡。

随机地重新随机排列RDD中的数据,以创建更多或更少的分区,并在整个分区之间保持平衡。这始终会拖曳网络上的所有数据。

重新进行shuffle,所有数据都参与shuffle,使数据负载均衡。

repartitionAndSortWithinPartitions

 

foldByKey

foldByKey(zeroValue)(seqOp)

该函数用于K/V做折叠,合并处理,与aggregate类似,第一个括号的参数应用于每个V值,第二个括号函数是聚合(例如:_+_)

partitionBy

partitionBy(partitioner)

按照指定的分区器,对RDD进行分区

cache

persist

subtract

差集

subtractByKey

针对K/V数据的差集

 

action算子

aggregate

案例:求平均值

val list = List(1,2,3,4,5,6,7,8,9,10) val rdd: RDD[Int] = sc.makeRdd(list) // rdd.aggregate(zeroValue:U)(seqOp: (U, T) => U, combOp: (U, U) => U)   第一个参数是初始化值 // seqOp,U的类型与我们在第一步中定义的初始值得类型相同。这里的T代表RDD中每个元素的值 val result_avg: (Int, Int) = rdd.aggregate((0, 0))(     (sum_count, number) => (sum_count._1+number, sum_count._2+1),     (a1, a2) => (a1._1 + a2._2, a1._2 + a2._2) ) println(result_avg._1 * 1D / result_avg._2)

reduce

 

collect

RDD -> 集合

不包含任何业务处理逻辑,只把每个worker节点当中的executor当中执行的结果数据收集到客户端。

触发action操作

count

返回RDD当中的元素个数

first

取第一个元素

take

取前几个元素

takeSample

与transformation算子中的sample的区别:

takeSample返回的是一个集合,并且返回的结果在driver端,sample返回的结果在每个worker中。

takeOrdered

先排序,再取

saveAsTextFile

保存在某个文件系统的某个目录

saveAsSequenceFile

RDD中的元素是K/V,把K/V的数据以二进制的字节形式保存到磁盘。

好处是:把文件放入到磁盘,然后刷到内存中的时候会很快。

saveAsObjectFile

把对象保存

countByKey

对K/V数据,统计key的个数

仅在类型(K,V)的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数。

foreach

遍历

foreachPartition

遍历每个分区

fold

折叠

reduceByKeyLocally

返回一个map

top

取前几个

lookup

针对K/V型的RDD进行查找

 

 

最新回复(0)