Flink的学习

tech2026-03-01  0

1.0 Flink的简介

Flink是什么

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. 快速灵巧

为什么选择Flink

流数据更真实地反映了我们的生活方式传统的数据架构是基于有限数据集的(因为批处理数据更简单)我们的目标是低延迟 高吞吐 结果的准确性和良好的容错性

lambda架构:先快速的得到一个近似的结果 在用batch layer慢一点得到正确的结果

流(stream)和微批(micro-batching) 底层的架构不一样


问题:如果要开始 处理 一开始 没有想过要处理的的数据,没有太大的办法.

因为flink是替代原架构的spark streaming的部分


2.0 快速上手

批处理流处理(并行度 运行时的概念)每一步是一个并行度还是怎么样 这是一个问题 可以往下看 在回头来看看

3.0 Flink部署

3.1 Standalone

晚上再测试一下

单节点的standalone 还有 多节点的standalone的 都需要测试的

3.2 Yarn模式

Session-cluster : 适合规模小 执行时间短的作业Per-Job-cluster : 适合规模大 长时间运行的作业

Session-cluster模式:

​ 开启命令: ./yarn-session.sh -s 2 -jm 1024 -tm 1024 -nm test -d

​ 启动命令: ./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with- dependencies.jar --host localhost –port 7777

Per-Job-cluster:比启动yarn-session 直接

3.3 Kubernetes部署

4.0 Flink运行架构


什么是solt 怎么划分的


4.1 Flink运行时的组件

JobManager TaskManager ResourceManager Dispatcher

资源就是TaskManager上的slots

4.2 任务提交流程

抽象的任务提交(下图):

4.3 任务调度原理

怎么实现并行计算 同时处理 有两个map(数据并行) 并行的任务,需要占用多少slot 一个任务的最大的并行度 一个流处理程序,到底包含多少个任务

4.3.5 任务链

两个要求:one to one 和 要相同的并行度

5.0 Flink 流处理API

5.1 Environment

底层调用的代码不一样 但是使用的时候 我们直接使用getExecutionEnvironment即可

5.2 Source

5.3 Transform

mapflatMapFilterKeyBy(键控流) : DataStream–>KeyedStream滚动聚合算子(针对的是每一个key , Rolling Aggregation) : sum() min() max() minBy() maxBy() 其中min 和 minBy 的区别就是: minBy会保存下除了相关字段的其他条件 ; 但是min不会,只会关注相关字段 Reduce (一般聚合)split 和 select (分流)Connext 和 CoMap|CoFlatMap (合流) 其实就是两个判断条件,但是这两个条件都是以流的形式存在,所以需要这两个流合并 连接两条流(两条路不一定要一样的结构) 也只能是两条流 连接两条流之后就已经变成ConnectedStream类型 Union(可以处理多条流 连接的流的类型必须是一样的 )

5.4 支持的数据类型

基础数据类型Java和scala元组(Tuples)scale样例类(case classes)Java简单对象(POJOS)其他(Arrays Lists Maps Enums)

5.5 实现UDF函数–更细粒度的控制流

函数类(Function Classes) Flink暴露了所有udf函数的接口 匿名函数(Lambda Functions)富函数(Rich Functions) 可以获取运行环境的上下文和状态,并拥有一些生命周期的方法(状态编程)

5.6 Sink

kafkaRedisElasticSearchJDBC 自定义sink

https://github.com/wushengran/FlinkTutorial.git

6.0 Flink中的Window

总结 :

主要就是Flink中的窗口函数的使用 可以使用简单的timewindow 来进行滚动或者滑动窗口 或者 countwindow 还是比较简单的 (也可以使用window() 比较一般的方法 具体的方法可以在代码上查看)

然后还有一些可以使用的其他api

重要的有两点 第一: 当开窗的时候 后面一定需要使用Window function 来进行聚合 (增量聚合函数 或 全窗口函数)


6.1 Window

window是一种切割无限数据为有限块进行处理的手段 因为我们真实情况中,对数据其实是有时间有要求的(譬如 1 个月 一年)这个时候的数据是还没有过来的,数据来了之后,判断时间,放入不同的桶里面 window 类型: CountWindow : 滚动计数窗口滑动计数窗口 TimeWindow : 滚动时间窗口(Tumbling Windows) window size特点:时间对齐,窗口长度固定,没有重叠(数据只属于一个窗口)范围是 前闭后开"[ )"相当于特殊的滑动窗口(size=slide) 滑动时间窗口(Sliding Windows) window size 和 window slide特点:时间对齐,窗口长度固定,可以有重叠window size 有几个window slide 那么数据就同时属于几个窗口 会话窗口(Session Windows) session gap特点:时间无对齐当两个数据之间的时间间隔大于等于window gap的时候,后一个数据就属于新的窗口window(EventTimeSessionWindows.withGap(Time.second(10))) 上述的方法都可以使用window的一般函数来进行调用

