深圳幻海软件技术有限公司 欢迎您!

你管这个叫Dubbo?

2023-02-28

RPC框架的实现又到年初了,大家又要开始准备面试了。为了方便大家,我就写几篇面试相关的文章吧,这次是Dubbo相信很多小伙伴已经看了很多Dubbo的八股文了。比如,Dubbo支持哪些序列化框架,支持哪些注册中心,支持哪些集群容错策略,支持服务降级吗?但是你知道Dubbo服务导出和服务引入的过程吗?服

QQ3pj" id="hc9b0489-m3jQQ3pj">RPC框架的实现

又到年初了,大家又要开始准备面试了。为了方便大家,我就写几篇面试相关的文章吧,这次是Dubbo

相信很多小伙伴已经看了很多Dubbo的八股文了。比如,Dubbo支持哪些序列化框架,支持哪些注册中心,支持哪些集群容错策略,支持服务降级吗?但是你知道Dubbo服务导出和服务引入的过程吗?服务降级是如何实现的?等等

本文就从源码的角度来分享一下Dubbo的整个调用过程(放心,图示为主,辅助一少部分源码)

「RPC框架的实现基本上都是如下架构」

一个RPC调用的过程如下

  • 调用方发送请求后由代理类将调用的方法,参数组装成能进行网络传输的消息体
  • 调用方代理类将消息体发送到提供方
  • 提供方代理类将消息进行解码,得到调用的方法和参数
  • 提供方代理类执行相应的方法,并将结果返回

「协议,编解码,序列化的部分不是本文的重点,我就不分析了,有兴趣的可以看我之前的文章。」

首先来手写一个极简版的RPC框架,以便你对上面的流程有一个更深的认识

手写一个简单的PRC框架

封装网络请求对象

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest implements Serializable {

    private String interfaceName;
    private String methodName;
    private Class<?>[] paramTypes;
    private Object[] parameters;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

根据interfaceName可以确定需要调用的接口,methodName和paramTypes则可以确定要调用接口的方法名,定位到具体的方法,传入参数即可调用方法

封装调用接口

封装接口到api模块,producer端写实现逻辑,consumer端写调用逻辑

public interface HelloService {

    String sayHello(String content);
}
public interface UpperCaseService {

    String toUpperCase(String content);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

开发producer端

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String content) {
        return "hello " + content;
    }
}
public class UpperCaseServiceImpl implements UpperCaseService {

