RocketMQ源码解读——message顺序发送
文章出处:原文地址
RocketMQ源码解读——message顺序发送
前言
RocketMQ提供了两种顺序级别:
普通顺序消息:Producer将相关联的消息发送到相同的消息队列。
严格顺序消息:在 普通顺序消息 的基础上,Consumer严格顺序消费。
目前已知的严格顺序消息
,只使用在数据库binlog
同步中。
Producer顺序
普通顺序消息是使用MessageQueueSelector
来实现的:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
以上的例子来自rocketmq的官方文档,把orderId
作为选择队列的条件,让相同的orderId
进入相同的队列。
这里的源码就不看了,就是使用我们提供的MessageQueueSelector
来选择一个我们想要的队列,然后发送消息。
Consumer顺序
consumer在严格顺序消费时,通过三把锁保证严格顺序消费:
Broker消息队列锁
(分布式锁):集群模式下,Consumer从Broker获得该锁后,才能进行消息拉取、消费。广播模式下,Consumer无需该锁(对于广播模式的Consumer来说,所有的队列在自己眼里就是属于自己的,所以不需要一个锁)。Consumer消息队列锁
(本地锁):Consumer获得该锁才能操作消息队列。Consumer消息处理队列消费锁
(本地锁) :Consumer获得该锁才能消费消息队列
RocketMQ的消息拉取之前已经说过了是通过一个队列,不停地从里面去PullRequest来进行消息拉取的。初始化一条队列的拉取是在rebalance过程中进行的,我们看一下updateProcessQueueTableInRebalance
:
List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { //如果是严格顺序,则尝试锁住远端的队列,如果没锁住,下次rebalance的时候还会尝试获取锁,但是本次就continue掉了 if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //初始化拉取操作 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } //拉取 this.dispatchPullRequest(pullRequestList);
继续看一下如何锁住队列的:
public boolean lock(final MessageQueue mq) { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); requestBody.getMqSet().add(mq); try { // 请求Broker获得指定消息队列的分布式锁 Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); // 获取远端的锁定的队列,然后锁住本地的processQueue for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); if (processQueue != null) { processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } //如果远端锁定的队列里包含要锁的这个队列,则返回成功 boolean lockOK = lockedMq.contains(mq); return lockOK; } catch (Exception e) { } } return false; }
这里有个问题,我们看到:最终是否锁定成功的判断是远端锁定的队列是否包含我们希望锁住的队列。这个判断是否有问题呢?如果远端锁住这个mq的并不是当前的consumer,而是另外一个consumer呢?
这里我们看一下获取锁定队列的条件其实有三个:consumerGroup
、mQClientFactory.getClientId()
和MessageQueue
。这里是根据自己的clientId去获取的远端锁定队列,也就是获取当前consumer的锁定队列,所以不会出现上面的问题。
锁有过期时间:
public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
客户端会启动一个定时任务去刷新锁定时间,而一旦某个锁定了队列的consumer挂掉,也可以释放锁,保证这个队列不会一直处在锁死的状态。我们看一下lockAll:
//去远端锁住队列 Set<MessageQueue> lockOKMQSet = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { if (!processQueue.isLocked()) { } processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } for (MessageQueue mq : mqs) { if (!lockOKMQSet.contains(mq)) { //释放远端释放了锁,但是本地仍然持有锁的队列 ProcessQueue processQueue = this.processQueueTable.get(mq); if (processQueue != null) { processQueue.setLocked(false); } } }
先去获取一下broker的地址,然后去批量的锁住队列和解锁远端已经释放锁的队列。
移除消息队列
集群模式下,Consumer移除自己的消息队列时,会让Broker解锁该队列:
// 同步队列的消费进度并移除 this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); // 集群模式下并且是严格顺序时,解锁对队列的锁定 if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { try { //尝试锁住本地的queue if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { try { //解锁mq(broker上处理) return this.unlockDelay(mq, pq); } finally { //解锁本地的processQueue pq.getLockConsume().unlock(); } } else { //这个process queue正在被消费 pq.incTryUnlockTimes(); } } catch (Exception e) { } return false; } return true;
先获取消息队列消费锁。避免和消息队列消费冲突。如果获取锁失败,则移除消息队列失败,等待下次重新分配消费队列时(Rebalance),再进行移除。
如果未获得锁而进行移除,则可能出现正在消费消息时就把队列移除掉了。如果消费出问题,没法保证严格顺序。
看一下unlockDelay
(延迟解锁Broker消息队列锁,当消息处理队列不存在消息,则直接解锁):
//解锁Broker消息队列锁。如果消息处理队列存在剩余消息,则延迟解锁Broker消息队列锁。 if (pq.hasTempMessage()) { this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() { @Override public void run() { RebalancePushImpl.this.unlock(mq, true); } }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS); } else { this.unlock(mq, true); } return true;
消费消息
主要看一下ConsumeRequest
(在ConsumeMessageOrderlyService
中),代码有点多,分段来看,最开始是获取MessageQueue
的锁:
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
仍然是一个本地锁,MessageQueue
是远端队列的一个抽象,这个地方的锁保证这个队列只被当前线程消费。
然后是校验锁状态的部分:
//如果这个队列没有被锁定,则提交分布式锁,下次消费 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } //如果这个队列的锁过期了,同样提交分布式锁,下次消费 if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) && this.processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; }
然后是消费延时的部分,这里是考虑到一次消费的时间不能太长,如果提交锁花费了时间比较长,则会提交一个优先级比较低的消费消息的任务:
//这里这个beginTime是在进入循环之前的时间 long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; }
然后获取要消费的消息:
//和并发消费获得消息不同。并发消费请求在请求创建时,已经设置好消费哪些消息。 final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
然后是初始化上下文等,最终执行消费逻辑的代码我们看一下:
this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
这里是先获取锁,后消费(this.processQueue.getLockConsume().lock()
),之前我们在移除队列的地方说过,为什么移除前要获取锁,就是这个原因:因为消费时要先获取锁,如果移除时获取锁失败,说明这个队列正在被消费,需要过一会再移除。
处理消费结果
主要是ConsumeMessageOrderlyService.this.processConsumeResult
,ConsumeOrderlyStatus
有四种状态:
SUCCESS
:消费成功但不提交。ROLLBACK
:消费失败,消费回滚。COMMIT
:消费成功提交并且提交。SUSPEND_CURRENT_QUEUE_A_MOMENT
:消费失败,挂起消费队列一会会,稍后继续消费。
在并发消费场景时,如果消费失败,Consumer会将消费失败消息发回到 Broker重试队列,跳过当前消息,等待下次拉取该消息再进行消费。
但是在完全严格顺序消费消费时,这样做显然不行。也因此,消费失败的消息,会挂起队列一会会,稍后继续消费。
不过消费失败的消息一直失败,也不可能一直消费。当超过消费重试上限时,Consumer会将消费失败超过上限的消息发回到Broker死信队列。
看下源码:
if (context.isAutoCommit()) { switch (status) { case COMMIT: case ROLLBACK: log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue()); case SUCCESS: //提交消费进度 commitOffset = consumeRequest.getProcessQueue().commit(); // 统计 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: //统计 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); //这个方法中会把消费次数大于最大次数的消息发到broker中的死信队列。 if (checkReconsumeTimes(msgs)) { //设置可以再次消费(重新放回map中) consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); //提交延迟消费请求,一会再消费(及挂起一会) this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { //如果消息消费次数大于最大次数且被成功放入死信队列,则提交进度,不需要挂起 commitOffset = consumeRequest.getProcessQueue().commit(); } break; default: break; } } else { switch (status) { case SUCCESS: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case COMMIT: commitOffset = consumeRequest.getProcessQueue().commit(); break; case ROLLBACK: // 设置消息重新消费 consumeRequest.getProcessQueue().rollback(); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } break; default: break; } }
这里,自动提交和非自动提交差别不大,我们看一下checkReconsumeTimes
方法:
boolean suspend = false; if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) { MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes())); //如果发回失败,则返回中断,需要下次再处理 if (!sendMessageBack(msg)) { suspend = true; msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); } } else { suspend = true; msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); } } } return suspend;
这个方法中,校验消息的是否可以再次消费,如果不可以,尝试发回broker的死信队列,如果可以重新消费,返回挂起(suspend = true)。
我们看一下sendMessageBack
方法:
try { // max reconsume times exceeded then send to dead letter queue. Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes())); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg); return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); } return false;
我们看一下这里,这里并不是直接发送给私信队列,而是发送给RETRY队列,这部分代码逻辑在Broker里,需要我确认一下,应该是Broker识别到一个消息的当前已消费次数 >= 最大可消费次数时,会直接把消息放入到死信队列而不是重试队列。
ProccessQueue相关方法解析
先看属性:
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); /** * A subset of msgTreeMap, will only be used when orderly consume */ private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
msgTreeMap
是所有消息,consumingMsgOrderlyTreeMap
是正在被消费的消息的副本。
rollback:
this.lockTreeMap.writeLock().lockInterruptibly(); try { //回滚消费中的消息,重新把消息放回主map中,然后清理备份的map this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap); this.consumingMsgOrderlyTreeMap.clear(); } finally { this.lockTreeMap.writeLock().unlock(); }
commit:
//返回offset,清理consumingMsgOrderlyTreeMap Long offset = this.consumingMsgOrderlyTreeMap.lastKey(); msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size()); for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { msgSize.addAndGet(0 - msg.getBody().length); } this.consumingMsgOrderlyTreeMap.clear(); if (offset != null) { return offset + 1; }
makeMessageToCosumeAgain:
for (MessageExt msg : msgs) { //把一批消息放回到主map里,并且从备份map中移除 this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset()); this.msgTreeMap.put(msg.getQueueOffset(), msg); }