目录
wordcount 案例wordcount 案例解析
wordcount 案例
1.需求 使用 netcat 工具向 9999 端口不断的发送数据,通过 Spark Streaming 读取端口数据并统计不同单词出现的次数 2.添加依赖
<dependency
>
<groupId
>org
.apache
.spark
</groupId
>
<artifactId
>spark
-streaming_2
.11</artifactId
>
<version
>2.1.1</version
>
</dependency
>
3.编写代码
import org
.apache
.spark
.SparkConf
import org
.apache
.spark
.streaming
.{Seconds
, StreamingContext
}
object WordCount1
{
def
main(args
: Array
[String
]): Unit
= {
val conf
= new SparkConf().setMaster("local[2]").setAppName("WordCount1")
val ssc
= new StreamingContext(conf
, Seconds(3))
val sourceStream
= ssc
.socketTextStream("hadoop102",9999)
val result
= sourceStream
.flatMap(_
.split(" ")).map((_
, 1)).reduceByKey(_
+ _
)
result
.print()
ssc
.start()
ssc
.awaitTermination()
}
}
4.测试
hadoop102上启动 netcat
nc
-lk
9999
可以打包到 linux 启动我们的 wordcount, 也可以在 idea 直接启动.查看输出结果. 每 3 秒统计一次数据的输入情况. 注意: 日志太多, 可以把日志级别修改为ERROR
5.几点需要注意的
一旦StreamingContext已经启动, 则不能再添加添加新的 streaming computations一旦一个StreamingContext已经停止(StreamingContext.stop()), 他也不能再重启在一个 JVM 内, 同一时间只能启动一个StreamingContextstop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉
wordcount 案例解析
Discretized Stream(DStream) 是 Spark Streaming 提供的基本抽象, 表示持续性的数据流, 可以来自输入数据, 也可以是其他的 DStream 转换得到. 在内部, 一个 DSteam 用连续的一系列的 RDD 来表示. 在 DStream 中的每个 RDD 包含一个确定时间段的数据.
对 DStream 的任何操作都会转换成对他里面的 RDD 的操作. 比如前面的 wordcount 案例, flatMap是应用在 line DStream 的每个 RDD 上, 然后生成了 words SStream 中的 RDD. 如下图所示:
对这些 RDD 的转换是有 Spark 引擎来计算的. DStream 的操作隐藏的大多数的细节, 然后给开发者提供了方便使用的高级 API.