    @Override
    public String toUpperCase(String content) {
        return content.toUpperCase();
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

ServiceMap保存了producer端接口名和接口实现类的映射关系,这样可以根据请求对象的接口名,找到对应的实现类

public class ServiceMap {

    // 接口名 -> 接口实现类
    private static Map<String, Object> serviceMap = new HashMap<>();

    public static void registerService(String serviceKey, Object service) {
        serviceMap.put(serviceKey, service);
    }

    public static Object lookupService(String serviceKey) {
        return serviceMap.get(serviceKey);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.

为了提高服务端的并发度,我们将每一个请求的处理过程放到线程池中

@Slf4j
public class RequestHandler implements Runnable {

    private Socket socket;

    public RequestHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
            RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
            Object service = ServiceMap.lookupService(rpcRequest.getInterfaceName());
            Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
            Object result = method.invoke(service, rpcRequest.getParameters());
            outputStream.writeObject(result);
        } catch (Exception e) {
            log.error("invoke method error", e);
            throw new RuntimeException("invoke method error");
        }
    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.

启动服务端

public class RpcProviderMain {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {

        HelloService helloService = new HelloServiceImpl();
        UpperCaseService upperCaseService = new UpperCaseServiceImpl();
        // 将需要暴露的接口注册到serviceMap中
        ServiceMap.registerService(HelloService.class.getName(), helloService);
        ServiceMap.registerService(UpperCaseService.class.getName(), upperCaseService);

        ServerSocket serverSocket = new ServerSocket(8080);

        while (true) {
            // 获取一个套接字(阻塞)。所以为了并行,来一个请求,开一个线程处理
            // 为了复用线程,用了threadPool
            final Socket socket = serverSocket.accept();
            executorService.execute(new RequestHandler(socket));
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.

开发consumer端

前面说过,我们要通过动态代理对象解耦方法调用和网络调用,所以接下来我们就写一下动态代理对象的实现逻辑

生成一个代理对象的过程很简单

实现InvocationHandler接口,在invoke方法中增加代理逻辑

调用Proxy.newProxyInstance方法生成代理对象,3个参数分别是ClassLoader,代理对象需要实现的接口数组,InvocationHandler接口实现类

当执行代理执行实现的接口方法时,会调用到InvocationHandler#invoke,这个方法中增加了代理逻辑哈。

public class ConsumerProxy {

    public static <T> T getProxy(final Class<T> interfaceClass, final String host, final int port) {

        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass}, new ConsumerInvocationHandler(host, port));
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

可以看到代理对象的主要功能就是组装请求参数,然后发起网络调用

@Slf4j
public class ConsumerInvocationHandler implements InvocationHandler {

    private String host;
    private Integer port;

    public ConsumerInvocationHandler(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        try (Socket socket = new Socket(host, port);
             ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
            RpcRequest rpcRequest = RpcRequest.builder()
                    .interfaceName(method.getDeclaringClass().getName())
                    .methodName(method.getName())
                    .paramTypes(method.getParameterTypes())
                    .parameters(args).build();
            outputStream.writeObject(rpcRequest);
            Object result = inputStream.readObject();
            return result;
        } catch (Exception e) {
            log.error("consumer invoke error", e);
            throw new RuntimeException("consumer invoke error");
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

此时我们只需要通过ConsumerProxy#getProxy方法,就能很方便的获取到代理对象。通过代理对象调用远程方法和调用本地方法一样方便

public class RpcConsumerMain {

    public static void main(String[] args)  {

        // 因为这是一个小demo,就不拆分多模块了
        // 这个HelloService是通过网络调用的HelloServiceImpl,而不是本地调用
        HelloService helloService = ConsumerProxy.getProxy(HelloService.class, "127.0.0.1", 8080);
        // hello world
        System.out.println(helloService.sayHello("world"));
        UpperCaseService upperCaseService = ConsumerProxy.getProxy(UpperCaseService.class, "127.0.0.1", 8080);
        // THIS IS CONTENT
        System.out.println(upperCaseService.toUpperCase("this is content"));
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

至此我们已经把一个RPC框架最核心的功能就实现了,是不是很简单。「其实Dubbo的源码也很简单,只不过增加了很多扩展功能,所以大家有时候会认为比较难。」

所以我们就来分析一下核心的扩展功能。比如Filter,服务降级,集群容错等是如何实现的?其他的扩展功能,比如支持多种注册中心,支持多种序列化框架,支持多种协议,基本不会打交道,所以就不浪费时间了

从前面的图示我们知道,代理类在服务调用和响应过程中扮演着重要的角色。「在Dubbo中,代理类有个专有名词叫做Invoker,而Dubbo中就是通过对这个Invoker不断进行代理增加各种新功能的」

Dubbo服务导出

「当第三方框架想和Spring整合时,有哪些方式?」

实现BeanFactoryPostProcessor接口(对BeanFactory进行扩展)

实现BeanPostProcessor接口(对Bean的生成过程进行扩展)

Dubbo也不例外,当Dubbo和Spring整合时,会往容器中注入2个BeanPostProcessor,作用如下

ServiceAnnotationBeanPostProcessor,将@Service注解的类封装成ServiceBean注入容器 ReferenceAnnotationBeanPostProcessor,将@Reference注解的接口封装成ReferenceBean注入容器

所以服务导出和服务引入肯定和ServiceBean和ReferenceBean的生命周期有关。

「ServiceBean实现了ApplicationListener接口,当收到ContextRefreshedEvent事件时(即Spring容器启动完成)开始服务导出。」

服务导出比较重要的2个步骤就是

将服务注册到zk(我们后面的分析,注册中心都基于zk哈)

将服务对象包装成Invoker,并保存在一个map中,key为服务名,value为Invoker对象

「当收到请求时,根据服务名找到Invoker对象,Invoker对象根据方法名和参数反射执行方法,然后将结果返回。」

这里留个小问题,反射执行方式效率会很低,那么在Dubbo中还有哪些解决方案呢?

从图中可以看到AbstractProxyInvoker被其他Invoker进行代理了,而这些Invoker是用来执行Filter的,一个Invoker代理类执行一个Filter,层层进行代理

「如下图为Dubbo收到请求层层调用的过程」

Dubbo服务引入

前面我们已经推断出来服务导出和ReferenceBean有关。我们来看看具体在哪个阶段?ReferenceBean实现了FactoryBean接口,并重写了getObject方法,在这个方法中进行服务导出。因此我们推断服务导出的时机是ReferenceBean被其他对象注入时

public Object getObject() {
    return get();
}
  • 1.
  • 2.
  • 3.

接下来就是从注册中心获取服务地址,构建Invoker对象,并基于Invoker对象构建动态代理类,赋值给接口。

最终能发起网络调用的是DubboInvoker,而这个Invoker被代理了很多层,用来实现各种扩展功能。

服务降级

第一个就是服务降级,什么是服务降级呢?

「当服务可不用时,我们不希望抛出异常,而是返回特定的值(友好的提示等),这时候我们就可以用到服务降级。」

dubbo中有很多服务降级策略,简单举几个例子

force: 代表强制使用 Mock 行为,在这种情况下不会走远程调用 fail: 只有当远程调用发生错误时才使用 Mock 行为

假如有如下一个controller,调用DemoService获取值,但是DemoService并没有启动

@RestController
public class DemoController {

    @Reference(check = false, mock = "force:return mock")
    private DemoService demoService;

    @RequestMapping("hello")
    public String hello(@RequestParam("msg") String msg) {
        return demoService.hello(msg);
    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

可以看到直接返回mock字符串(也并不会发生网络调用)

将@Reference的mock属性改为如下,再次调用

@RestController
public class DemoController {

    @Reference(check = false, mock = "fail:return fail")
    private DemoService demoService;

    @RequestMapping("hello")
    public String hello(@RequestParam("msg") String msg) {
        return demoService.hello(msg);
    }

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

会发起网络调用,调用失败,然后返回fail。

「dubbo中的服务降级只用了MockClusterInvoker这一个类来实现,因此相对于Hystrix等功能很简单,实现也很简单,如下图。」

当Reference不配置mock属性或者属性为false时,表示不进行降级,直接调用代理对象即可

以属性以force开头时,表示直接进行降级,都不会发生网络调用

其他请求就是在进行网络失败后才进行降级

集群容错

过了服务降级这一层,接下来就到了集群容错了。

dubbo中有很多集群容错策略

容错策略

解释

代理类

AvailableCluster

找到一个可用的节点,直接发起调用

AbstractClusterInvoker匿名内部类

FailoverCluster

失败重试(默认)

FailoverClusterInvoker

FailfastCluster

快速失败

FailfastClusterInvoker

FailsafeCluster

安全失败

FailsafeClusterInvoker

FailbackCluster

失败自动恢复

FailbackClusterInvoker

ForkingCluster

并行调用

ForkingClusterInvoker

BroadcastCluster

广播调用

BroadcastClusterInvoker

Failover Cluster:失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。

Failfast Cluster:快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster:失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

Failback Cluster:失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster:并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2″ 来设置最大并行数。

Broadcast Cluster:广播调用所有提供者,逐个调用,任意一台报错则报错 。通常用于通知所有提供者更新缓存或日志等本地资源信息。

「读操作建议使用 Failover 失败自动切换,默认重试两次其他服务器。写操作建议使用 Failfast 快速失败,发一次调用失败就立即报错。」

不知道你发现没?「换集群容错策略就是换DubboInvoker的代理类」

集群容错相关的代理类都有一个共同的属性RegistryDirectory,这个是一个很重要的组件,它用List保存了服务提供者对应的所有Invoker。

「更牛逼的是这个List是动态变化的,当服务提供者下线时,会触发相应的事件,调用方会监听这个事件,并把对应的Invoker删除,这样后续就不会调用到下线的服务了。当有新的服务提供者时,会触发生成新的Invoker。」

当一个服务的多个Invoker摆在我们面前时,该选择哪个来调用呢?这就不得不提到负载均衡策略了。

负载均衡策略实现类

解释

RandomLoadBalance

随机策略(默认)

RoundRobinLoadBalance

轮询策略

LeastActiveLoadBalance

最少活跃调用数

ConsistentHashLoadBalance

一致性hash策略

「我们只需要通过合适的负载均衡策略来选择即可」

和服务端类似类似,最终能发送网络请求的Invoker还会被Filter对应的Invoker类所代理,一个Filter一个代理类,层层代理。

如下图为Dubbo发送请求时层层调用的过程

好了,Dubbo一些比较重要的扩展点就分享完了,整个请求响应的基本过程也串下来了!