深圳幻海软件技术有限公司 欢迎您!

Java服务限流算法

2023-02-28

一、概述限流其实就是对服务的请求做一下QPS的控制,对于有些免登录的接口需要做一下访问的限制,不能无限制的去请求接口,不然的话会给服务器造成很大的压力,而且我们也希望一些接口做一下控制,控制请求量,这样我们就可以做一个plugin对服务做限流操作,超出限流就返回请求失败,保证系统的稳定运行。主要概念

一、概述

限流其实就是对服务的请求做一下QPS的控制,对于有些免登录的接口需要做一下访问的限制,不能无限制的去请求接口,不然的话会给服务器造成很大的压力,而且我们也希望一些接口做一下控制,控制请求量,这样我们就可以做一个plugin对服务做限流操作,超出限流就返回请求失败,保证系统的稳定运行。主要概念就是阈值以及拒绝策略,实际中需要用到限流的的比如,验证码,白名单,当然也有容器的限流,比如nginx就是比较常用的,可以做一下简单的处理。

二、限流算法类型

几种算法的使用,一些基础代码如下

限流代码基础类

@RequestLimiter

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Order(Ordered.HIGHEST_PRECEDENCE)
public @interface RequestLimiter {
    /**
     * 限流类型 ,具体见枚举类 RequestLimitType
     */
    RequestLimitType type() default RequestLimitType.TOKEN;

    /**
     * 限流访问数
     */
    int limitCount() default 100;

    /**
     * 限流时间段
     */
    long time() default 60;

    /**
     * 限流时间段 时间单位
     */
    TimeUnit unit() default TimeUnit.SECONDS;

    /**
     * 漏出或者生成令牌时间间隔,单位 毫秒  (当type为TOKEN、LEAKY_BUCKET时生效)
     */
    long period() default 1000;

    /**
     * 每次生成令牌数或者漏出水滴数  (当type为TOKEN、LEAKY_BUCKET时生效)
     */
    int limitPeriodCount() default 10;

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.

LimitKeyConstant

public class LimitKeyConstant {
    /**
     * 令牌桶键名
     */
    public static final String QPS_TOKEN = "request:limit:qps:tokenBucket:";

    /**
     * 漏桶键名
     */
    public static final String QPS_LEAKY_BUCKET = "request:limit:qps:leakyBucket:";

    /**
     * 固定窗口键名
     */
    public static final String QPS_FIXED_WINDOW = "request:limit:qps:fixedWindow:";

    /**
     * 滑动窗口键名
     */
    public static final String QPS_SLIDE_WINDOW = "request:limit:qps:slideWindow:";
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.

RequestLimitType

public enum RequestLimitType {
    /**
     * 令牌算法
     */
    TOKEN(1, "令牌算法"),
    /**
     * 漏桶算法
     */
    LEAKY_BUCKET(2, "漏桶算法"),

    /**
     * 固定窗口
     */
    FIXED_WINDOW(3, "固定窗口"),
    /**
     * 滑动窗口
     */
    SLIDE_WINDOW(4, "滑动窗口");

    private Integer type;
    private String desc;

