慕盖茨5726191
2019-05-26 21:08
这个报错该怎么解决
// Partition the edit stream by the user who made each edit.
KeyedStream<WikipediaEditEvent, String> keyedEdits =
    edits.keyBy(
        new KeySelector<WikipediaEditEvent, String>() {
            @Override
            public String getKey(WikipediaEditEvent editEvent) {
                final String user = editEvent.getUser();
                return user;
            }
        });
package org.myorg.quickstart; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent; import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource; /** * 并行度,第n个子任务> (用户,改动的字节数) * 4> (Artegia,3) */ public class WikipediaAnalysis { public static void main(String[] args) throws Exception{ // 创建一个Streaming程序运行的上下文 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // source部分--数据来源部分 DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource()); // 统计key KeyedStream<WikipediaEditEvent, String> keyedEdits = edits .keyBy((KeySelector<WikipediaEditEvent, String>) event -> { return event.getUser(); }); // 窗口 DataStream<Tuple2<String, Long>> result = keyedEdits // 每5秒 .timeWindow(Time.seconds(5)) // 指定一个初识值 .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) throws Exception { acc.f0 = event.getUser(); acc.f1 += event.getByteDiff(); return acc; } }); result.print(); env.execute(); } }
Flink入门
15548 学习 · 20 问题
相似问题