6.2 Window API

window()接收的输入参数是一个 window Assigner 之后用窗口函数来返回成为DataStream

window Function

增量聚合函数 来一个处理一次,保留当前状态(sum|max|reduceFunction|aggregateFunction) 全窗口函数 先收集所有的数据,等到计算的时候才会遍历所有数据*(相当于把所有的数据存储为状态)*apply(可以拿到Windows的信息)|process

其他API

trigger() —— 触发器,定义 window 什么时候关闭,触发计算并输出结果evitor() —— 移除器,定义移除某些数据的逻辑allowedLateness() —— 允许处理迟到的数据(触发计算但是不关闭窗口 快速的输出一个结果 但是之后时间允许继续更新数据)sideOutputLateData() —— 将迟到的数据放入侧输出流getSideOutput() —— 获取侧输出流



7.0 时间语义与watermark

总结 :

操作方式简单来讲就是先引入eventtime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 再对DataStream进行操作(引入watermark) dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[](Time.second(1)){ override def extractTimestamp(element: SensorReading): Long = { element.timestamp * 1000 } })

7.1 Flink中时间语义

Event time : 事件事件 , 最有意义Ingestion time : 数据进入Flink的时间Processing time : 处理数据的时间

7.2 EventTime的引入

val env = StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给env创建的每一个stream追加时间特征 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

7.3 WaterMark

7.4 EventTime在window中的使用

乱序 : Flink接收到的事件的先后顺序不是严格按照时间的Event Time顺序排序的(事件时间的语义下才有意义)Watermark(水位线) : 相当于延迟关窗的时机 做到了关窗和放数据的分离watermark的传递 : 上游的多个watermark传递到这里时,选用最小的watermark ; 传递到下游的时候,全部发送一样的watermark周期性和非周期性的watermarkwatermark的设定 : maxOutOfOrderness 可能设定的不是特别理想 但是还是需要有一个权衡 一般的话 可以先调成小一点

flink里面的思想 : 你需要自己做权衡


7.4.1 offset 间隔的起始

底层有公式 " timestamp - (timestamp -offset + size) % size"


7.5 Flink状态管理

7.5.1 Flink中的状态

由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态可以认为状态就是一个本地变量,可以被任务的业务逻辑访问Flink 会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑

7.5.2 算子状态(Operator State)

比较受限制 总是储存为列表的状态

7.5.3 键控状态(Keyed State)

最常用的方式,按照key来保存状态 就不需要一定是列表的形式

7.5.4 状态后端(State Backends)


总结 :

写代码的时候 主要的结构是 environment --> source --> transform --> sink我们可以操作的是 source 和 transform 和 sink source 上 可以获取不同的资源 集合 文件 套接字 kafka 自定义 transform 上可以进行不同的操作 一般的算子(keyBy map)还有进行各样的转换 --> keyedStream window watermark sink 上可以写入不同的地方 kafka Redis ElasticSearch jdbc 自定义

8.0 ProcessFunction API

本身继承了富函数 可以获取上下文 和 生命周期函数

8.1 KeyedProcessFunction

processElement(v: IN, ctx: Context, out: Collector[OUT]),

流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出Context可以访问元素的时间戳,元素的key,以及TimerService时间服务Context可以访问元素的时间戳,元素的key,以及TimerService时间服务

onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数

参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

8.2 TimerService 和 定时器(Timers)

Context和OnTimerContext所持有的TimerService对象拥有以下方法:

currentProcessingTime(): Long 返回当前处理时间currentWatermark(): Long 返回当前watermark的时间registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。

8.3 侧输出流(SideOutput)

