我有以下 rabbitMq 消费者:
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); sendNotificationIntoTopic(message); saveIntoDatabase(message); } };
可能会出现以下情况:
消息已成功发送到主题
与数据库的连接丢失,因此数据库插入失败。
结果我们有数据不一致。
预期结果要么两个操作都成功执行,要么根本没有执行。
任何解决方案我怎样才能实现它?
目前我有以下想法(请评论)
我们可以假设代理不会丢失任何消息。
我们必须订阅要发送的主题。
将条目保存到数据库中并设置status
值为“pending”的 字段
尝试向主题发送数据。如果发送成功 - 更新status
值为“成功”的字段
我们必须有一个计划作业,它必须检查具有挂起状态的行。目前可能有两种情况:
3.1 根本没有发送通知
3.2 发送了通知但存入数据库失败(概率很低但有可能)
所以我们必须以某种方式区分这两种情况:我们可以将来自主题的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到与数据库行对应的消息,我们必须将状态更新为“成功”。否则我们必须从数据库中删除条目。
我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,我们必须将消息存储在 hazelcast(或类似物)中,但这是假设失败的额外点)
慕哥9229398
MM们
慕侠2389804
相关分类