要初始化一个 Spark Streaming 程序,必须创建一个 StreamingContext 对象,该对象是 SparkStreaming 流处理的编程入口点。2.2版本SparkSession未整合StreamingContext,所以仍需单独创建。
Spark Streaming 初始化的主要工作是创建 Streaming Context 对象,通过创建函数的参数指明 Master Server,设定应用名称,指定 Spark Streaming 处理数据的时间间隔等。 一个JVM只能有一个StreamingContext启动。 StreamingContext停止后不能再启动。
Spark Streaming 程序开发流程:
一个StreamingContext定义之后,必须执行以下程序进行实时计算的执行
定义 StreamingContext通过 StreamingContext API 创建输入 DStream(Input DStream)来创建输入不同的数据源对DStream定义transformation和output等各种算子操作,来定义我们需要的各种实时计算逻辑。调用StreamingContext的start()方法,进行启动我们的实时处理数据。调用StreamingContext的awaitTermination()方法,来等待应用程序的终止。可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。也可以通过调用StreamingContext的stop()方法,来停止应用程序。其中 Seconds(1)表示批处理间隔。
注意:
创建 StreamingContext需要注意下面几个问题:
一个 JVM 只能有一个 SparkContext 启动。意味着应用程序中不应该出现两个 SparkContext。一个 JVM 同时只能有一个 StreamingContext 启动。但一个SparkContext可以创建多个 StreamingContext,只要上一个treamingContext 先用 stop(false)停止,再创建下一个即可。默认调用stop()方法时,会同时停止内部的SparkContext。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false。StreamingContext 停止后不能再启动。也就是说调用 stop()后不能再 start()。StreamingContext 启动之后,就不能再往其中添加任何计算逻辑了。也就是说执行 start()方法之后,不能再使 DStream 执行任何算子。一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。下面以代码的形式来简单介绍下创建StreamingContext来执行SparkStreaming的整个流程:
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ //创建StreamingContext val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf,Seconds(1)) // 指定数据源,创建连接至 hostname:port 的 DStreamCreate,如 localhost:9999 val lines = ssc.socketTextStream("localhost",9999) //把每一行通过空格符分割成多个字符 val words = lines.flatMap (_.split(" ")) //对输入的流进行操作 val pairs = words.map(x => (x, 1)) val wordCounts = pairs.reduceByKey(_+_) //打印该 DStream 生成的每个 RDD 中的字符 wordCounts.print() //通过start()启动消息采集和处理 ssc.start() //启动完成后就不能再做其它操作 //等待计算完成 ssc.awaitTermination()这样一个基本的Spark Streaming 运行流程就完成了。
