Flink是什么
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. 快速灵巧为什么选择Flink
流数据更真实地反映了我们的生活方式传统的数据架构是基于有限数据集的(因为批处理数据更简单)我们的目标是低延迟 高吞吐 结果的准确性和良好的容错性lambda架构:先快速的得到一个近似的结果 在用batch layer慢一点得到正确的结果
流(stream)和微批(micro-batching) 底层的架构不一样
问题:如果要开始 处理 一开始 没有想过要处理的的数据,没有太大的办法.
因为flink是替代原架构的spark streaming的部分
晚上再测试一下
单节点的standalone 还有 多节点的standalone的 都需要测试的
Session-cluster模式:
开启命令: ./yarn-session.sh -s 2 -jm 1024 -tm 1024 -nm test -d
启动命令: ./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with- dependencies.jar --host localhost –port 7777
Per-Job-cluster:比启动yarn-session 直接
略
什么是solt 怎么划分的
JobManager TaskManager ResourceManager Dispatcher
资源就是TaskManager上的slots
抽象的任务提交(下图):
两个要求:one to one 和 要相同的并行度
https://github.com/wushengran/FlinkTutorial.git
总结 :
主要就是Flink中的窗口函数的使用 可以使用简单的timewindow 来进行滚动或者滑动窗口 或者 countwindow 还是比较简单的 (也可以使用window() 比较一般的方法 具体的方法可以在代码上查看)
然后还有一些可以使用的其他api
重要的有两点 第一: 当开窗的时候 后面一定需要使用Window function 来进行聚合 (增量聚合函数 或 全窗口函数)
window()接收的输入参数是一个 window Assigner 之后用窗口函数来返回成为DataStream
window Function
增量聚合函数 来一个处理一次,保留当前状态(sum|max|reduceFunction|aggregateFunction) 全窗口函数 先收集所有的数据,等到计算的时候才会遍历所有数据*(相当于把所有的数据存储为状态)*apply(可以拿到Windows的信息)|process其他API
trigger() —— 触发器,定义 window 什么时候关闭,触发计算并输出结果evitor() —— 移除器,定义移除某些数据的逻辑allowedLateness() —— 允许处理迟到的数据(触发计算但是不关闭窗口 快速的输出一个结果 但是之后时间允许继续更新数据)sideOutputLateData() —— 将迟到的数据放入侧输出流getSideOutput() —— 获取侧输出流总结 :
操作方式简单来讲就是先引入eventtime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 再对DataStream进行操作(引入watermark) dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[](Time.second(1)){ override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000 } })flink里面的思想 : 你需要自己做权衡
底层有公式 " timestamp - (timestamp -offset + size) % size"
总结 :
写代码的时候 主要的结构是 environment --> source --> transform --> sink我们可以操作的是 source 和 transform 和 sink source 上 可以获取不同的资源 集合 文件 套接字 kafka 自定义 transform 上可以进行不同的操作 一般的算子(keyBy map)还有进行各样的转换 --> keyedStream window watermark sink 上可以写入不同的地方 kafka Redis ElasticSearch jdbc 自定义processElement(v: IN, ctx: Context, out: Collector[OUT]),
流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出Context可以访问元素的时间戳,元素的key,以及TimerService时间服务Context可以访问元素的时间戳,元素的key,以及TimerService时间服务onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数
参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间currentWatermark(): Long 返回当前watermark的时间registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。
类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。
快照的时间 : 所有任务都恰好处理完一个相同的输入数据的时候
source要重置发生故障之前的授予数据
基于Chandy-Lamport算法的分布式快照 : 将检查点的保存和数据处理分离开 , 不暂停整个应用
Savepoints : 自定义的镜像保存功能
状态的存储 维护以及访问,有一个可插入的组件决定,这个组件就是State Backends
那就需要保存,保存就是checkpoint ,还要有容错机制
MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。快速 低延迟 但不稳定一般用作测试和开发FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。同时拥有内存级别的本地访问的速度,和更好的容错保证 但是内存可能会抗不住RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。 RocksDB是一种kv结构的存储方法 可以存储足够的状态 但是访问速度太慢支持增量化状态写入这三个要隐式转换的 要有印象 不然会忘记了
Table API 和 Flink SQL 是什么
Flink对批处理和流处理提供了统一 的上层的APITable API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite引入的依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.10.1</version> </dependency>catalog : 高层级目录 catalog.database.table
版本不同有 oldPlanner 和 BlinkPlanner的区别 : blink批流统一
连接外部系统的时候 source 和 sink 其实差不多
创建表
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SQMuX52L-1599569101105)(https://raw.githubusercontent.com/tanzhongjingyue/OSS/master/img/image-20200904140838828.png)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kzka9xCI-1599569101107)(https://raw.githubusercontent.com/tanzhongjingyue/OSS/master/img/image-20200904140919877.png)]
输出表: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpG2duuT-1599569101108)(https://raw.githubusercontent.com/tanzhongjingyue/OSS/master/img/image-20200904141006912.png)]
更新模式
table转换成DataStream 有两种模式 : Append Mode和 Retract Mode
流处理和关系代数的区别
流被转换成动态表(Dynamic Tables) , 进行连续查询 , 生成新的动态表 , 再转换回流 Append-only 流Retract 流 两条流add消息和retract消息 Upsert 流 一条流(按照key来修改)upsert消息和delete消息Table API提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作
滚动窗口
// Tumbling Event-time Window(事件时间字段rowtime) .window(Tumble over 10.minutes on 'rowtime as 'w) // Tumbling Processing-time Window(处理时间字段proctime) .window(Tumble over 10.minutes on 'proctime as 'w) // Tumbling Row-count Window (类似于计数窗口,按处理时间排序,10行一组) .window(Tumble over 10.rows on 'proctime as 'w)滑动窗口
// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) // Sliding Processing-time window .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) // Sliding Row-count window .window(Slide over 10.rows every 5.rows on 'proctime as 'w)会话窗口
// Session Event-time Window .window(Session withGap 10.minutes on 'rowtime as 'w) // Session Processing-time Window .window(Session withGap 10.minutes on 'proctime as 'w)写法 :
table的api的写法SQL的写法无界的over Windows
// 无界的事件时间over window (时间字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) //无界的处理时间over window (时间字段"proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) // 无界的事件时间Row-count over window (时间字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) //无界的处理时间Row-count over window (时间字段 "rowtime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)有界的over Windows api操作 :
// 有界的事件时间over window (时间字段 "rowtime",之前1分钟) .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w) // 有界的处理时间over window (时间字段 "rowtime",之前1分钟) .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w) // 有界的事件时间Row-count over window (时间字段 "rowtime",之前10行) .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w) // 有界的处理时间Row-count over window (时间字段 "rowtime",之前10行) .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)我们已经了解了在Table API里window的调用方式,同样,我们也可以在SQL中直接加入窗口的定义和使用。
SELECT COUNT(amount) OVER ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orderstodo 要补充
注册用户自定义函数UDF
SQL里面 先注册 才可以使用标量函数(Scalar Functions)
只能输出单值
扩展基类 ScalaFunction , 写public的eval方法
class HashCode( factor: Int ) extends ScalarFunction { def eval( s: String ): Int = { s.hashCode * factor } }表函数
返回任意数量的行(输出一张表)
扩展TableFunction , 写public的eval方法
class Split(separator: String) extends TableFunction[(String, Int)]{ def eval(str: String): Unit = { str.split(separator).foreach( word => collect((word, word.length)) ) }}聚合函数
把一个表的数据聚合成一个标量值继承AggregateFunction抽象类实现三个方法 : createAccumulator()accumulate()getValue()表聚合函数
可以把一个表中数据,聚合为具有多行和多列的结果表TableAggregateFunction 抽象类来实现 createAccumulator()accumulate()emitValue()略
略
略
