算子的分类:
1、transformation
2、action
把RDD转换成一个集合、值或者null。
返回一个新的分布式数据集,该数据集是通过将源的每个元素传递给函数func形成的。
val f = (x: Int) => x * x val rdd2: RDD[Int] = rdd1.map(f) rdd2.foreach(x => println(x))返回一个新的数据集,该数据集是通过选择源中func返回true的那些元素形成的。
rdd1.filter( x => if(x%2==0) true else flase).foreach(println)与map相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项)。
val list = List("hello a", "hello b", "hello c") val rdd: RDD[String] = sc.makeRdd(list) rdd.flatMap( (line :String) => line.split(" ") ).foreach( x => println(x))与map相似,但是分别在RDD的每个分区(块)上运行,因此当在类型T的RDD上运行时func必须为Iterator <T> => Iterator <U>类型。
val list = List(1,2,3,4,5,6,7,8,9,10) val rdd: RDD[Int] = sc.makeRdd(list, numSlices = 2) // 每次交给f的数据就是一个分区 val lastValue: RDD[Int] = rdd.mapPartitions( f=(x:Iterator[Int]) => { val partition_sum: Int = x.sum val resultList = List(partition_sum) resultList.iterator } ) lastValue.foreach(println)与mapPartitions相似,但它还为func提供表示分区索引的整数值,因此当在类型T的RDD上运行时,func必须为(Int,Iterator <T>)=> Iterator <U>类型。
val list = List(1,2,3,4,5,6,7,8,9,10) val rdd: RDD[Int] = sc.makeRdd(list, numSlices = 2) // mapPartitionsWithIndex 每次遍历一个分区,还给一个参数就是分区编号 val resultRDD: RDD[(Int, Int)] = rdd.mapPartitions( f=(index, x) => { val partition_sum: Int = x.sum val resultList = List((index, partition_sum)) resultList.iterator } ) resultRDD.foreach( (x: (Int, Int)) => println( x._1, x._2))sample(withReplacement,fraction,seed) 抽样
参数说明: 有无放回 抽取比率 种子
val result = 1 to 100 val rdd: RDD[Int] = sc.makeRDD(result) // 抽取比率:1、每个元素被抽取到的概率 2、从所有元素中抽取的比率 val sample_result: RDD[Int] = rdd.sample(false, 0.1, 0) // 最后的结果元素个数不一定是10 sample_result.foreach(println)返回一个新的数据集,其中包含源数据集中的元素和参数的并集。
返回一个新的RDD,其中包含源数据集中的元素与参数的交集。
去重
只分组,不聚合
在(K,V)对的数据集上调用时,返回(K,Iterable <V>)对的数据集。注意:如果要分组以便对每个键执行聚合(例如求和或平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。注意:默认情况下,输出中的并行度取决于父RDD的分区数。您可以传递一个可选numPartitions参数来设置不同数量的任务。 val list = List( ("a",1), ("a",3), ("b",2), ("b",3), ("c",1), (c",3)) val rdd: RDD[(String,Int)] = sc.makeRDD(list) val resultRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey() resultRDD.foreach(x => { println(x._1, x._2.mkString("-")) }) // (a,1-3) (b,2-3) (c,1-3)
分组后,再聚合
在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中每个键的值使用给定的reduce函数func(其类型必须为(V,V)=>)进行汇总V.与in一样groupByKey,reduce任务的数量可以通过可选的第二个参数进行配置。
val list = List( ("a",1), ("a",3), ("b",2), ("b",3), ("c",1), (c",3)) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // reduceByKey == groupByKey + reduce/map val resultRDD: RDD[(String, Iterable[Int])] = rdd.reduceByKey((a, b) => a+b).foreach(x=> println(x._1, x._2)) // (a,4) (b,5) (c,4)
在(K,V)对的数据集上调用时,返回(K,U)对的数据集,其中每个键的值使用给定的Combine函数和中性的“零”值进行汇总。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。像in中一样groupByKey,reduce任务的数量可以通过可选的第二个参数进行配置。
案例:求和
val list = List( ("a",1), ("a",3), ("b",2), ("b",3), ("c",1), ("c",3)) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // rdd.aggregateByKey[U](zeroValue: U)(seqOp: (U,Int) => U, combOp: (U, U) => U) val result_key_value: RDD[(String, Int)] = rdd.aggregateByKey(0)( (sum, number) => sum + number, (sum1, sum2) => sum1 + sum2 ) result_key_value.foreach( x => println(x._1, x._2))
案例: 统计每一个key中,value的出现次数 (错误案例,最后的次数是key的次数,不是value的次数)
val result = List(("a", "A"), ("a", "A"), ("a", "B"), ("b", "A"), ("b", "B"), ("c", "A"), ("c", "B"), ("c", "B")) val rdd: RDD[(String, String)] = sc.makeRDD(result) // rdd.combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null) // 这里的V代表String,就是每个value元素。因为是ByKey,所以不用考虑Key。 val result_key_value_count: RDD[(String, (String, Int))] = rdd.combineByKey( (value: String) => (value, 1), (c: (String, Int), value: String) => (c._1, c._2 + 1), (c: (String, Int), d: (String, Int)) => (c._1, c._2 + d._2) ) result_key_value_count.foreach( x => println(x._1, x._2._1, x._2._2))案例: 求每个key的value值的平均值
val conf: SparkConf = new SparkConf().setAppName("Trans").setMaster("local[*]") val sc = new SparkContext(conf) val rdd = sc.parallelize(Array(("a", 88), ("b", 96), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2) // (先计算每个key出现的次数以及可以对应值的总和,再相除得到结果) // rdd.combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null) // 这里的V代表Int,就是每个value元素。因为是ByKey,所以不用考虑Key。 val result_key_value_count: RDD[(String, (Int, Int))] = rdd.combineByKey[(Int, Int)]( (num: Int) => (num, 1), (t: (Int, Int), num: Int) => (t._1 + num, t._2 + 1), (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2) ) result_key_value_count.collect().foreach(println) val resultRDD: RDD[(String, Int)] = result_key_value_count.map( case (key, t) => (key, t._1 * 1D / t._2) ) resultRDD.collect().foreach(println) sc.stop()
可指定字段排序,默认按照字典顺序排序(数字和字符串都是从小到大)
val list = List( ("a",1), ("c",3),("a",3), ("b",2), ("b",3), ("c",1) ) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // 按照value排序(全局) rdd.sortBy(x => x._2, false).foreach(println) // 先按照key排序,再按照value排序 rdd.sortBy(x => (x._1, x._2)).foreach(println)在由K实现Ordered的(K,V)对的数据集上调用时,返回(K,V)对的数据集,按布尔值指定,按键以升序或降序排序ascending。
val list = List( ("a",1), ("c",3),("a",3), ("b",2), ("b",3), ("c",1) ) val rdd: RDD[(String,Int)] = sc.makeRDD(list) // 按照key排序(全局) rdd.sortByKey(false).foreach(println)在(K,V)和(K,W)类型的数据集上调用时,返回(K,(V,W))对的数据集,其中每个键都有所有成对的元素。外连接通过支持leftOuterJoin,rightOuterJoin和fullOuterJoin。
val result1 = List(("a", 1),("b",2)) val result2 = List(("a", 11),("b",22)) val rdd1: RDD[(String, Int)] = sc.makeRDD(result1) val rdd2: RDD[(String, Int)] = sc.makeRDD(result2) val lastResultJoin: RDD[(String, (Int,Int))] = rdd1.join(rdd2) lastResultJoin.foreach(println) // (a, (1, 11)) (b, (2, 22)) //rdd1.leftOuterJoin(rdd2) //rdd1.rightOuterJoin(rdd2)
在(K,V)和(K,W)类型的数据集上调用时,返回(K,(Iterable <V>,Iterable <W>))元组的数据集。此操作也称为groupWith。
join的底层实现就是cogroup。cogroup就是把各个rdd的数据放到一起,然后做join。在cogroup的基础上做笛卡尔积就变成join了。
val result1 = List(("a", 1),("a", 3),("b",2)) val result2 = List(("a", 11),("a", 33),("b",22)) val rdd1: RDD[(String, Int)] = sc.makeRDD(result1) val rdd2: RDD[(String, Int)] = sc.makeRDD(result2) val lastResultCogroup: RDD[(String, (Iterable[Int],Iterable[Int]))] = rdd1.cogroup(rdd2) // (a, (Iterable(1,3)), Iterable(11,33)) // (b, (Iterable(2)), Iterable(22)) lastResultCogroup.foreach( x => { println(x._1, x._2._1.mkString("-"), x._2._2.mkString("-")) }) // (a,1-3,11-33) // (b,2,22)
笛卡尔积。在类型T和U的数据集上调用时,返回(T,U)对(所有元素对)的数据集。
val result1 = List(("a", 1),("a", 3),("b",2)) val result2 = List(("a", 11),("a", 33),("b",22)) val rdd1: RDD[(String, Int)] = sc.makeRDD(result1) val rdd2: RDD[(String, Int)] = sc.makeRDD(result2) val value: RDD[((String,Int), (String, Int))] = rdd1.cartesian(rdd2) value.foreach(println) // 结果有9条记录
通过调用外部的脚本来执行RDD
减小分区数
将RDD中的分区数减少到numPartitions。filter大型数据集后,对于更有效地运行操作很有用。
重新分区,但是会负载均衡。
随机地重新随机排列RDD中的数据,以创建更多或更少的分区,并在整个分区之间保持平衡。这始终会拖曳网络上的所有数据。
重新进行shuffle,所有数据都参与shuffle,使数据负载均衡。
foldByKey(zeroValue)(seqOp)
该函数用于K/V做折叠,合并处理,与aggregate类似,第一个括号的参数应用于每个V值,第二个括号函数是聚合(例如:_+_)
partitionBy(partitioner)
按照指定的分区器,对RDD进行分区
差集
针对K/V数据的差集
案例:求平均值
val list = List(1,2,3,4,5,6,7,8,9,10) val rdd: RDD[Int] = sc.makeRdd(list) // rdd.aggregate(zeroValue:U)(seqOp: (U, T) => U, combOp: (U, U) => U) 第一个参数是初始化值 // seqOp,U的类型与我们在第一步中定义的初始值得类型相同。这里的T代表RDD中每个元素的值 val result_avg: (Int, Int) = rdd.aggregate((0, 0))( (sum_count, number) => (sum_count._1+number, sum_count._2+1), (a1, a2) => (a1._1 + a2._2, a1._2 + a2._2) ) println(result_avg._1 * 1D / result_avg._2)
RDD -> 集合
不包含任何业务处理逻辑,只把每个worker节点当中的executor当中执行的结果数据收集到客户端。
触发action操作
返回RDD当中的元素个数
取第一个元素
取前几个元素
与transformation算子中的sample的区别:
takeSample返回的是一个集合,并且返回的结果在driver端,sample返回的结果在每个worker中。
先排序,再取
保存在某个文件系统的某个目录
RDD中的元素是K/V,把K/V的数据以二进制的字节形式保存到磁盘。
好处是:把文件放入到磁盘,然后刷到内存中的时候会很快。
把对象保存
对K/V数据,统计key的个数
仅在类型(K,V)的RDD上可用。返回(K,Int)对的哈希图以及每个键的计数。
遍历
遍历每个分区
折叠
返回一个map
取前几个
针对K/V型的RDD进行查找