如何仅在第一个元素上在 CustomTrigger 中启动处理时间计时器?

我正在Trigger为我的应用程序使用带有自定义功能的 GlobalWindow。根据要求,在 Trigger 函数中,我只需要在窗口中的第一个元素上启动一个处理时间计时器。


我尝试使用变量来实现它firstEventflag。像这样。


.window(GlobalWindows.create())

.trigger(new Trigger<ImpactEventObject, GlobalWindow>() {

    Boolean firstEventflag = false;


    @Override

    public TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

        if (!firstEventflag) {

            firstEventflag = true;

            triggerContext.registerProcessingTimeTimer(

                triggerContext.getCurrentProcessingTime() + 20000);

        }

        return TriggerResult.CONTINUE;

    }


    @Override

    public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    return TriggerResult.FIRE;

}

但这失败了,因为今天我发现firstEventflag每次创建新窗口时都没有初始化变量,它取决于正在处理窗口的子任务,这意味着不同的窗口可以共享同一个变量,firstEventflag从而使这个逻辑实际上毫无用处。鉴于此,我该如何解决我的问题?


慕仙森
浏览 129回答 1
1回答

函数式编程

通过查看CountTrigger 这里的源代码想出了一种方法。GlobalWindow我们可以用 a来保持元素的数量ReducingStateDescriptor。并在此计数为 1 时启动计时器,这意味着 - 仅在第一个元素上启动计时器。public class CustomTrigger extends Trigger<GenericObject, GlobalWindow> {private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);@Overridepublic TriggerResult onElement(ImpactEventObject impactEventObject, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {&nbsp; &nbsp; ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);&nbsp; &nbsp; count.add(1L);&nbsp; &nbsp; if (count.get() == 1) {&nbsp; &nbsp; &nbsp; &nbsp; triggerContext.registerProcessingTimeTimer(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; triggerContext.getCurrentProcessingTime() + 20000);&nbsp; &nbsp; }&nbsp; &nbsp; return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {&nbsp; &nbsp; return TriggerResult.FIRE;}@Overridepublic TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {&nbsp; &nbsp; return null;}@Overridepublic void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {&nbsp; &nbsp; triggerContext.deleteProcessingTimeTimer(triggerContext.getCurrentProcessingTime());}private static class Sum implements ReduceFunction<Long> {&nbsp; &nbsp; private static final long serialVersionUID = 1L;&nbsp; &nbsp; @Override&nbsp; &nbsp; public Long reduce(Long value1, Long value2) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; return value1 + value2;&nbsp; &nbsp; }}}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java