Flink

tech2022-08-20  63

实现UDF的目的是为了更加细粒度的控制流。

函数类(Function Classes)

Flink暴露了所有udf函数的接口。例如MapFunction、FilterFunction、ProcessFunction等等。

例子:自定义实现FilterFunction;

package com.dongk.flink.udf import com.dongk.flink.udf.flow.FlowSource import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import com.dongk.flink.udf.flow.Flow import org.apache.flink.api.common.functions.FilterFunction object UdfTest { def main(args: Array[String]): Unit = { //环境变量 val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.addSource(new FlowSource()) //使用自定的Filter val stream1 : DataStream[Flow] = stream.filter(new MyFilter("PVG")) stream1.print() env.execute("transform test") } } class MyFilter(portCode : String) extends FilterFunction[Flow]{ override def filter(flow : Flow): Boolean = { val portCode = flow.portCode if(portCode.equals(portCode)) return true else return false } }

富函数(Rich Functions)

“富函数” 是DataStream API提供的一个函数类接口,所有Filnk函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并且拥有一些生命周期方法,所以可以实现更加复杂的功能。

RichMapFunctionRichFlatMapFunctionRiceFilterFunction…

Rich Function 有一个生命周期的概念,典型的生命周期方法有:

1、open() 初始化方法;

2、close() 生命周期中最后调用的方法,做一些清理工作;

3、getRuntimeContext() 提供了RuntimeContext的一些信息,例如函数执行的并行度,任务的名字等。

例子:

package com.dongk.flink.udf import com.dongk.flink.udf.flow.FlowSource import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import com.dongk.flink.udf.flow.Flow import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction} import org.apache.flink.configuration.Configuration object UdfTest1 { def main(args: Array[String]): Unit = { //环境变量 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env.addSource(new FlowSource()) //使用自定的Filter val stream1 : DataStream[String] = stream.map(new MyMapper()) stream1.print() env.execute("transform test") } } /** * 第一个泛型表示输入的数据类型; * 第二个泛型表示输出的数据类型; * */ class MyMapper extends RichMapFunction[Flow,String]{ /** * * 1、在RichMapFunction子类创建的时候执行,只执行一次; * 2、在map之前执行 * */ override def open(parameters: Configuration): Unit = { //可以做一些初始化的工作 println("open open open open") } override def map(in: Flow): String = { return in.portCode } /** * * map程序执行完之后就会进行close操作 * */ override def close(): Unit = { //可以做一些释放数据库连接等操作 println("close close close close") } }
最新回复(0)