DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); flatMap 单词分隔
dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } }); filter 保留value=0的元素
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } }); keyby
dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple reduce 求和
keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } }); fold A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ..
DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } }); Aggregations
keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key"); Window Join
dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...}); Interval Join
// this will join the two streams so that // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...}); Split
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });