Spark Core 快速入门系列(八)RDD 的分布式共享变量

tech2022-08-16  159

文章目录

RDD 的分布式共享变量什么是闭包分发闭包累加器广播变量

RDD 的分布式共享变量

目标

1,理解闭包以及 Spark 分布式运行代码的根本原理 2,理解累加变量的使用场景 3,理解广播的使用场景

什么是闭包

闭包是一个必须要理解, 但是又不太好理解的知识点, 先看一个小例子

@Test def test(): Unit = { val areaFunction = closure() val area = areaFunction(2) println(area) } def closure(): Int => Double = { val factor = 3.14 val areaFunction = (r: Int) => math.pow(r, 2) * factor areaFunction }

上述例子中, closure方法返回的一个函数的引用, 其实就是一个闭包, 闭包本质上就是一个封闭的作用域, 要理解闭包, 是一定要和作用域联系起来的.

能否在 test 方法中访问 closure 定义的变量?

@Test def test(): Unit = { println(factor) } def closure(): Int => Double = { val factor = 3.14 }

有没有什么间接的方式?

@Test def test(): Unit = { val areaFunction = closure() areaFunction() } def closure(): () => Unit = { val factor = 3.14 val areaFunction = () => println(factor) areaFunction }

什么是闭包?

val areaFunction = closure() areaFunction()

通过 closure 返回的函数 areaFunction 就是一个闭包, 其函数内部的作用域并不是 test 函数的作用域, 这种连带作用域一起打包的方式, 我们称之为闭包, 在 Scala 中

Scala 中的闭包本质上就是一个对象, 是 FunctionX 的实例

分发闭包

sc.textFile("dataset/access_log_sample.txt") .flatMap(item => item.split("")) .collect()

上述这段代码中, flatMap 中传入的是另外一个函数, 传入的这个函数就是一个闭包, 这个闭包会被序列化运行在不同的 Executor 中

class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } }

这段代码中的闭包就有了一个依赖, 依赖于外部的一个类, 因为传递给算子的函数最终要在 Executor 中运行, 所以需要 序列化 MyClass 发给每一个 Executor, 从而在 Executor 访问 MyClass 对象的属性

总结

1,闭包就是一个封闭的作用域, 也是一个对象 2,Spark 算子所接受的函数, 本质上是一个闭包, 因为其需要封闭作用域, 并且序列化自身和依赖, 分发到不同的节点中运行

累加器

一个小问题

var count = 0 val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(count += _) println(count)

上面这段代码是一个非常错误的使用, 请不要仿照, 这段代码只是为了证明一些事情

先明确两件事, var count = 0 是在 Driver 中定义的, foreach(count += _) 这个算子以及传递进去的闭包运行在 Executor 中

这段代码整体想做的事情是累加一个变量, 但是这段代码的写法却做不到这件事, 原因也很简单, 因为具体的算子是闭包, 被分发给不同的节点运行, 所以这个闭包中累加的并不是 Driver 中的这个变量

全局累加器

