大家好,我是君哥。今天聊一聊 RocketMQ 的顺序消息实现机制。
在有些场景下,使用 MQ 需要保证消息的顺序性,比如在电商系统中,用户提交订单、支付订单、订单出库这 3 个消息应该保证顺序性,如下图:
对于 RocketMQ 来说,主要是通过 Producer 和 Consumer 来保证消息顺序的。
1、Producer
下面代码是 Producer 发送顺序消息的官方示例:
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
跟发送并发消息不一样的是,发送消息时传入了 MessageQueueSelector,这里可以指定消息发送到固定的 MessageQueue。
注意:上面的代码把 orderId 相同的消息都会发送到同一个 MessageQueue,这样同一个 orderId 的消息是有序的,这也叫做局部有序。对应的另一种是全局有序,这需要把所有的消息都发到同一个 MessageQueue。
下面再来看一下发送的代码:
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略部分逻辑
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue threw exception.", e);
}
//省略部分逻辑
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
//省略部分逻辑
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
可以看到,在发送的时候,使用 MessageQueueSelector 选择一个 MessageQueue,然后发送消息到这个 MessageQueue。对于并发消息,这里不传 MessageQueueSelector,如果发送方法没有指定 MessageQueue,就会按照默认的策略选择一个。
2、Consumer
以 RocketMQ 推模式为例,消费者会注册一个监听器,进行消息的拉取和消费处理,下面的 UML 类图显示了调用关系:
上图中包含了对顺序消息和对并发消息的处理。其中 MessageListenerOrderly 和 ConsumeMessageOrderlyService 对顺序消息进行处理。跟并发消息不一样的是,顺序消息定义了一个 MessageQueueLock 类,这个类保存了每个 MessageQueue 对应的锁,代码如下:
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
- 1.
下面代码是顺序消费的官方示例:
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
下面看一下顺序消息的消费端处理逻辑。
(1)注册监听
上面的代码定义了顺序消息监听器 MessageListenerOrderly,并且注册到 DefaultMQPushConsumer,这个注册同时也注册到了 DefaultMQPushConsumerImpl。
(2)PushConsumer 初始化
在 DefaultMQPushConsumerImpl 类初始化的时候,会判断注册的 MessageListener 是不是 MessageListenerOrderly,如果是,就把 consumeOrderly 变量设置为 true,以此来标记是顺序消息拉取还是并发消息拉取。然后把 ConsumeMessageService 初始化为 ConsumeMessageOrderlyService。
(3)锁定 mq
要保证消息的顺序性,就需要保证同一个 MessageQueue 只能被同一个 Consumer 消费。
ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,周期性(默认 20s)地向 Broker 发送锁定消息(请求类型 LOCK_BATCH_MQ),Broker 收到后,就会把 MessageQueue、group 和 clientId 进行绑定,这样其他客户端就不能从这个 MessageQueue 拉取消息。
注意:Broker 的锁定是有过期时间的,默认 60s,可以配置,锁定过期后,有可能被其他 Consumer 进行消费。
Broker 端锁结构如下图:
(4)拉取消息
消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法。这里拉取消息的逻辑跟并发消息逻辑是一样的。
拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageOrderlyService 的 submitConsumeRequest 方法,里面用线程池提交了 ConsumeRequest 线程。
PullCallback pullCallback = new PullCallback() {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//省略
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
//省略
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//省略
}
//省略
break;
//省略
}
}
}
//省略
};
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
上面拉取到消息后,先把消息放到了 ProcessQueue,然后调用了 submitConsumeRequest 方法。跟并发消息处理方式不同的是,submitConsumeRequest 方法并没有处理拉取到的消息,而真正处理的时候是从 ProcessQueue 获取。
(5)处理消息
处理消息的逻辑在 ConsumeMessageOrderlyService 的内部类 ConsumeRequest,这是一个线程类,run 方法如下:
public void run() {
//省略部分逻辑
//1.获取到 MessageQueueLock 对应的锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
//省略延后执行的逻辑
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//2.从 processQueue 拉取消息
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
//省略部分逻辑
boolean hasException = false;
try {
//3.获取处理锁
this.processQueue.getConsumeLock().lock();
//4.执行消费处理逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
//5.释放处理锁
this.processQueue.getConsumeLock().unlock();
}
//省略部分逻辑
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
//省略部分逻辑
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
上面的代码总结一下,Consumer 消费消息的逻辑如下:
- 对 MessageQueueLock 进行加锁,这样就保证只有一个线程在处理当前 MessageQueue。
- 从 ProcessQueue 拉取一批消息。
- 获取 ProcessQueue 锁,这样保证了只有当前线程可以进行消息处理,同时也可以防止 Rebalance 线程把当前处理的 MessageQueue 移除掉。
- 执行消费处理逻辑。
- 释放 ProcessQueue 处理锁;6.processConsumeResult 方法更新消息偏移量。
注意:ProcessQueue 中的锁是 ReentrantLock。
3、重试
跟并发消息不一样的是,顺序消息消费失败后并不会把消息发送到 Broker,而是直接在 Consumer 端进行重试,如果重试次数超过了最大重试次数(16 次),则发送到 Broker,Broker 则将消息推入死信队列。如下图:
4、总结
RocketMQ 顺序消息的原理是在 Producer 端把一批需要保证顺序的消息发送到同一个 MessageQueue,Consumer 端则通过加锁的机制来保证消息消费的顺序性,Broker 端通过对 MessageQueue 进行加锁,保证同一个 MessageQueue 只能被同一个 Consumer 进行消费。
根据实现原理可以看到,RocketMQ 的顺序消息可能存在两个坑:
- 有顺序性的消息需要发送到同一个 MessageQueue,可能导致单个 MessageQueue 消息量很大,而 Consumer 端消费的时候只能单线程消费,很可能导致当前 MessageQueue 消息积压。
- 如果顺序消息 MessageQueue 所在的 broker 挂了,这时 Producer 只能把消息发送到其他 Broker 的 MessageQueue 上,而如果新的 MessageQueue 被其他 Consumer 消费,这样两个 Consumer 消费的消息就不能保证顺序性了。如下图:
Broker1 发生故障,把订单出库的消息发送到了 Broker2,由 Consumer2 来进行消费,消息顺序很可能会错乱。