val monitoredReadings: DataStream[SensorReading] = readings .process(new FreezingMonitor) monitoredReadings .getSideOutput(new OutputTag[String]("freezing-alarms")) .print() readings.print() class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] { // 定义一个侧输出标签 lazy val freezingAlarmOutput: OutputTag[String] = new OutputTag[String]("freezing-alarms") override def processElement(r: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { // 温度在32F以下时,输出警告信息 if (r.temperature < 32.0) { ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}") } // 所有数据直接常规输出到主流 out.collect(r) } }

8.4 CoProcessFunction

对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。

类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。

9.0 状态编程和容错机制

9.1 有状态的算子和应用程序

算子状态 (operator state) 列表状态(List state)联合列表状态(Union list state)广播状态(Broadcast state) 键控状态(keyed state) ValueState[T]保存单个的值,值的类型为TListState[T]保存一个列表,列表里的元素的数据类型为TMapState[K, V]保存Key-Value对ReducingState[T]AggregatingState[I, O] 如果要用到keyedState 就一定要在keyedStream上 否则就会报错 这里的意思就是可以使用lazy来修饰 否则会报错 val sensorData: DataStream[SensorReading] = ... val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id) val alerts: DataStream[(String, Double, Double)] = keyedData .flatMap(new TemperatureAlertFunction(1.7)) class TemperatureAlertFunction(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { private var lastTempState: ValueState[Double] = _ override def open(parameters: Configuration): Unit = { val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) } override def flatMap(reading: SensorReading, out: Collector[(String, Double, Double)]): Unit = { val lastTemp = lastTempState.value() val tempDiff = (reading.temperature - lastTemp).abs if (tempDiff > threshold) { out.collect((reading.id, reading.temperature, tempDiff)) } this.lastTempState.update(reading.temperature) } }

9.2 状态一致性

一致性就是说 : 计算结果要保证准确 (数据既不丢失也不重复–>exactly-once)状态一致性分类 : (内部 : 依靠checkpoint来实现) AT-MOST-ONCE(最多一次)AT-LEAST-ONCE(至少一次)EXACTLY-ONCE(精确一次) end-to-end 状态一致性: (外部 : ) source 端 : 可重设数据的读取位置sink 端 : 从故障时 , 数据不会重复写入外部系统 幂等写入 : ( hashmap|set 等) (有K-V结构)事务写入 : 按照checkpoint捆绑来进行 预写日志(WAL)两阶段提交(2PC) : 先写入数据但是先不提交事务 Exactly-once 两阶段提交 有预提交和正式提交的两个阶段sink 完成Snapshot不以为着可以正式提交事务,有可能前面的checkpoint 没有完成 , 必须要要等jobmanager收到所有的checkpoint完成 并通知其他子任务 , sink才可以正式提交通知下游 , (此时sink可以再开一个新事务来继续接受数据)就会通知关闭此事务 , 可以消费数据了.

9.3 检查点

快照的时间 : 所有任务都恰好处理完一个相同的输入数据的时候

source要重置发生故障之前的授予数据

基于Chandy-Lamport算法的分布式快照 : 将检查点的保存和数据处理分离开 , 不暂停整个应用

Savepoints : 自定义的镜像保存功能

9.4 选择一个状态后端

状态的存储 维护以及访问,有一个可插入的组件决定,这个组件就是State Backends

那就需要保存,保存就是checkpoint ,还要有容错机制

MemoryStateBackend

内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。快速 低延迟 但不稳定一般用作测试和开发

FsStateBackend

将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。同时拥有内存级别的本地访问的速度,和更好的容错保证 但是内存可能会抗不住

RocksDBStateBackend

将所有状态序列化后,存入本地的RocksDB中存储。 RocksDB是一种kv结构的存储方法 可以存储足够的状态 但是访问速度太慢支持增量化状态写入

10.0 Table API 和 SQL

10.1 整体介绍


import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors._

这三个要隐式转换的 要有印象 不然会忘记了


Table API 和 Flink SQL 是什么

Flink对批处理和流处理提供了统一 的上层的APITable API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite

引入的依赖

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.10.1</version> </dependency>

10.2 API调用

catalog : 高层级目录 catalog.database.table

版本不同有 oldPlanner 和 BlinkPlanner的区别 : blink批流统一

连接外部系统的时候 source 和 sink 其实差不多

