猿问

SlidingWindows Python Apache Beam 复制数据

问题


每次系统从带有滑动窗口的 pubsub 收到消息时,它都会被复制


代码


 | 'Parse dictionary' >> beam.Map(lambda elem: (elem['Serial'], int(elem['Value'])))    

 | 'window' >> beam.WindowInto(window.SlidingWindows(30, 15),accumulation_mode=AccumulationMode.DISCARDING)

 | 'Count' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())

输出


如果我只从 pub/sub 发送一条消息并尝试在滑动窗口完成后使用代码打印我所拥有的:


class print_row2(beam.DoFn):

    def process(self, row=beam.DoFn.ElementParam, window=beam.DoFn.WindowParam,timestamp=beam.DoFn.TimestampParam):

        print row, timestamp2str(float(window.start)), timestamp2str(float(window.end)),timestamp2str(float(timestamp))

结果


('77777', 120.0) 2018-11-16 08:21:15.000 2018-11-16 08:21:45.000 2018-11-16 08:21:45.000

('77777', 120.0) 2018-11-16 08:21:30.000 2018-11-16 08:22:00.000 2018-11-16 08:22:00.000

如果我在'window' >> beam.WindowInto(window.SlidingWindows(30, 15))只收到一次之前打印消息


“图形模式”下的流程:


  time: ----t+00---t+15---t+30----t+45----t+60------>

             :      :      :       :       :

  w1:        |=X===========|       :       :

  w2:               |==============|       :

  ...

消息 X 在滑动窗口开始时只发送了一次,应该只接收一次,但正在接收两次


我已经尝试了两个 AccumulationMode 值,也尝试了 trigger=AftyerWatermark 但我无法解决问题。


可能有什么问题?


额外的


使用 FixedWindows 这是我的海豚的正确代码:


| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))

| 'Speed Average' >> beam.GroupByKey()

| "Calculating average" >> beam.CombineValues(beam.combiners.MeanCombineFn())

要么


| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 30))

| "Calculating average" >> beam.CombinePerKey(beam.combiners.MeanCombineFn())


慕尼黑的夜晚无繁华
浏览 143回答 2
2回答

阿晨1998

我有完全相同的问题,但是在java中。我有一个持续时间为 10 秒和步长为 3 秒的窗口。当从我订阅的 mqtt 主题发出事件时,它看起来像我运行的 ParDo 函数,并向所有三个“构造”窗口发出第一个也是唯一的事件。X 是我以随机时间戳发送的事件:2020-09-15T21:17:57.292Z  time: ----t+00---t+15---t+30----t+45----t+60------>             :      :      :       :       :  w1:        |X============|       :       :  w2:               |X=============|       :  w3:                      |X==============|  ...甚至为它们分配了相同的时间戳!!我一定真的做错了什么。我将 Scala 2.12 和 BEAM 2.23 与 Direct Runner 一起使用。[提示]:我在 processElement 函数中使用状态!每个键 + 窗口保持状态的位置。也许那里有错误?我将尝试在没有状态的情况下对其进行测试。更新:删除了状态字段,并将单个事件分配给一个窗口。
随时随地看视频慕课网APP

相关分类

Python
我要回答