Spark Streaming快速入门系列(2) | wordcount案例

tech2024-11-11  19

目录

wordcount 案例wordcount 案例解析

wordcount 案例

1.需求 使用 netcat 工具向 9999 端口不断的发送数据,通过 Spark Streaming 读取端口数据并统计不同单词出现的次数 2.添加依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.1</version> </dependency> 3.编写代码 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount1 { def main(args: Array[String]): Unit = { //1.创建streamingContext val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount1") val ssc = new StreamingContext(conf, Seconds(3)) //2.从数据源创建一个流:socket rdd队列 自定义接收器, kafka(重点) val sourceStream = ssc.socketTextStream("hadoop102",9999) //3.对流做各种转换 val result = sourceStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) //4.行动算子:print foreach foreachRDD result.print() //把结果打印在控制台 //5.启动流 ssc.start() //6.阻止主线程退出 ssc.awaitTermination() } } 4.测试 hadoop102上启动 netcat nc -lk 9999 可以打包到 linux 启动我们的 wordcount, 也可以在 idea 直接启动.查看输出结果. 每 3 秒统计一次数据的输入情况. 注意: 日志太多, 可以把日志级别修改为ERROR 5.几点需要注意的 一旦StreamingContext已经启动, 则不能再添加添加新的 streaming computations一旦一个StreamingContext已经停止(StreamingContext.stop()), 他也不能再重启在一个 JVM 内, 同一时间只能启动一个StreamingContextstop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉

wordcount 案例解析

Discretized Stream(DStream) 是 Spark Streaming 提供的基本抽象, 表示持续性的数据流, 可以来自输入数据, 也可以是其他的 DStream 转换得到. 在内部, 一个 DSteam 用连续的一系列的 RDD 来表示. 在 DStream 中的每个 RDD 包含一个确定时间段的数据.

对 DStream 的任何操作都会转换成对他里面的 RDD 的操作. 比如前面的 wordcount 案例, flatMap是应用在 line DStream 的每个 RDD 上, 然后生成了 words SStream 中的 RDD. 如下图所示:

对这些 RDD 的转换是有 Spark 引擎来计算的. DStream 的操作隐藏的大多数的细节, 然后给开发者提供了方便使用的高级 API.

最新回复(0)