慕盖茨5726191
2019-05-26 21:08

这个报错该怎么解决？
// Selector that derives the grouping key of an edit event from getUser().
KeySelector<WikipediaEditEvent, String> byUser = new KeySelector<WikipediaEditEvent, String>() {
    @Override
    public String getKey(WikipediaEditEvent edit) {
        return edit.getUser();
    }
};
// Key the edit stream with that selector.
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits.keyBy(byUser);
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
/**
* 并行度,第n个子任务> (用户,改动的字节数)
* 4> (Artegia,3)
*/
public class WikipediaAnalysis {
/**
 * Entry point of the streaming job: reads Wikipedia edit events, keys them by
 * user, folds each key's events over five-second windows into a
 * (user, summed byte diff) tuple, prints the resulting stream and runs the job.
 *
 * @param args command-line arguments; not read by this method
 * @throws Exception anything thrown while setting up or executing the job is
 *                   propagated to the caller
 */
public static void main(String[] args) throws Exception {
    // Execution context for the streaming program.
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // Source: the stream of Wikipedia edit events.
    DataStream<WikipediaEditEvent> editStream = environment.addSource(new WikipediaEditsSource());

    // Key the stream by the value of getUser().
    KeyedStream<WikipediaEditEvent, String> editsByUser =
            editStream.keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser);

    // Fold step: record the event's user and add its byte diff to the running total.
    FoldFunction<WikipediaEditEvent, Tuple2<String, Long>> sumByteDiffs =
            new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> fold(Tuple2<String, Long> running, WikipediaEditEvent edit)
                        throws Exception {
                    running.f0 = edit.getUser();
                    running.f1 += edit.getByteDiff();
                    return running;
                }
            };

    // Starting value handed to the fold: empty user, zero bytes.
    Tuple2<String, Long> initialValue = new Tuple2<>("", 0L);

    // Window the keyed stream every 5 seconds and fold each window's events.
    DataStream<Tuple2<String, Long>> bytesPerUser = editsByUser
            .timeWindow(Time.seconds(5))
            .fold(initialValue, sumByteDiffs);

    bytesPerUser.print();
    environment.execute();
}
}
Flink入门
15694 学习 · 20 问题
相似问题