查看消息队列中消息的列表
bin/kakfa-topics.sh --list --zookeeper localhost:2181
需求:有一个图书店铺,需要知道在促销期间每一本书买了多少
指定消费的位置,从哪里开始消费呢
consumer.setStartFromEarliest();
在上下文中设置source
env.addSource(consumer);
注册内存表
写sql
非常重要的知识点:回退更新:
tenv.toRetractStream(result,Row.class).print();
env.execute();
回退更新的写法
kafka相关集成+添加到source;注意start from earliest