
Apache Flink - Event-Time Windows

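I want to create keyed windows in Apache Flink so that each key's window fires n minutes after the first event for that key arrives. Can this be done with the event-time characteristic? (Processing time depends on the system clock, and there is no telling when the first event will arrive.) If it is possible, please explain how event time and watermarks should be assigned to the events, and how to get the process window function invoked after n minutes.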


Here is part of my code, to give you an idea of what I am currently doing:


    // Make keyed events so as to start a window for a key
    KeyedStream<SourceData, Tuple> keyedEvents =
            env.addSource(new MySource(configData), "JSON Source")
            .assignTimestampsAndWatermarks(new MyTimeStamps())
            .setParallelism(1)
            .keyBy("service");

    // Start a window of winTime minutes
    DataStream<ResultData> resultData =
            keyedEvents
            .timeWindow(Time.minutes(winTime))
            .process(new ProcessEventWindow(configData))
            .name("Event Collection Window")
            .setParallelism(25);
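A note on why the code above cannot start a window at each key's first event, offered as a sketch: assuming the environment is set to TimeCharacteristic.EventTime, `timeWindow(Time.minutes(winTime))` on a keyed stream creates tumbling event-time windows, and Flink aligns those to multiples of the window size counted from the epoch (offset 0), not to the first event of each key. The plain-Java helper below is not Flink code; it only mirrors the window-start formula used by Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps. The class name `WindowAlignment` and the sample timestamp are illustrative.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAlignment {

    // Start of the tumbling window that contains `timestamp` (non-negative timestamps).
    // With offset 0 this equals timestamp - timestamp % windowSize.
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis();
        // Suppose the first event for some key carries the timestamp 10:07:00 UTC.
        long first = Instant.parse("2020-01-01T10:07:00Z").toEpochMilli();
        long start = windowStart(first, 0L, size);
        long end = start + size;
        // The window is [10:00, 10:10): it closes 3 minutes after the first event, not 10.
        System.out.println(Instant.ofEpochMilli(start) + " -> " + Instant.ofEpochMilli(end));
    }
}
```

So every key shares the same window boundaries; a per-key start time needs a different mechanism, such as a timer registered on the first event.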

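So how should I assign event time and watermarks so that each window takes the event time of its first event as its starting point and fires 10 minutes later (the first event's start time may differ from key to key)? Any help would be greatly appreciated.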


               /------------ ( window of 10 minutes )
    Streams ---|------------ ( window of 10 minutes )
               \------------ ( window of 10 minutes )

Edit: the class I use to assign timestamps and watermarks:


    public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {

        @Override
        public long extractTimestamp(SourceData element, long previousElementTimestamp) {
            // Will return epoch of currentTime
            return GlobalUtilities.getCurrentEpoch();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Will return epoch of currentTime + 10 minutes
            return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
        }
    }


米琪卡哇伊

阿晨1998

I think a ProcessFunction is a better fit for your use case. What you can do is register an event-time timer when the first event for a key arrives, and then emit the result in the onTimer method. Something like:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Apply this to the keyed stream (keyedEvents.process(new ProcessFunctionImpl())):
    // both the state and the timers are scoped to the current key.
    public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {

        // per-key aggregate; null until the first event for the key arrives
        private transient ValueState<ResultData> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("window-state", ResultData.class));
        }

        @Override
        public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
                throws Exception {
            // retrieve the current aggregate
            ResultData current = state.value();
            if (current == null) {
                // first event arrived
                current = new ResultData();
                // register end of window: 10 minutes after the first event's timestamp
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
            }
            // update the state's aggregate (add() is a placeholder for your own aggregation logic)
            current.add(value);
            // write the state back
            state.update(current);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
                throws Exception {
            // get the state for the key that scheduled the timer
            ResultData result = state.value();
            out.collect(result);
            // reset the window state so the next event for this key starts a new window
            state.clear();
        }
    }
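To see the timer logic in isolation, here is a plain-Java model; it is not Flink code. A map stands in for the per-key ValueState, a sorted map stands in for the timer service, and `advanceWatermark` plays the role of an incoming watermark that fires every event-time timer whose time is at or below it. The class name `FirstEventWindowSim`, the string payloads, and the output format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FirstEventWindowSim {
    private final long windowSize;
    // stands in for ValueState: accumulated payloads per key (absent = no open window)
    private final Map<String, List<String>> state = new HashMap<>();
    // stands in for the timer service: fire time -> keys with a timer at that time
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    public final List<String> emitted = new ArrayList<>();

    public FirstEventWindowSim(long windowSize) {
        this.windowSize = windowSize;
    }

    // Like processElement(): the first event for a key opens its window and registers a timer.
    public void processElement(String key, long timestamp, String payload) {
        List<String> current = state.get(key);
        if (current == null) {
            current = new ArrayList<>();
            state.put(key, current);
            timers.computeIfAbsent(timestamp + windowSize, t -> new ArrayList<>()).add(key);
        }
        current.add(payload);
    }

    // Like a watermark arriving: fire every timer with time <= watermark (onTimer()).
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                // emit the aggregate and clear the key's state
                emitted.add(key + "@" + due.getKey() + "=" + state.remove(key));
            }
        }
    }

    public static void main(String[] args) {
        FirstEventWindowSim sim = new FirstEventWindowSim(10 * 60 * 1000L);
        sim.processElement("a", 0L, "a1");        // a's window: timer at 600000
        sim.processElement("b", 120_000L, "b1");  // b's window: timer at 720000
        sim.processElement("a", 300_000L, "a2");
        sim.advanceWatermark(650_000L);           // only a's timer is due
        sim.advanceWatermark(720_000L);           // now b's timer is due
        System.out.println(sim.emitted);          // [a@600000=[a1, a2], b@720000=[b1]]
    }
}
```

Because a key's state is cleared when its timer fires, the next event for that key opens a fresh 10-minute window that starts at that event's own timestamp.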

皈依舞

I had a similar question about event-time windows a while ago. Here is what my stream looks like:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Consumer setup
    val stream = env.addSource(consumer)
      .assignTimestampsAndWatermarks(new WMAssigner)

    // Additional setup here

    stream
      .keyBy(data => data.findValue("service").asText())
      .window(TumblingEventTimeWindows.of(Time.minutes(10)))
      .process(new WindowProcessor)
      // Sinks go here

My WMAssigner class looks like this (note: it allows events to arrive up to 1 minute out of order; if you don't want that delay, you can extend a different timestamp extractor):

    class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.seconds(60)) {
      override def extractTimestamp(element: ObjectNode): Long = {
        val tsStr = element.findValue("data").findValue("ts").toString.replaceAll("\"", "")
        tsStr.toLong
      }
    }

The timestamp I want to use for watermarks is the data.ts field.

My window processor:

    class WindowProcessor extends ProcessWindowFunction[ObjectNode, String, String, TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
        var result = ""
        elements.foreach { value =>
          result = value.findValue("data").findValue("outData").toString
        }
        out.collect(result)
      }
    }

Let me know if anything is unclear.
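A plain-Java sketch of the watermark rule behind BoundedOutOfOrdernessTimestampExtractor, as I understand it: the timestamp is taken from the event itself (here passed in as a long, standing in for data.ts), and the periodic watermark trails the largest timestamp seen so far by the allowed out-of-orderness and never moves backwards. `BoundedWatermarks` is a hypothetical name, not a Flink class. This also suggests why the question's MyTimeStamps does not behave like event time: it stamps each event with the current wall-clock time and emits a watermark 10 minutes ahead of that, so newly arriving events fall behind the watermark and are effectively treated as late.

```java
public class BoundedWatermarks {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // start low enough that subtracting the bound cannot underflow before the first event
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // The timestamp comes from the event (e.g. data.ts), never from the system clock.
    public long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Called periodically: trail the largest timestamp by the bound, never go backwards.
    public long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedWatermarks wm = new BoundedWatermarks(60_000L); // 1 minute, as in WMAssigner
        wm.extractTimestamp(100_000L);
        System.out.println(wm.currentWatermark()); // 40000
        wm.extractTimestamp(90_000L);              // out of order, but within the bound
        System.out.println(wm.currentWatermark()); // 40000
        wm.extractTimestamp(200_000L);
        System.out.println(wm.currentWatermark()); // 140000
    }
}
```

A window or timer for time T fires only once this watermark reaches T, so with a 1-minute bound results appear about a minute (in event time) after the window's nominal end.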