Flink

tech2025-04-04  14

基础的转换算子是无法访问事件的时间戳信息和水位线信息的。

基于此,DataStream API 提供了一系列的Low-Level的转换算子。可以访问事件戳,watermarker以及注册定时事件。

Flink 提供了8个Process Function。

ProcessFunction:最底层的;KeyedProcessFunction:处理keyBy之后的流。CoProcessFunction:处理collect操作之后的流。ProcessJoinFunction:处理join操作之后的流。BroadcastProcessFunction:广播的KeyedBroadcastProcessFunction:处理keyBy之后广播的流。ProcessWindowFunction:窗口函数ProcessAllWindowFunction:全窗口函数

KeyedProcessFunction

keyedProcessFunction用来操作KeyedStream; 所有的Process Function都继承自RichFunction接口,都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY,IN,OUT]还额外提供了两个方法:

TimeService和定时器(Timers)

Context 和 OnTimerContext 所持有的 TimerService对象拥有以下方法:

当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 Keyed Streams 上面使用。

Emitting to Side Outputs(侧输出)

大部分的 DataStream API的算子的输出是单一输出,也就是某种类型的数据流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。

process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型不一样。一个side output 可以定义为 OutputTag[X]对象, X是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。

例子

package com.dongk.flink.processfunction import com.dongk.flink.stream.SensorReading import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction} import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream} import org.apache.flink.util.Collector import org.apache.flink.streaming.api.scala._ /** * * 侧输出流例子 * * 判断传感器温度、低于某个值,输出到一个流。其余正常的输出到一个流 **/ object ProcessFunctionTest1 { def main(args: Array[String]): Unit = { //环境变量 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) /** * * 1、实现10s分钟的单词计数; * 2、利用socket数据 * nc -lk 7777 * */ val socketStream : DataStream[String] = env.socketTextStream("localhost",7777) val stream1 : DataStream[SensorReading] = socketStream.map(text => { val datas = text.split(",") SensorReading(datas(0).trim,datas(1).toLong,datas(2).trim.toDouble) }) val stream2 = stream1.process(new FreezingAlter) //主流 stream2.print() stream2.getSideOutput(new OutputTag[String]("freezing")).print() env.execute("process function output ") } } /** * 低温预警 * * 第一个泛型:输入的数据类型 * 第二个泛型:主输出流的输出类型(不是侧输出流的输出类型) * */ class FreezingAlter extends ProcessFunction[SensorReading,SensorReading]{ lazy val alterOutput : OutputTag[String] = new OutputTag[String]("freezing") override def processElement(value : SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { //输出到侧输出流 if(value.temperature < 32.0){ ctx.output(alterOutput,"freezing alter for" + value.id) }else{ //输出到主流 out.collect(value) } } }

CoProcessFunction

对于两条输入流,DataStream API 提供了CoProcessFunction这样的low-level操作。CoProcessFunction 提供了操作每一个输入流的方法:processElement1() 和 processElement()。

类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器事件戳,TimerService,以及side outputs。

CoProcessFunction 也提供了 onTimer() 回调函数。

最新回复(0)