深圳幻海软件技术有限公司 欢迎您!

消息队列堆积太多,下游处理不过来怎么办呢?

2023-02-28

作为后端程序员日常工作中难免会遇到要跟消息队列打交道的时候,而且在当下微服务的场景下,很多服务的性能不是我们自己能控制的。这不阿粉最近就遇到了一个场景,由于上游服务流量增加,发送到消息队列的消息增多,阿粉在处理消息的时候需要依赖下游的一个服务,可是谁想到下游的服务效率太差,消息太多处理不过来,CPU

作为后端程序员日常工作中难免会遇到要跟消息队列打交道的时候,而且在当下微服务的场景下,很多服务的性能不是我们自己能控制的。

这不阿粉最近就遇到了一个场景,由于上游服务流量增加,发送到消息队列的消息增多,阿粉在处理消息的时候需要依赖下游的一个服务,可是谁想到下游的服务效率太差,消息太多处理不过来,CPU 居高不下。

看过我们昨天文章的小伙伴应该都知道,这个时候我们就需要进行限流了,为了避免将下游的服务打垮,我们来进行单机限流操作。这里我们来模拟一下操作过程,首先我们通过一段伪代码来模拟大流量,然后通过配置 sentinel 的控制台来配置规则从而实现单机 QPS 20 的限制。

创建 SpringBoot 服务

首先我们创建一个 SpringBoot 服务,在 pom.xml 文件中增加下面的配置。

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency> 
<dependency>
      <groupId>com.alibaba.csp</groupId>
      <artifactId>sentinel-core</artifactId>
      <version>1.8.4</version>
</dependency>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

然后我们提供一个对外的 http 接口,通过访问接口来触发我们的限流代码,接口代码如下:

代码如下:

package com.example.demo.controller;

import com.alibaba.csp.sentinel.SphO;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * <br>
 * <b>Function:</b><br>
 * <b>Author:</b>@author ziyou<br>
 * <b>Date:</b>2022-05-08 12:56<br>
 * <b>Desc:</b>无<br>
 */
@RestController
public class LoginController {

  @GetMapping(value = "/login")
  public void login(String username, String password) {
    System.out.println("login");
    //模拟一百万条消息
    for (int i = 0; i < 1000000; i++) {
      boolean entry = false;
      try {
        entry = SphO.entry("HelloWorld");
        while (!entry) {
          try {
            Thread.sleep(50);
            System.out.println("entry false");
            entry = SphO.entry("HelloWorld");
          } catch (InterruptedException e) {

          }
        }
        System.out.println("entry true");
      } catch (Exception e) {

      } finally {
        if (entry) {
          SphO.exit();
        }
      }
    }
  }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.

调用接口过后,通过循环一百万次来模拟大流量,这里我们要解释以下几个内容

  • SphO.entry("HelloWorld"):是 Sentinel 的资源控制器,"HelloWord" 是资源的名称,资源 是 sentinel 的一个很重要的概念,所有的限流都是针对资源的操作;SphO.entry() 返回值是布尔值,为 true 表示资源可用,没有被限流,为 false 表示资源被限流;
  • 这里模拟在被限流了过后,程序等待一段时间,再去判断是否限流,只有在资源未被限流的时候,才能继续处理;
  • 在 finally 里面需要进行 SphO.exit(); 操作,当被限流了以后,也就是SphO.entry() == true 后一定要执行 SphO.exit(); 否则代码会创建多个Entry 对象,程序运行时间长了过后会导致内存泄露,引发 FullGC。

这个时候我们启动一个服务,调用一下接口,可以看到效果如下,很快就会运行完,并没有达到限流的效果,那是因为我们此刻还没有配置限流规则,所以没有触发到限流的逻辑。

配置 sentinel 控制台

接下来我们安装一下 sentinel 的控制台,通过控制台来配置限流规则,从而达到限流的目的,控制台的搭建很简单,我们通过官方地址下载指定版本的 jar 然后本地运行即可。通过地址 https://github.com/alibaba/Sentinel/releases/download/1.8.4/sentinel-dashboard-1.8.4.jar 进行下载。

然后通过命令java -Dserver.port=8081 -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.4.jar 运行即可,如下所示:

这里我们通过指定 8081 端口,用于访问 sentinel,启动成功过后,通过浏览器我们可以进行登录,默认的初始账号和密码都是 sentinel。

因为上面的命令我们指定了 sentinel-dashboard 项目,所以默认只会看到 sentinel-dashboard 这个项目,这个时候我们需要,修改代码,在 pom.xml 中增加下面的配置。

<dependency>
      <groupId>com.alibaba.csp</groupId>
      <artifactId>sentinel-transport-simple-http</artifactId>
      <version>1.8.4</version>
</dependency>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

然后在 JVM 的启动参数中增加-Dcsp.sentinel.dashboard.server=localhost:8081 指明 sentinel 的地址和端口号,再启动我们的应用。启动完过后,我们要手动调用一下接口,然后就可以看到我们的程序项目连接到 sentinel 了。不过此时只是我们程序和 sentinel 连接成功,还没有限流规则,接下来我们要配置一下限流规则。

按照上图配置好了过后,我们再调用一次接口,可以看到此时我们的处理速度明显慢了下来,每秒只有 20 个 QPS 能获取资源了,至此我们基于 sentinel 的单机限流QPS 20 的目标完成。