问题
每次系统从带有滑动窗口的 pubsub 收到消息时,它都会被复制
代码
| 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))
| 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)
| 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
输出
如果我只从 pub/sub 发送一条消息并尝试在滑动窗口完成后使用代码打印我所拥有的:
class print_row2(beam.DoFn):
def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):
print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))
结果
('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000
('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000
如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))只收到一次之前打印消息
“图形模式”下的流程:
time: ----t+00---t+15---t+30----t+45----t+60------>
: : : : :
w1: |=X===========| : :
w2: |==============| :
...
消息 X 在滑动窗口开始时只发送了一次,应该只接收一次,但正在接收两次
我已经尝试了两个 AccumulationMode 值,也尝试了 trigger=AftyerWatermark 但我无法解决问题。
可能有什么问题?
额外的
使用 FixedWindows 这是我的海豚的正确代码:
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| 'Speed Average' >> beam.GroupByKey()
| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())
要么
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))
| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
阿晨1998
相关分类