    RequestLimitType(Integer type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    public Integer getType() {
        return type;
    }

    public String getDesc() {
        return desc;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.

RequestLimitAspect

@Slf4j
@Aspect
@Component
public class RequestLimitAspect {
    @Autowired
    private RequestLimitFactory factory;


    /**
     * 切入点
     */
    @Pointcut(value = "@annotation(com.common.limit.annotation.RequestLimiter)")
    public void requestLimit(){
        // 切入点方法
    }

    /**
     * 前置切点
     *
     * @param
    @Before("requestLimit()")
    public void doBefore(JoinPoint joinPoint){
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = (HttpServletRequest) requestAttributes.resolveReference(RequestAttributes.REFERENCE_REQUEST);
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        Method targetMethod = methodSignature.getMethod();
        RequestLimiter limiter = targetMethod.getAnnotation(RequestLimiter.class);
        RequestLimitService service = factory.build(limiter.type());
        if (service != null) {
            RequestLimitParam param = new RequestLimitParam();
            param.setLimiter(limiter);
            param.setKey(signature.getName());
            if (service.checkRequestLimit(param)) {
                throw new LimitException("请求过于频繁,请稍后再重试!");
            }
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.

RequestLimitFactory

@Slf4j
@Component
public class RequestLimitFactory implements ApplicationContextAware {
    private static final Map<RequestLimitType, RequestLimitService> MAP = new ConcurrentHashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        try {
            applicationContext.getBeansOfType(RequestLimitService.class).values().forEach(service -> MAP.put(service.getType(), service));
        } catch (Exception e) {
            log.error("初始化限流策略异常", e);
        }
    }

    /**
     * 构建service
     *
     * @param type 限流类型
     * @return
    public RequestLimitService build(RequestLimitType type){
        return MAP.get(type);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.

RequestLimitService

public interface RequestLimitService {
    /**
     * 检测是否限流
     *
     * @param param 限流参数
     * @return
    boolean checkRequestLimit(RequestLimitParam param);

    /**
     * 获取当前限流类型
     *
     * @return
    RequestLimitType getType();

    /**
     * 获取带注解方法列表
     *
     * @param resourcePatternResolver 资源查询
     * @param limitType               注解类型
     * @param scanPackage             扫描包路径
     * @return
    default List<RequestLimitParam> getTokenLimitList(ResourcePatternResolver resourcePatternResolver, RequestLimitType limitType,
                                                      String scanPackage){
        try {
            List<RequestLimitParam> list = new ArrayList<>();
            Resource[] resources = resourcePatternResolver.getResources(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + scanPackage +
                    "/**/*.class");
            MetadataReaderFactory metaReader = new CachingMetadataReaderFactory();
            for (Resource resource : resources) {
                MetadataReader reader = metaReader.getMetadataReader(resource);
                AnnotationMetadata annotationMetadata = reader.getAnnotationMetadata();

                Set<MethodMetadata> annotatedMethods = annotationMetadata.getAnnotatedMethods(RequestLimiter.class.getCanonicalName());
                annotatedMethods.forEach(methodMetadata -> {
                    RequestLimiter limiter = methodMetadata.getAnnotations().get(RequestLimiter.class).synthesize();
                    if (!limitType.equals(limiter.type())) {
                        return;
                    }
                    RequestLimitParam param = new RequestLimitParam();
                    param.setKey(methodMetadata.getMethodName());
                    param.setLimiter(limiter);
                    list.add(param);
                });
            }
            return list;
        } catch (IOException e) {
            return Collections.emptyList();
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.

固定时间窗口算法

图解

介绍

其实就是原子计数法,就是在固定时间内,允许请求量是多少,每次请求就在计数器上加1,设置计数器的过期时间,当计数器的阈值达到限流配置的数时候,就执行拒绝策略,超过了时间,计数器就会重新归0。

比如上图中,会限制在每秒限制请求数为2,就是在每秒的时间会限制请求为2,但是会出现极端的情况,比如在前一个时间段中的前500ms和后500ms,请求数都是2,这样就会看到在这一秒内是有4个请求的,这就是会出现请求的问题,当然这也是最简单的限流算法。

代码

@Slf4j
@Service
public class FixedWindowRateLimitServiceImpl implements RequestLimitService {

    @Autowired
    private RedisConnectionFactory factory;

    @Override
    public boolean checkRequestLimit(RequestLimitParam param){
        String key = LimitKeyConstant.QPS_FIXED_WINDOW + param.getKey();
        RequestLimiter limiter = param.getLimiter();
        RedisAtomicInteger atomicCount = new RedisAtomicInteger(key, factory);
        int count = atomicCount.getAndIncrement();
        if (count == 0) {
            atomicCount.expire(limiter.time(), limiter.unit());
        }
        log.info("FixedWindowRateLimitServiceImpl time:{} unit:{} allow visit {} ", limiter.time(), limiter.unit(), limiter.limitCount());
        // 检测是否到达限流值
        if (count >= limiter.limitCount()) {
            log.info("FixedWindowRateLimitServiceImpl limit controller key:{},time:{},name:{} to visit :{}", key, limiter.time(),
                    limiter.unit().name(), limiter.limitCount());
            return true;
        } else {
            return false;
        }
    }

    @Override
    public RequestLimitType getType(){
        return RequestLimitType.FIXED_WINDOW;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.

滑动时间窗口算法

图解

介绍

滑动时间窗口算法,其实就是对固定窗口的改进,知道了固定时间窗口会出现极端的情况,那滑动就在下一个临界的时候,进行处理时间,其实就是在某一段时间进行处理时间。

比如上图中每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题。

这个是记录下所有的请求时间点,新请求先判断最近指定时间范围内的请求数量是否超过指定阈值,来确定是否达到限流,虽然没有时间窗口突变的问题,限流比较准确,但是要记录下每次请求的时间点,所以占用的内存较多。

代码

@Slf4j
@Service
public class SlideWindowRateLimitServiceImpl implements RequestLimitService {
    @Autowired
    private RedisService redisService;

    @Override
    public boolean checkRequestLimit(RequestLimitParam param){
        String key = LimitKeyConstant.QPS_SLIDE_WINDOW + param.getKey();
        RequestLimiter limiter = param.getLimiter();
        long current = System.currentTimeMillis();
        long duringTime = limiter.unit().toMillis(limiter.time());
        Long count = redisService.setCount(key, current - duringTime, current);
        // 清除有效期外的数据
        redisService.setRemoveRangeByScore(key, 0, current - duringTime - 1f);

        log.info("SlideWindowRateLimitServiceImpl time:{} unit:{} allow visit {}", limiter.time(), limiter.unit(), limiter.limitCount());
        // 检测是否到达限流值
        if (count != null && count >= limiter.limitCount()) {
            log.info("SlideWindowRateLimitServiceImpl limit controller key:{},time:{},name:{} to visit :{}", key, limiter.time(),
                    limiter.unit().name(), limiter.limitCount());
            return true;
        } else {
            redisService.setAdd(key, UUID.randomUUID().toString(), current);
            return false;
        }
    }

    @Override
    public RequestLimitType getType(){
        return RequestLimitType.SLIDE_WINDOW;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.

漏桶算法

图解

介绍

此算法就是定义一个桶的容量,然后每次的请求过来都放在桶里面,一直等到桶满了以后就会执行拒绝策略,然后在桶不满的情况下,会按照固定的速率去执行请求,其实就是按照固定流速去执行请求,保证单位时间内的执行请求量是固定的。

漏桶就是按照某一个请求的稳定的速度处理发来的请求数量,可以很好地保证系统的稳定运行,只能平稳处理请求,这也是他的一个缺点,不能处理面对突然来的高的请求量,会导致请求一直处于哎队列等待中,不能面对高并发下的请求处理,比较保守的处理逻辑

代码

@Slf4j
@Service
public class LeakyBucketRateLimitServiceImpl implements RequestLimitService {
    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Autowired
    private RedisService redisService;

    @Resource(name = Constants.THREAD_POOL_TASK_BEAN_NAME)
    private ThreadPoolTaskScheduler executor;

    @Value("${limit.scan.package}")
    private String scanPackage;

    @Override
    public boolean checkRequestLimit(RequestLimitParam requestLimitParam) {
        String key = LimitKeyConstant.QPS_LEAKY_BUCKET + requestLimitParam.getKey();
        Long size = redisService.listSize(key);
        if (size != null && size >= requestLimitParam.getLimiter().limitCount()) {
            log.info("LeakyBucketRateLimitServiceImpl limit key:{}", requestLimitParam.getKey());
            return true;
        } else {
            log.info("LeakyBucketRateLimitServiceImpl not full,limit key:{} ,current size:{},total size:{}", requestLimitParam.getKey(),
                    size, requestLimitParam.getLimiter().limitCount());
            redisService.listLeftPush(key, UUID.randomUUID().toString());
            return false;
        }
    }

    /**
     * 定数流出令牌
     */
    @PostConstruct
    public void init() {
        List<RequestLimitParam> list = this.getTokenLimitList(resourcePatternResolver, RequestLimitType.LEAKY_BUCKET, scanPackage);
        if (list.isEmpty()) {
            log.info("LeakyBucketRateLimitServiceImpl annotation is empty,end current task pool");
            return;
        }
        list.forEach(requestLimitDTO -> {
            executor.scheduleAtFixedRate(() -> {
                String key = LimitKeyConstant.QPS_LEAKY_BUCKET + requestLimitDTO.getKey();
                //截取List在start和end之间的元素处key列表
                redisService.listTrim(key, requestLimitDTO.getLimiter().limitPeriodCount(), -1);
                log.info("LeakyBucketRateLimitServiceImpl limit key:{},limitPeriodCount:{}", key,
                        requestLimitDTO.getLimiter().limitPeriodCount());
            }, requestLimitDTO.getLimiter().period());
        });
    }

    @Override
    public RequestLimitType getType() {
        return RequestLimitType.LEAKY_BUCKET;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.

令牌算法

图解

介绍

此算法也是对于漏桶的算法的改进,这个逻辑是桶里面有一个阈值,按照一定的速率进行在桶里面存放令牌,直到令牌满了,就不在新增令牌,然后请求每次来就去桶中获取令牌,获取到了,就进行处理,没有令牌则执行拒绝策略

这个算法其实原理类似于生产者,消费者的模型,生产者按照一定的速度生成令牌,消费者可以消费数据,相对来说,这个是比较好用的

代码

@Slf4j
@Service
public class TokenBucketRateLimitServiceImpl implements RequestLimitService {
    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Autowired
    private RedisService redisService;

    @Resource(name = Constants.THREAD_POOL_TASK_BEAN_NAME)
    private ThreadPoolTaskScheduler executor;

    @Value("${limit.scan.package}")
    private String scanPackage;


    @Override
    public boolean checkRequestLimit(RequestLimitParam param){
        Object pop = redisService.listRightPop(LimitKeyConstant.QPS_TOKEN + param.getKey());
        RequestLimiter limiter = param.getLimiter();
        log.info("TokenBucketRateLimitServiceImpl limit period {} ms create {} total token,max token num is:{}", limiter.period(),
                limiter.limitPeriodCount(), limiter.limitCount());
        if (pop == null) {
            log.info("TokenBucketRateLimitServiceImpl limit is empty key:{}", param.getKey());
            return true;
        } else {
            return false;
        }
    }

    @PostConstruct
    public void init(){
        // 扫描出所有使用了自定义注解并且限流类型为令牌算法的方法信息
        List<RequestLimitParam> list = this.getTokenLimitList(resourcePatternResolver, RequestLimitType.TOKEN, scanPackage);
        if (list.isEmpty()) {
            log.info("TokenBucketRateLimitServiceImpl annotation is empty,end current task pool");
            return;
        }
        // 每个接口方法更具注解配置信息提交定时任务,生成令牌进令牌桶
        list.forEach(limit -> executor.scheduleAtFixedRate(() -> {
            String key = LimitKeyConstant.QPS_TOKEN + limit.getKey();
            Long tokenSize = redisService.listSize(key);
            int size = tokenSize == null ? 0 : tokenSize.intValue();
            if (size >= limit.getLimiter().limitCount()) {
                return;
            }
            // 判断添加令牌数量
            int addSize = Math.min(limit.getLimiter().limitPeriodCount(), limit.getLimiter().limitCount() - size);
            List<String> addList = new ArrayList<>(addSize);
            for (int index = 0; index < addSize; index++) {
                addList.add(UUID.randomUUID().toString());
            }
            redisService.listLeftPushAll(key, addList);
        }, limit.getLimiter().period()));
    }

    @Override
    public RequestLimitType getType(){
        return RequestLimitType.TOKEN;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.

三,总结

其实这几种算法,不能说哪一个是最好的,只能说是要的业务逻辑是什么样的,选择合适的限流算法来满足自己的业务实现,没有最优,只有最合适。