猿问

如何在不使用 Dataflow 且不使用 Python 中的 ACK 的情况下排除 Pubsub

我有一个用例,我想在不确认消息的情况下阅读 Pubsub 中的消息。当我不确认传递的消息时,我需要有关如何排除“重复消息”的可能性的帮助,这些消息将保留在 Pubsub 存储中。

我想到的解决方案:

  1. 将拉取的消息存储在 Datastore 中,看看它们是否相同。

  2. 在运行时存储拉取的消息并检查我的消息是否重复 O(n) 时间复杂度和空间复杂度 O(n)。

  3. 将拉取的消息存储在文件中,并比较来自文件中消息的新传入消息。

  4. 使用数据流并排除可能性(最不期望)

我认为 Pubsub 中没有类似于 Kafka 的偏移功能。

您在这件事上建议的最佳方法/或我可以使用的任何其他替代方法是什么?

我正在使用 python google-cloud-pubsub_v1 创建一个 python 客户端并从 Pubsub 中提取消息。

我正在共享代码,这是提取数据的逻辑

subscription_path = subscriber.subscription_path(

    project_id, subscription_name)

    NUM_MESSAGES = 3


    # The subscriber pulls a specific number of messages.

    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)


    for received_message in response.received_messages:

        print(received_message.message.data)


守着一只汪
浏览 206回答 1
1回答

月关宝盒

听起来 Pub/Sub 可能不是适合这项工作的工具。似乎您正在尝试将 Pub/Sub 用作持久数据存储,这不是预期的用例。Acking 是 Cloud Pub/Sub 消息生命周期的基本组成部分。如果 Pub/Sub 消息在提供的消息保留期(不能超过 7 天)后未被确认,则会被删除。我建议您考虑使用像Cloud Spanner这样的 SQL 数据库。然后,您可以为每条消息生成一个 uuid,将其用作重复数据删除的主键,并以事务方式更新数据库以确保没有重复。如果您提供有关您计划如何处理重复数据删除消息的更多信息,我可能会为您提供更好的答案。
随时随地看视频慕课网APP

相关分类

Python
我要回答