
Performance Optimization: Hystrix Request Collapsing and a Simplified Custom Implementation

2023-02-28


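Background

In business development there is a common scenario: a program receives data and then calls another interface to forward it. Forwarding one record for every record received is inefficient, so one approach is to buffer the data and forward it in a single call once a certain count has accumulated.

This has costs as well as benefits, and they need to be weighed against the business scenario:

  • When QPS is low, it takes a long time to reach the threshold, so data is delayed significantly
  • Buffered data may be lost when the application is redeployed
  • Buffered data may be lost when the application goes down abnormally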
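
The buffering idea and its first trade-off can be sketched in a few lines. This is a minimal illustration, not code from the article: `ThresholdBuffer`, its `pending()` method, and the threshold of 3 are hypothetical names and values chosen for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ThresholdBufferDemo {
    public static void main(String[] args) {
        List<List<Integer>> forwarded = new ArrayList<>();
        ThresholdBuffer<Integer> buffer = new ThresholdBuffer<>(3, forwarded::add);
        for (int i = 1; i <= 7; i++) {
            buffer.add(i);
        }
        System.out.println("forwarded batches: " + forwarded);    // [[1, 2, 3], [4, 5, 6]]
        System.out.println("still pending: " + buffer.pending()); // 1
    }
}

// Hypothetical helper: buffers items and forwards them in one call
// once `threshold` items have accumulated.
class ThresholdBuffer<T> {
    private final int threshold;
    private final Consumer<List<T>> forwarder;
    private final List<T> buffer = new ArrayList<>();

    ThresholdBuffer(int threshold, Consumer<List<T>> forwarder) {
        this.threshold = threshold;
        this.forwarder = forwarder;
    }

    // Adds one item; forwards a copy of the whole buffer when it is full.
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= threshold) {
            forwarder.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Items still waiting; these are the ones exposed to delay or loss.
    synchronized int pending() {
        return buffer.size();
    }
}
```

The seventh item stays in the buffer until two more arrive. With only a count threshold and low traffic, that wait is unbounded, which is why a timer is usually added alongside the threshold.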

The rest of this post covers Hystrix request collapsing and a simplified custom implementation based on the same principle.

Hystrix request collapsing

What is request collapsing

Without Collapsing

[Figure: without collapsing]

With Collapsing

[Figure: with collapsing]

Request collapsing design

[Figure: design]

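Hystrix usage example

The example is written with Spring Boot; the code below can be copied into a project and run as is.

Add the dependency

Below is the pom dependency for integrating Spring with Hystrix.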

pom

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>

Application class

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;

@EnableHystrix
@SpringBootApplication
public class Application {
    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}

Usage example

HystrixController

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@RestController
public class HystrixController {
    @Resource
    private HystrixService hystrixService;

    @RequestMapping("/byid")
    public Long byId(Long id) throws InterruptedException, ExecutionException {
        Future<Long> future = hystrixService.byId(id);
        return future.get();
    }
}

HystrixService

import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.Future;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;

@Service
public class HystrixService {
    @HystrixCollapser(batchMethod="byIds",scope= com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
            collapserProperties={
                @HystrixProperty(name="maxRequestsInBatch",value="10"),
                @HystrixProperty(name="timerDelayInMilliseconds",value="1000")
    })
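    public Future<Long> byId(Long id){
        // This body is a stub: the collapser aspect intercepts the call and routes it to the batch method byIds.
        return null;
    }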

    @HystrixCommand
    public List<Long> byIds(List<Long> ids){
        System.out.println(ids);
        return ids;
    }
}
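
To make the relationship between `byId` and `byIds` concrete, here is a plain-Java sketch of the core idea: each caller gets a future right away, and a single batch call later completes all of them by position. It is not Hystrix's implementation; `MiniCollapser`, `submit`, and `flush` are hypothetical names, and the timer and batch-size triggers are left out so that `flush()` is called by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class CollapseDemo {
    public static void main(String[] args) throws Exception {
        // Batch function standing in for byIds: echoes the ids back.
        MiniCollapser<Long, Long> collapser = new MiniCollapser<Long, Long>(ids -> ids);
        CompletableFuture<Long> f1 = collapser.submit(1L);
        CompletableFuture<Long> f2 = collapser.submit(2L);
        System.out.println(f1.isDone());                // false: nothing has run yet
        collapser.flush();                              // one batch call serves both callers
        System.out.println(f1.get() + ", " + f2.get()); // 1, 2
    }
}

// Hypothetical class: collects single arguments and answers them from one batch call.
class MiniCollapser<A, R> {
    private final Function<List<A>, List<R>> batchFunction;
    private final List<A> args = new ArrayList<>();
    private final List<CompletableFuture<R>> futures = new ArrayList<>();

    MiniCollapser(Function<List<A>, List<R>> batchFunction) {
        this.batchFunction = batchFunction;
    }

    // Registers one request and returns a future that the next flush completes.
    synchronized CompletableFuture<R> submit(A arg) {
        CompletableFuture<R> future = new CompletableFuture<>();
        args.add(arg);
        futures.add(future);
        return future;
    }

    // Runs one batch call; result i completes future i.
    synchronized void flush() {
        if (args.isEmpty()) {
            return;
        }
        List<R> results = batchFunction.apply(new ArrayList<>(args));
        for (int i = 0; i < futures.size(); i++) {
            futures.get(i).complete(results.get(i));
        }
        args.clear();
        futures.clear();
    }
}
```

The positional mapping is the reason a batch method such as `byIds` is expected to return one result per input, in the same order as the inputs.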

Test class

Send requests to verify the behavior.

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HystrixTest {

    public static void main(String[] args) throws Exception{
        CloseableHttpClient httpClient = HttpClients.custom().setMaxConnPerRoute(100).build();
        String url = "http://localhost:8086/byid?id="; // assumes the application listens on port 8086 (server.port)
        ExecutorService executorService = Executors.newFixedThreadPool(20);

        int requestCount = 20;
        for(int i = 0;i < requestCount;i++){
            final int id = i;
            executorService.execute(new Runnable() {
                @Override
                public void run(){
                    try{
                        HttpGet httpGet = new HttpGet(url+ id);
                        HttpResponse response = httpClient.execute(httpGet);
                        System.out.println(response);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            });
        }

        Thread.sleep(1000*10);
        executorService.shutdown();
        httpClient.close();
    }
}

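Simplified implementation

Hystrix is no longer actively maintained, and its use of RxJava raises the learning curve, so this simplified version was implemented based on the HystrixCollapser design and common business requirements.

Implementation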

RequestCollapserFactory

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestCollapserFactory {
    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile RequestCollapserFactory factory;
    private static final int MAXBATCHSIZE = 20;
    private static final long DELAY = 1000L;
    private static final long PERIOD = 500L;
    private ConcurrentHashMap<String,RequestCollapser> collapsers;
    private ScheduledExecutorService executor;

    private RequestCollapserFactory(){
        collapsers = new ConcurrentHashMap<>();
        ThreadFactory threadFactory = new ThreadFactory() {
            final AtomicInteger counter = new AtomicInteger();
            @Override
            public Thread newThread(Runnable r){
                Thread thread = new Thread(r, "RequestCollapserTimer-" + counter.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
        };
        executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), threadFactory);
    }

    public static RequestCollapserFactory getInstance(){
        if(factory != null){
            return factory;
        }
        synchronized (RequestCollapserFactory.class){
            if(factory != null){
                return factory;
            }
            factory = new RequestCollapserFactory();
        }
        return factory;
    }

    public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch){
        return getRequestCollapser(key,requestBatch,MAXBATCHSIZE,DELAY,PERIOD);
    }
    public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch,int maxBatchSize){
        return getRequestCollapser(key,requestBatch,maxBatchSize,DELAY,PERIOD);
    }
    public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch,int maxBatchSize,long delay, long period){
        RequestCollapser collapser = collapsers.get(key);
        if(collapser != null){
            return collapser;
        }

        synchronized (collapsers){
            collapser = collapsers.get(key);
            if(collapser != null){
                return collapser;
            }
            collapser = new RequestCollapser(requestBatch,maxBatchSize,delay,period,executor);
            collapsers.put(key,collapser);
            return collapser;
        }
    }
}
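
The per-key lookup in `getRequestCollapser` can also be written with `ConcurrentHashMap.computeIfAbsent`, which creates the value at most once per key without an explicit `synchronized` block. The sketch below is an alternative under that assumption, not the article's code; `KeyedRegistry` and `creations()` are hypothetical names, and a `StringBuilder` stands in for a collapser.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RegistryDemo {
    public static void main(String[] args) {
        KeyedRegistry<StringBuilder> registry = new KeyedRegistry<>();
        StringBuilder first = registry.get("1", StringBuilder::new);
        StringBuilder second = registry.get("1", StringBuilder::new);
        System.out.println(first == second);      // true: same instance for the same key
        System.out.println(registry.creations()); // 1
    }
}

// Hypothetical registry: one lazily created instance per key.
class KeyedRegistry<V> {
    private final ConcurrentHashMap<String, V> instances = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Returns the instance for `key`, calling `creator` only if none exists yet.
    V get(String key, Supplier<V> creator) {
        return instances.computeIfAbsent(key, k -> {
            created.incrementAndGet();
            return creator.get();
        });
    }

    // Number of instances created so far.
    int creations() {
        return created.get();
    }
}
```

As in the factory above, a later call with an existing key returns the existing instance, so any different batch size, delay, or period passed on that call is ignored.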

RequestCollapser

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RequestCollapser {
    private int maxBatchSize;
    private long delay;
    private long period;
    private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private ScheduledExecutorService executor;
    private RequestBatch requestBatch;
    private AtomicBoolean timerscheduled = new AtomicBoolean();
    protected RequestCollapser(RequestBatch requestBatch,int maxBatchSize,long delay,long period,ScheduledExecutorService executor){
        if(requestBatch == null){
            throw new IllegalArgumentException("requestBatch can not be null");
        }
        this.maxBatchSize = maxBatchSize;
        this.delay = delay;
        this.period = period;
        this.executor = executor;
        this.requestBatch = requestBatch;
    }

    public List<Object> submitRequest(Object obj,boolean ifFullThenBatchExecute){
        if(timerscheduled.compareAndSet(false,true)){
            this.startSchedule();
        }
        List<Object> objectList = null;
        synchronized (queue){
            if(obj instanceof Collection){
                queue.addAll((Collection<?>) obj);
            }else {
                queue.offer(obj);
            }

            if(queue.size() >= this.maxBatchSize){
                objectList = new LinkedList<>();
                queue.drainTo(objectList);
            }
        }

        if(!ifFullThenBatchExecute){
            return objectList;
        }

        this.doBatch(objectList);
        return null;
    }

    private boolean doBatch(List<Object> objectList){
        if(objectList == null){
            return true;
        }
        try{
            return requestBatch.batch(objectList);
        }catch (Throwable t){
            t.printStackTrace();
        }
        return false;
    }

    private void startSchedule(){
        Runnable r = new Runnable() {
            @Override
            public void run(){
                List<Object> objectList = null;
                synchronized (queue){
                    if(queue.size() > 0) {
                        objectList = new LinkedList<>();
                        queue.drainTo(objectList);
                    }
                }
                doBatch(objectList);
            }
        };

        this.executor.scheduleAtFixedRate(r,this.delay,this.period, TimeUnit.MILLISECONDS);
    }
}
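
The background section notes that buffered data can be lost when the application is redeployed. One possible mitigation, which the class above does not include, is to flush whatever is still queued when the JVM shuts down normally. The sketch below assumes a hypothetical `DrainingQueue` with a `flush()` method registered as a shutdown hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class DrainOnExitDemo {
    public static void main(String[] args) {
        DrainingQueue<Integer> queue =
                new DrainingQueue<Integer>(batch -> System.out.println("flushed " + batch));
        queue.add(1);
        queue.add(2);
        // On a normal JVM exit the hook forwards whatever is still queued.
        Runtime.getRuntime().addShutdownHook(new Thread(queue::flush));
    }
}

// Hypothetical queue that can hand over all pending items in one batch.
class DrainingQueue<T> {
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Consumer<List<T>> batchHandler;

    DrainingQueue(Consumer<List<T>> batchHandler) {
        this.batchHandler = batchHandler;
    }

    void add(T item) {
        queue.offer(item);
    }

    // Drains everything queued, passes it to the handler, and returns the item count.
    synchronized int flush() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) {
            batchHandler.accept(batch);
        }
        return batch.size();
    }
}
```

A shutdown hook only runs on an orderly exit. It does not help when the process is killed forcibly or crashes, so the abnormal-down case from the background section still loses data unless the buffer is persisted somewhere.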

RequestBatch

import java.util.List;

public interface RequestBatch {
    boolean batch(List<Object> objectList);
}
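
Because `RequestBatch` declares a single abstract method, a handler can be supplied as a lambda instead of an anonymous class. The sketch below repeats the interface as a nested type so that it compiles on its own; `counting` is a hypothetical helper introduced for the demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RequestBatchLambdaDemo {
    // Same shape as the RequestBatch interface above, repeated so this sketch is self-contained.
    interface RequestBatch {
        boolean batch(List<Object> objectList);
    }

    // Hypothetical helper: builds a handler that adds each batch size to `counter`.
    static RequestBatch counting(AtomicInteger counter) {
        return objectList -> {
            counter.addAndGet(objectList.size());
            return true;
        };
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        RequestBatch handler = counting(counter);
        handler.batch(Arrays.<Object>asList(1, 2, 3));
        handler.batch(Arrays.<Object>asList(4, 5));
        System.out.println(counter.get()); // 5
    }
}
```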

Verification test

RequestCollapserTest

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestCollapserTest {
    private static AtomicInteger counter = new AtomicInteger();
    private static long delay = 1;
    private static long period = 1;
    private static int maxBatchSize = 20;
    private static int requestCount = 50000;
    private static RequestCollapserFactory factory = RequestCollapserFactory.getInstance();
    private static RequestBatch requestBatch = new RequestBatch() {
        @Override
        public boolean batch(List<Object> objectList){
            int size = objectList.size();
            counter.addAndGet(size);
            System.out.println(counter + ":::::" + size + ":::::" + objectList);
            return true;
        }
    };

    public static void main(String[] args) throws Exception{
        //sync();
        async();
    }

    public static void async() throws Exception{
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        CountDownLatch countDownLatch = new CountDownLatch(requestCount);
        for(int i = 0;i < requestCount;i++){
            final int id = i;
            executorService.execute(new Runnable() {
                @Override
                public void run(){
                    try{
                        RequestCollapser requestCollapser =
                                factory.getRequestCollapser("1",requestBatch,maxBatchSize,delay,period);
                        requestCollapser.submitRequest(id,true);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                }
            });
        }

        executorService.shutdown();
        countDownLatch.await();
        Thread.sleep(1000);
        System.out.println(counter.get());
    }

    public static void sync() throws Exception{
        for(int i = 0;i < requestCount;i++){
            final int id = i;
            RequestCollapser requestCollapser =
                    factory.getRequestCollapser("1",requestBatch,maxBatchSize,delay,period);
            requestCollapser.submitRequest(id,true);
        }
        Thread.sleep(1000);
        System.out.println(counter.get());
    }
}