Accumulators(累加器) 是一个只支持 added(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.

原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(counter.add(_)) // 运行结果: 15 println(counter.value)

注意点:

Accumulator 是支持并发并行的, 在任何地方都可以通过 add 来修改数值, 无论是 Driver 还是 Executor只能在 Driver 中才能调用 value 来获取数值

在 WebUI 中关于 Job 部分也可以看到 Accumulator 的信息, 以及其运行的情况

累计器件还有两个小特性, 第一, 累加器能保证在 Spark 任务出现问题被重启的时候不会出现重复计算. 第二, 累加器只有在 Action 执行的时候才会被触发.

val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .map(counter.add(_)) // 这个地方不是 Action, 而是一个 Transformation // 运行结果是 0 println(counter.value)

自定义累加器

开发者可以通过自定义累加器来实现更多类型的累加器, 累加器的作用远远不只是累加, 比如可以实现一个累加器, 用于向里面添加一些运行信息

package com.spark.core import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2 import org.junit.Test import scala.collection.mutable class Accumulator { /** * RDD -> (1,2,3,4,5) —> Set(1,2,3,4,5) */ @Test def acc(): Unit ={ Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setMaster("local[6]").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val accumulator = new NumAccumulator //TODO 注册给spark,源码有提示 sc.register(accumulator,"num") sc.parallelize(Seq("1","2","3")).foreach(println) println("--------------------") println(accumulator.value) sc.stop() } } class NumAccumulator extends AccumulatorV2[String,Set[String]]{ //创建可变的集合 private val nums : mutable.Set[String] = mutable.Set() /** * 告诉 Spark 框架 , 这个累加器对象是否是空的 * @return */ override def isZero: Boolean = { nums.isEmpty } //TODO 提供给 Spark 框架一个拷贝的累加器 override def copy(): AccumulatorV2[String, Set[String]] = { val newAccumulator = new NumAccumulator() nums.synchronized{ newAccumulator.nums ++= this.nums } newAccumulator } //TODO 帮助 Spark 框架 , 清理累加器的内容 override def reset(): Unit = { nums.clear() } /** * 外部传入要累加的内容,在这个方法中进行累加 * @param v */ override def add(v: String): Unit = { nums += v } //TODO 将另一个同类型的累加器合并到该累加器中并更新其状态,即*应该就地合并。 /** * 累加器在进行累加的时候,可能每个分布式都有一个实例 * 在最后Driver进行一次合并,把所有的实例的内容合并起来,会调用这个merge方法进行合并 * @param other */ override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { nums ++= other.value } /** * 提供给外部累加结果 * 为什么一定给不可变的,因为外部有可能再进行修改,如果是是可变的集合,其外部的修改会影响内部的值 * @return */ override def value: Set[String] = { nums.toSet } }

注意点:

可以通过继承 AccumulatorV2 来创建新的累加器有几个方法需要重写 reset 方法用于把累加器重置为 0add 方法用于把其它值添加到累加器中merge 方法用于指定如何合并其他的累加器 value 需要返回一个不可变的集合, 因为不能因为外部的修改而影响自身的值

广播变量

目标

1,理解为什么需要广播变量, 以及其应用场景 2,能够通过代码使用广播变量

广播变量的作用

广播变量允许开发者将一个 Read-Only 的变量缓存到集群中每个节点中, 而不是传递给每一个 Task 一个副本.

集群中每个节点, 指的是一个机器每一个 Task, 一个 Task 是一个 Stage 中的最小处理单元, 一个 Executor 中可以有多个 Stage, 每个 Stage 有多个 Task

所以在需要跨多个 Stage 的多个 Task 中使用相同数据的情况下, 广播特别的有用

广播变量的API

方法名描述id唯一标识value广播变量的值unpersist在 Executor 中异步的删除缓存副本destroy销毁所有此广播变量所关联的数据和元数据toString字符串表示

使用广播变量的一般套路

可以通过如下方式创建广播变量

val b = sc.broadcast(1)

如果 Log 级别为 DEBUG 的时候, 会打印如下信息

DEBUG BlockManager: Put block broadcast_0 locally took 430 ms DEBUG BlockManager: Putting block broadcast_0 without replication took 431 ms DEBUG BlockManager: Told master about block broadcast_0_piece0 DEBUG BlockManager: Put block broadcast_0_piece0 locally took 4 ms DEBUG BlockManager: Putting block broadcast_0_piece0 without replication took 4 ms

创建后可以使用 value 获取数据

b.value

获取数据的时候会打印如下信息

DEBUG BlockManager: Getting local block broadcast_0 DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)

广播变量使用完了以后, 可以使用 unpersist 删除数据

b.unpersist

删除数据以后, 可以使用 destroy 销毁变量, 释放内存空间

b.destroy

销毁以后, 会打印如下信息

DEBUG BlockManager: Removing broadcast 0 DEBUG BlockManager: Removing block broadcast_0_piece0 DEBUG BlockManager: Told master about block broadcast_0_piece0 DEBUG BlockManager: Removing block broadcast_0

使用 value 方法的注意点

方法签名 value: T

在 value 方法内部会确保使用获取数据的时候, 变量必须是可用状态, 所以必须在变量被 destroy 之前使用 value 方法, 如果使用 value 时变量已经失效, 则会爆出以下错误

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27) at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:69) ... 48 elided

使用 destroy 方法的注意点

方法签名 destroy(): Unit

destroy 方法会移除广播变量, 彻底销毁掉, 但是如果你试图多次 destroy 广播变量, 则会爆出以下错误

org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27) at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144) at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:107) at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98) ... 48 elided

广播变量的使用场景

假设我们在某个算子中需要使用一个保存了项目和项目的网址关系的 Map[String, String] 静态集合, 如下

val pws = Map("Apache Spark" -> "http://spark.apache.org/", "Hadoop" -> "http://hadoop.apache.org/") val websites = sc.parallelize(Seq("Apache Spark", "Hadoop")).map(pws).collect

上面这段代码是没有问题的, 可以正常运行的, 但是非常的低效, 因为虽然可能 pws 已经存在于某个 Executor 中了, 但是在需要的时候还是会继续发往这个 Executor, 如果想要优化这段代码, 则需要尽可能的降低网络开销

可以使用广播变量进行优化, 因为广播变量会缓存在集群中的机器中, 比 Executor 在逻辑上更 “大”

val pwsB = sc.broadcast(pws) val websites = sc.parallelize(Seq("Apache Spark", "Hadoop")).map(pwsB.value).collect

上面两段代码所做的事情其实是一样的, 但是当需要运行多个 Executor (以及多个 Task) 的时候, 后者的效率更高

扩展

正常情况下使用 Task 拉取数据的时候, 会将数据拷贝到 Executor 中多次, 但是使用广播变量的时候只会复制一份数据到 Executor 中, 所以在两种情况下特别适合使用广播变量

一个 Executor 中有多个 Task 的时候一个变量比较大的时候

而且在 Spark 中还有一个约定俗称的做法, 当一个 RDD 很大并且还需要和另外一个 RDD 执行 join 的时候, 可以将较小的 RDD 广播出去, 然后使用大的 RDD 在算子 map 中直接 join, 从而实现在 Map 端 join

val acMap = sc.broadcast(myRDD.map { case (a,b,c,b) => (a, c) }.collectAsMap) val otherMap = sc.broadcast(myOtherRDD.collectAsMap) myBigRDD.map { case (a, b, c, d) => (acMap.value.get(a).get, otherMap.value.get(c).get) }.collect

一般情况下在这种场景下, 会广播 Map 类型的数据, 而不是数组, 因为这样容易使用 Key 找到对应的 Value 简化使用

全套代码

package com.spark.core import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test class Broadcast { /** * 占用资源较大,有十个对应的value */ @Test def bc1(): Unit ={ // 数据,假设数据很大,大概100兆 val v = Map("Spark" -> "http://spark.apache.org" ,"Hadoop" -> "http://hadoop.apache.org" ) val conf = new SparkConf().setMaster("local[6]").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) //将其中的spark 和 hadoop 转为对应的网址 val r = sc.parallelize(Seq("Spark", "Hadoop")) val results = r.map(x => v(x)).collect() println(results) } @Test def bc2(): Unit ={ // 数据,假设数据很大,大概100兆 val v = Map("Spark" -> "http://spark.apache.org" ,"Hadoop" -> "http://hadoop.apache.org" ) val conf = new SparkConf().setMaster("local[6]").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) //TODO 创建广播 val bc = sc.broadcast(v) //将其中的spark 和 hadoop 转为对应的网址 val r = sc.parallelize(Seq("Spark", "Hadoop")) /** * 在算子使用广播变量代替直接引用集合,指挥复制和executor一样的数量 * 在使用广播之前 复制了map 了 task数量份 * 在使用广播之后,复制次数和executor 数量一致 * */ val results = r.map(x => bc.value(x)) results.collect().foreach(println) sc.stop() } }

总结

1,广播变量用于将变量缓存在集群中的机器中, 避免机器内的 Executors 多次使用网络拉取数据 2,广播变量的使用步骤: (1) 创建 (2) 在 Task 中获取值 (3) 销毁

最新回复(0)