创建表

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SQMuX52L-1599569101105)(https://raw.githubusercontent.com/tanzhongjingyue/OSS/master/img/image-20200904140838828.png)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kzka9xCI-1599569101107)(https://raw.githubusercontent.com/tanzhongjingyue/OSS/master/img/image-20200904140919877.png)]

输出表: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZpG2duuT-1599569101108)(https://raw.githubusercontent.com/tanzhongjingyue/OSS/master/img/image-20200904141006912.png)]

10.3 流处理中的特殊概念

更新模式

table转换成DataStream 有两种模式 : Append Mode和 Retract Mode

流处理和关系代数的区别

流被转换成动态表(Dynamic Tables) , 进行连续查询 , 生成新的动态表 , 再转换回流 Append-only 流Retract 流 两条流add消息和retract消息 Upsert 流 一条流(按照key来修改)upsert消息和delete消息

10.3.4 时间特性

引入的时间还不是很完善可以先了解一下

10.4 窗口(Windows)

时间语义,要配合窗口操作才能发挥作用group Windows 根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数 over Windows 针对每个输入行,计算相邻行范围内的聚合

10.4.1 分组窗口(Group Windows)

val table = input .window([w: GroupWindow] as 'w) // 定义窗口,别名 w .groupBy('w, 'a) // 以属性a和窗口w作为分组的key .select('a, 'b.sum) // 聚合字段b的值,求和 或者,还可以把窗口的相关信息,作为字段添加到结果表中: val table = input .window([w: GroupWindow] as 'w) .groupBy('w, 'a) .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)

Table API提供了一组具有特定语义的预定义Window类,这些类会被转换为底层DataStream或DataSet的窗口操作

滚动窗口

// Tumbling Event-time Window(事件时间字段rowtime) .window(Tumble over 10.minutes on 'rowtime as 'w) // Tumbling Processing-time Window(处理时间字段proctime) .window(Tumble over 10.minutes on 'proctime as 'w) // Tumbling Row-count Window (类似于计数窗口,按处理时间排序,10行一组) .window(Tumble over 10.rows on 'proctime as 'w)

滑动窗口

// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w) // Sliding Processing-time window .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) // Sliding Row-count window .window(Slide over 10.rows every 5.rows on 'proctime as 'w)

会话窗口

// Session Event-time Window .window(Session withGap 10.minutes on 'rowtime as 'w) // Session Processing-time Window .window(Session withGap 10.minutes on 'proctime as 'w)

写法 :

table的api的写法SQL的写法

10.4.2 Over Windows

无界的over Windows

// 无界的事件时间over window (时间字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) //无界的处理时间over window (时间字段"proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w) // 无界的事件时间Row-count over window (时间字段 "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) //无界的处理时间Row-count over window (时间字段 "rowtime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

有界的over Windows api操作 :

// 有界的事件时间over window (时间字段 "rowtime",之前1分钟) .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w) // 有界的处理时间over window (时间字段 "rowtime",之前1分钟) .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w) // 有界的事件时间Row-count over window (时间字段 "rowtime",之前10行) .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w) // 有界的处理时间Row-count over window (时间字段 "rowtime",之前10行) .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

10.4.3 SQL中窗口的定义

我们已经了解了在Table API里window的调用方式,同样,我们也可以在SQL中直接加入窗口的定义和使用。

SELECT COUNT(amount) OVER ( PARTITION BY user ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders

10.4.4 代码练习

todo 要补充

10.5 函数(Functions)

10.5.1 系统内置函数

比较函数逻辑函数算数函数字符串函数时间函数 interval ‘10’ second | 10.seconds聚合函数

10.5.2 UDF(User-denfined Functions)

注册用户自定义函数UDF

SQL里面 先注册 才可以使用

标量函数(Scalar Functions)

只能输出单值

扩展基类 ScalaFunction , 写public的eval方法

class HashCode( factor: Int ) extends ScalarFunction { def eval( s: String ): Int = { s.hashCode * factor } }

表函数

返回任意数量的行(输出一张表)

扩展TableFunction , 写public的eval方法

class Split(separator: String) extends TableFunction[(String, Int)]{ def eval(str: String): Unit = { str.split(separator).foreach( word => collect((word, word.length)) ) }}

聚合函数

把一个表的数据聚合成一个标量值继承AggregateFunction抽象类实现三个方法 : createAccumulator()accumulate()getValue()

表聚合函数

可以把一个表中数据,聚合为具有多行和多列的结果表TableAggregateFunction 抽象类来实现 createAccumulator()accumulate()emitValue()
一对一 , 相当于 map一对多 , 相当于flatMap多对一 , 聚合函数多多多 , 表聚合函数

11.0 Flink CEP 简介

11.1 什么是复杂事件处理CEP

11.2 Flink CEP

最新回复(0)