我FlinkKafkaConsumer011订阅了一个主题。我希望apply在每个 kafka 消费者消息上处理 ( ),因此自定义在每个元素FooTrigger上返回TriggerResult.FIRE。
以下代码有效,我只是对timeWindowAll(Time.minutes(1)). 看起来我做错了什么。
// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// create a Kafka consumer
FlinkKafkaConsumer011<Foo> consumer = new FlinkKafkaConsumer011<>(
"topic",
new Foo.FooSchema(),
props); // Properties object
// create Kafka consumer data source
DataStream<FooTuple> trades = env.addSource(consumer)
.timeWindowAll(Time.minutes(1))
.trigger(new FooTrigger())
.evictor(new FooEvictor())
.apply(new CreateFoos());
当年话下
相关分类