DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流。 数据流来源
这些数据流既可以通过外部输入源赖获取,也可以通过现有的Dstream的transformation操作来获得。内部实现:
在内部实现上,DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流。每个DStream中的每个RDD都包含来自一定间隔的数据,如下图
示例 在DStream上使用的任何操作都会转换为针对底层RDD的操作。
例如:在如下代码中,flatMap操作应用于行DStream的每个RDD上 从而产生words DStream的RDD。 object WordCount{ def main(args:Array[String]):Unit={ val sparkConf =newSparkConf().setMaster("local[2]").setAppName("WordCount") val sc=newStreamingContext(sparkConf,Seconds(1)) val lines = sc.socketTextStream("localhost",9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x =>(x,1)).reduceByKey(_+_) wordCounts.print() sc.start() sc.awaitTermination() } }DStream为数据流,内部按时间分隔成一个个的batch,一个batch为一个处理单位
每次只处理当前批次的数据,前一批次的数据如果已经处理完了,则数据已经没了,否则参考下一条上一批次的数据没处理完,下一批次的数据来了,会阻塞等待前一批次的数据处理完每个batch内部包含多个block,具体数量由batch产生的时间间隔和block产生的时间间隔决定
batch interval/block interval = 一个DStream中block的数量每个RDD中的每一个block对应一个RDD的一个分区,对应一个executor中的task线程
如下图所示
Dstream分为3种
Input DStreamTransformations DStreamOutput DStream实现把一个DStream转换生成一个新的DStream,延迟加载不会触发任务的执行
TransformationMeaningmap(func)对DStream中的各个元素进行func函数操作,然后返回一个新的DStreamflatMap(func)与map方法类似,只不过各个输入项可以被输出为零个或多个输出项filter(func)过滤出所有函数func返回值为true的DStream元素并返回一个新的DStreamrepartition(numPartitions)增加或减少DStream中的分区数,从而改变DStream的并行度union(otherStream)将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.count()通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStreamreduce(func)对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.countByValue()对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数reduceByKey(func, [numTasks])利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStreamjoin(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStreamcogroup(otherStream, [numTasks])输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStreamtransform(func)通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDDupdateStateByKey(func)根据key的之前状态值和key的新值,对key进行更新,返回一个新状态的DStreamreduceByKeyAndWindow窗口函数操作,实现按照window窗口大小来进行计算输出算子操作,触发任务的真正运行
Output OperationMeaningprint()打印到控制台saveAsTextFiles(prefix, [suffix])保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]".saveAsObjectFiles(prefix, [suffix])保存流的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”.saveAsHadoopFiles(prefix, [suffix])保存流的内容为hadoop文件,文件名为 “prefix-TIME_IN_MS[.suffix]”.foreachRDD(func)对Dstream里面的每个RDD执行func