深圳幻海软件技术有限公司 欢迎您!

如何在 SpringBoot 项目中控制 RocketMQ消费线程数量

2023-02-28

1背景最近在新项目开发中遇到一个有趣的问题,如何在SpringBoot项目中控制RocketMQ消费线程数量。如何设置单个topic消费线程的最小数量和最大数量,用来区分不同topic吞吐量不同。我们先介绍一下RocketMQ消息监听再来说明RocketMQ消费线程。2RocketMQ消息监听设置消

1 背景

最近在新项目开发中遇到一个有趣的问题,如何在 SpringBoot 项目中控制 RocketMQ 消费线程数量。如何设置单个 topic 消费线程的最小数量和最大数量,用来区分不同 topic 吞吐量不同。

我们先介绍一下 RocketMQ 消息监听再来说明 RocketMQ 消费线程。

2 RocketMQ 消息监听

设置消费者组为 my_consumer_group,监听 TopicTest 队列,并使用并发消息监听器MessageListenerConcurrently

1public class Consumer {
 2
 3    public static void main(String[] args) throws InterruptedException, MQClientException {
 4        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
 5        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 6        consumer.subscribe("TopicTest", "*");
 7        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
 8        consumer.registerMessageListener(new MessageListenerConcurrently() {
 9            @Override
10            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
11                ConsumeConcurrentlyContext context) {
12                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
13                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
14            }
15        });
16        consumer.start();
17        System.out.printf("Consumer Started.%n");
18    }
19}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.

3 RocketMQ 中连接结构图

QQB3Sb14" style="text-align: center;">

4 消费监听器

接口:org.apache.rocketmq.client.consumer.listener.MessageListener

有两个子接口:

- 顺序消费:MessageListenerOrderly
- 并发消费: MessageListenerConcurrently
  • 1.
  • 2.

4.1 MessageListenerConcurrently

作用:consumer并发消费消息的监听器

比如,在 quick start 中,就是使用的并发消费消息监听器:​

1 consumer.registerMessageListener(new MessageListenerConcurrently() {
2        @Override
3        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
4            ConsumeConcurrentlyContext context) {
5            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
6            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
7        }
8    });
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

方法返回值,是个枚举:

1 package org.apache.rocketmq.client.consumer.listener;
 2
 3/**
 4 * 并发消费mq消息结果
 5 */
 6public enum ConsumeConcurrentlyStatus {
 7
 8    /**
 9     * Success consumption
10     * 成功消费
11     */
12    CONSUME_SUCCESS,
13
14    /**
15     * Failure consumption,later try to consume
16     * 失败消费,稍后尝试消费
17     *
18     *
19     * 如果 {@link MessageListener}返回的消费结果为 RECONSUME_LATER,则需要将这些消息发送给Broker延迟消息。
20     * 如果给broker发送消息失败,将延迟5s后提交线程池进行消费。
21     *
22     * RECONSUME_LATER的消息发送入口: MQClientAPIImpl#consumerSendMessageBack,
23     * 命令编码: {@link org.apache.rocketmq.common.protocol.RequestCode#CONSUMER_SEND_MSG_BACK}
24     */
25    RECONSUME_LATER;
26}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

画外音:

当前,我们在具体开发中,肯定不会直接使用这种方式来写consumer。

常用的Consumer实现是:基于 推 的consumer:DefaultMQPushConsumer

4.2 MessageListenerOrderly

作用:consumer顺序消费消息的监听器

5 消费线程池

5.1 DefaultMQPushConsumer

作用:基于 推 的consumer消费者

5.2 注册并发消息监听器

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#registerMessageListener

当使用这个方法注册消息监听器时,实际上会把这个病发消息监听器设置到 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#messageListenerInner属性中。

5.3 设置 consumer 消费 service

可选有两种:​

并发消费的service

顺序消费的service

当consumer在启动的时,会使用MessageListener具体实现类型进行判断:

MessageListener 就有并发和顺序两种,所以service也有两种。

1public synchronized void start() throws MQClientException {
 2        switch (this.serviceState) {
 3            case CREATE_JUST:
 4
 5                // 省略一部分代码...........
 6
 7                // 根据注册的监听器类型[并发消息监听器/顺序执行消息监听器],来确定使用哪种消费服务.   
 8                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
 9                    this.consumeOrderly = true;
10                    this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
11                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
12                    this.consumeOrderly = false;
13                    this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
14                }
15                this.consumeMessageService.start();
16
17                // 省略一部分代码..........
18                this.serviceState = ServiceState.RUNNING;
19                break;
20            case RUNNING:
21            case START_FAILED:
22            case SHUTDOWN_ALREADY:
23                throw new MQClientException("The PushConsumer service state not OK, maybe started once");
24            default:
25                break;
26        }
27
28        // 省略一部分代码..........
29    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.

如果使用的是并发消费的话,使用 ConsumeMessageConcurrentlyService :

在实例化的时候,会创建一个线程池:

1// 无界队列,并且不可配置容量.那 DefaultMQPushConsumer#consumeThreadMax 配置就毫无意义了.
2this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
3this.consumeExecutor = new ThreadPoolExecutor(
4    this.defaultMQPushConsumer.getConsumeThreadMin(), // 默认20
5    this.defaultMQPushConsumer.getConsumeThreadMax(), // 默认64
6    1000 * 60,
7    TimeUnit.MILLISECONDS,
8    this.consumeRequestQueue,
9    new ThreadFactoryImpl("ConsumeMessageThread_"));
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

consumer消费线程池参数:

  • 默认最小消费线程数 20
  • 默认最大消费线程数 64
  • keepAliveTime = 60*1000      单位:秒
  • 队列:new LinkedBlockingQueue<>()​ 无界队列
  • 线程名称:前缀是:ConsumeMessageThread_

注意:因为线程池使用的是无界队列,那么设置的最大线程数,其实没有什么意义。

5.4 修改线程池线程数

上面我们已经知道了,设置线程池的最大线程数是没什么用的。

那我们其实可以设置线程池的最小线程数,来修改consumer消费消息时的线程池大小。

1public static void main(String[] args) throws InterruptedException, MQClientException {
 2        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
 3
 4        consumer.setConsumeThreadMin(30);
 5        consumer.setConsumeThreadMax(64);
 6
 7        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 8        consumer.subscribe("TopicTest", "*");
 9        consumer.registerMessageListener(new MessageListenerConcurrently() {
10
11            @Override
12            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
13                ConsumeConcurrentlyContext context) {
14                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
15                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
16            }
17        });
18        consumer.start();
19        System.out.printf("Consumer Started.%n");
20    }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.

注意:consumeThreadMin​ 如果大于64,则也需要设置 consumeThreadMax 参数,因为有个校验:

-修改线程池线程数-SpringBoot版

如果consumer是使用spring boot进行集成的,则可以这样设置消费者线程数: