flink传递参数给函数

tech2022-09-24  111

官网地址batch

可以使用构造函数或withParameters(Configuration)方法将参数传递给函数。这些参数将作为功能对象的一部分进行序列化,并交付给所有并行任务实例。

另请参阅有关如何将命令行参数传递给函数的最佳实践指南。

一、使用构造函数方式

DataSet<Integer> toFilter = env.fromElements(1, 2, 3); toFilter.filter(new MyFilter(2)); private static class MyFilter implements FilterFunction<Integer> { private final int limit; public MyFilter(int limit) { this.limit = limit; } @Override public boolean filter(Integer value) throws Exception { return value > limit; } }

二、withParameters(Configuration)方式

这个方法将会携带一个Configuration对象作为参数,这个参数将会传递给Rich Function的open方法(关于Rich Function参见:rich function)。Configuration对象是一个Map,存储Key/Value键值对.

DataSet<Integer> toFilter = env.fromElements(1, 2, 3); Configuration config = new Configuration(); config.setInteger("limit", 2); toFilter.filter(new RichFilterFunction<Integer>() { private int limit; @Override public void open(Configuration parameters) throws Exception { limit = parameters.getInteger("limit", 0); } @Override public boolean filter(Integer value) throws Exception { return value > limit; } }).withParameters(config);

三、使用全局的the ExecutionConfig方式

参数可以被所有的rich function获得

Configuration conf = new Configuration(); conf.setString("mykey","myvalue"); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(conf); public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private String mykey; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); Configuration globConf = (Configuration) globalParams; mykey = globConf.getString("mykey", null); } // ... more here ...

​​​​​​​

最新回复(0)