如何将消息保存到数据库并将响应发送到主题最终一致?

我有以下 rabbitMq 消费者:

Consumer consumer = new DefaultConsumer(channel) {    @Override
     public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {            String message = new String(body, "UTF-8");
            sendNotificationIntoTopic(message);
            saveIntoDatabase(message);
     }
};

可能会出现以下情况:

  1. 消息已成功发送到主题

  2. 与数据库的连接丢失,因此数据库插入失败。

结果我们有数据不一致。

预期结果要么两个操作都成功执行,要么根本没有执行。

任何解决方案我怎样才能实现它?

聚苯乙烯

目前我有以下想法(请评论)

我们可以假设代理不会丢失任何消息。

我们必须订阅要发送的主题。

  1. 将条目保存到数据库中并设置status值为“pending”的 字段

  2. 尝试向主题发送数据。如果发送成功 - 更新status值为“成功”的字段

  3. 我们必须有一个计划作业,它必须检查具有挂起状态的行。目前可能有两种情况:
    3.1 根本没有发送通知
    3.2 发送了通知但存入数据库失败(概率很低但有可能)

    所以我们必须以某种方式区分这两种情况:我们可以将来自主题的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到与数据库行对应的消息,我们必须将状态更新为“成功”。否则我们必须从数据库中删除条目。

我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,我们必须将消息存储在 hazelcast(或类似物)中,但这是假设失败的额外点)


侃侃无极
浏览 179回答 3
3回答

慕哥9229398

这是尝试取消确认模式 https://servicecomb.apache.org/docs/distributed_saga_3/的示例 ,它应该能够处理您的问题。您应该容忍通过队列重复提交数据的机会。这是一个例子:定义抽象操作并为操作分配 ID 和时间戳。将状态Pending写入数据库(可以和1一样的步骤)编写一个侦听器,轮询数据库中所有状态为挂起且早于“超时”的操作对于每个挂起的操作,通过具有分配 ID 的队列发送数据。接收方应该知道 ID,如果 ID 已被处理,则不会发生任何事情。6A。如果您需要 100% 确认操作已完成,您需要第二个队列,接收方将在其中发布消息 ID - DONE。如果不需要这种一致性,请跳过此步骤。或者,它可以发布 ID -Failed 失败原因。6B。提交方要么等待来自 6A 的消息,要么通过将状态 DONE 写入数据库来完成操作。一旦 sertine 超时已过或某个重试限制已过。您将状态写入操作 FAIL。您可以通过 ID 回滚将消息发送到接收方操作。请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库执行此操作。我写的是尝试取消确认模式的变体,其中每个消息接收者都应该知道如何管理自己的数据。

MM们

如果有足够的时间来修改设计,建议使用类似 JTA 的 API 来管理 2phase 提交。甚至 weblogic 和 WebSphere 也支持用于两阶段提交的 XA 资源。如果时间线较少,建议执行以下操作以减少失败间隔。发送数据主题(不提交)(incase topic down, retry to be perform with a interval)将数据写入数据库提交数据库提交主题只有当第 4 步失败时才会发生这里失败。这将导致再次发送相同的消息。所以接收系统会收到重复的消息。在JMS2.0 结构中,每条消息都有唯一的messageID 和CorrelationID。所以找到重复项有点直截了当(但这将在接收系统中处理)这两种情况也适用于集群环境。严格针对您的情况,认为以下步骤可能有助于解决您的问题为您的主题订阅一个侦听器 listener-1。过程-1为消息 msg-1 添加状态为“待发送”的数据库条目向主题发送消息 msg-1。在任何主题失败的情况下重试发送如果在某些重试后步骤 2 失败,process-1 必须在发送任何新消息之前重新发送 msg-1 或回滚步骤 1听众-1使用订阅的侦听器,从主题读取参考(meesageID/correlationID),并将数据库状态更新为已发送,并从主题读取/删除消息。如果参考读取成功并且数据库更新失败,主题仍然有消息。所以下一次读取将更新数据库。Incase 数据库更新成功但消息删除失败。听众将再次阅读并尝试更新已经完成的消息。所以验证后可以忽略。Incase listener 本身宕机,topic 将有消息,直到 listener 阅读消息。在此之前,SENT 消息将处于“待发送”状态。

慕侠2389804

在侦听器中保存数据库行,其中包含字段 staus='pending'另一个作业(独立线程)将从数据库中获取所有待处理的行,并对每一行进行以下操作:2.1 将数据发送到主题2.2 保存到数据库中如果我们在第 1 步失败- 一切正常 - 数据处于一致状态,因为作业不会知道该数据的任何信息如果我们在步骤 2.1 上失败了——没问题,下一个作业调用将尝试处理它如果我们在步骤 2.2 上失败了——如果我们在这里失败了——这意味着下一个作业调用将再次处理相同的数据。乍一看你可以认为这是一个问题。但是您的消费者必须是幂等的——这意味着它必须了解消息已经被处理并跳过处理。此要求是所有消息代理都保证消息将至少传递一次的结果。因此,无论如何,我们的消费者都必须为重复的消息做好准备。没问题了。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java