手记

RPC框架原理简述:从实现一个简易RPCFramework说起

摘要:

  本文阐述了RPC框架与远程调用的产生背景,介绍了RPC的基本概念和使用背景,之后手动实现了简易的RPC框架并佐以实例进行演示,以便让各位看官对RPC有一个感性、清晰和完整的认识,最后讨论了RPC框架几个较为重要问题。总之,RPC框架的精髓在于动态代理和反射,通过它们使得远程调用“本地化”,对用户透明且友好。


版权声明:

本文原创作者:书呆子Rico
作者博客地址:http://blog.csdn.net/justloveyou_/


一. 引子

  上学时我们写得应用大都比较简单,基本上都属于单体应用,服务调用也局限于本地,如下所示:

// 服务接口public interface HelloService {

    String hello(String name);

    String hi(String msg);
}// 服务本地实现public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {        return "Hello " + name;
    }    @Override
    public String hi(String msg) {        return "Hi, " + msg;
    }
}// 服务本地调用public class Main {
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        helloServiceProxy.hello("Panda");
        helloServiceProxy.hi("Panda");
    }/** Output
           hello : Hello rico
           hi : Hi, panda
    **/  }

  我们写这样的单体应用来学习、做实验正常且合理,但是在生产环境中,单体应用在各方面的性能上和可维护性方面就远远不能满足需求了。应用内各项业务互相纠缠、耦合性太大,不利于后期的维护和升级,主要表现在以下两点上:

  • 可用性低。所有鸡蛋都放在同一个篮子里,一旦有问题导致单体应用挂掉,所有业务都不能访问,稳定性要求难以满足;

  • 不利于各业务团队进行合作,开发效率低。单体应用各业务耦合度太高,不同业务团队开发进度和实现细节不尽相同,难以高效协作。


  将不同的业务拆分到多个应用中,让不同的应用分别承担不同的功能是解决这些问题的必杀技。将不同业务分拆到不同的应用后,不但可以大幅度提升系统的稳定性还有助于丰富技术选型,进一步保证系统的性能。总的来说,从单体应用到分布式多体应用是系统升级必经之路。

  当一个单体应用演化成多体应用后,远程调用就粉墨登场了。在一个应用时,相互通信直接通过本地调用就可完成,而变为多体应用时,相互通信就得依赖远程调用了,这时一个高效稳定的RPC框架就显得非常必要了。可能有的同学会觉得,没必要非得用RPC框架啊,简单的HTTP调用不是也可以实现远程通信吗?确实,简单的HTTP调用确实也可以实现远程通信,但是它不是那么的合适,原因有二:

  • RPC远程调用像本地调用一样干净简洁,但其他方式对代码的侵入性就比较强;

  • 一般使用RPC框架实现远程通信效率比其他方式效率要高一些。


  当我们踏入公司尤其是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不同的机器上,由不同的团队负责。这时就会有两个很关键的问题:

  • 要搭建一个新服务,免不了需要依赖已有的服务,而现在已有的服务都在远端,怎么调用?

  • 其它团队想使用我们的新服务,我们的服务该怎么发布以便他人调用?


  下文将对RPC框架的基本原理进行介绍,并对这两个问题展开探讨,同时参考前辈的博文《RPC框架几行代码就够了》手写一个简易RPC框架以加深对PRC原理的理解。


二. RPC 框架介绍

  对于多体应用,由于各服务部署在不同机器,服务间的调用免不了网络通信过程,服务消费方每调用一个服务都要写一坨网络通信相关的代码,不仅复杂而且极易出错。如果有一种方式能让我们像调用本地服务一样调用远程服务,而让调用者对网络通信这些细节透明,那么将大大解放程序员的双手,大幅度提高生产力。比如,服务消费方在执行helloService.hi(“Panda”)时,实质上调用的是远端的服务。这种方式其实就是RPC(Remote Procedure Call Protocol),在各大互联网公司中被广泛使用,如阿里巴巴的HSF、Dubbo(开源)、Facebook的Thrift(开源)、Google GRPC(开源)、Twitter的Finagle(开源)等。

  RPC的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。为实现该目标,RPC框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。要让网络通信细节对使用者透明,我们需要对通信细节进行封装,下面是一个RPC的经典调用的流程,并且反映了所涉及到的一些通信细节:

     

 (1). 服务消费方(client)以本地调用方式调用服务;
 (2). client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
 (3). client stub找到服务地址,并将消息发送到服务端;
 (4). server stub收到消息后进行解码;
 (5). server stub根据解码结果 反射调用 本地的服务;
 (6). 本地服务执行并将结果返回给server stub;
 (7). server stub将返回结果打包成消息并发送至消费方;
 (8). client stub接收到消息,并进行解码;
 (9). 服务消费方得到最终结果。

  RPC框架就是要将2~8这些步骤封装起来,让用户对这些细节透明,使得远程方法调用看起来像调用本地方法一样。


三. RPC框架简易实现及其实例分析

(1).服务端

  服务端提供客户端所期待的服务,一般包括三个部分:服务接口,服务实现以及服务的注册暴露三部分,如下:

  • 服务接口

public interface HelloService {

    String hello(String name);

    String hi(String msg);
}

  • 服务实现

public class HelloServiceImpl implements HelloService{
    @Override
    public String hello(String name) {        return "Hello " + name;
    }    @Override
    public String hi(String msg) {        return "Hi, " + msg;
    }
}

  • 服务暴露:只有把服务暴露出来,才能让客户端进行调用,这是RPC框架功能之一。

public class RpcProvider {
    public static void main(String[] args) throws Exception {
        HelloService service = new HelloServiceImpl();        // RPC框架将服务暴露出来,供客户端消费
        RpcFramework.export(service, 1234);
    }
}

(2).客户端

  客户端消费服务端所提供的服务,一般包括两个部分:服务接口和服务引用两个部分,如下:

  • 服务接口:与服务端共享同一个服务接口

public interface HelloService {

    String hello(String name);

    String hi(String msg);
}123456

  • 服务引用:消费端通过RPC框架进行远程调用,这也是RPC框架功能之一

public class RpcConsumer {
    public static void main(String[] args) throws Exception {        // 由RpcFramework生成的HelloService的代理
        HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        String hello = service.hello("World");
        System.out.println("客户端收到远程调用的结果 : " + hello);
    }
}

(3).RPC框架原型实现

  RPC框架主要包括两大功能:一个用于服务端暴露服务,一个用于客户端引用服务。

  • 服务端暴露服务

    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {        if (service == null) {            throw new IllegalArgumentException("service instance == null");
        }        if (port <= 0 || port > 65535) {            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);        // 建立Socket服务端
        ServerSocket server = new ServerSocket(port);        for (; ; ) {            try {                // 监听Socket请求
                final Socket socket = server.accept();                new Thread(new Runnable() {                    @Override
                    public void run() {                        try {                            try {                                /* 获取请求流,Server解析并获取请求*/
                                // 构建对象输入流,从源中读取对象到程序中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());                                try {

                                    System.out.println("\nServer解析请求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);                                    // 泛型与数组是不兼容的,除了通配符作泛型参数以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));                                    /* Server 处理请求,进行响应*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());                                    try {                                        // service类型为Object的(可以发布任何服务),故只能通过反射调用处理请求
                                        // 反射调用,处理请求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 处理并生成响应 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

  从该RPC框架的简易实现来看,RPC服务端逻辑是:首先创建ServerSocket负责监听特定端口并接收客户连接请求,然后使用Java原生的序列化/反序列化机制来解析得到请求,包括所调用方法的名称、参数列表和实参,最后反射调用服务端对服务接口的具体实现并将得到的结果回传至客户端。至此,一次简单PRC调用的服务端流程执行完毕。


  • 客户端引用服务

    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务,返回代理对象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)        throws Exception {        if (interfaceClass == null) {            throw new IllegalArgumentException("Interface class == null");
        }        // JDK 动态代理的约束,只能实现对接口的代理
        if (!interfaceClass.isInterface()) {            throw new IllegalArgumentException(                "The " + interfaceClass.getName() + " must be interface class!");
        }        if (host == null || host.length() == 0) {            throw new IllegalArgumentException("Host == null!");
        }        if (port <= 0 || port > 65535) {            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);        // JDK 动态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),            new Class<?>[] {interfaceClass}, new InvocationHandler() {                // invoke方法本意是对目标方法的增强,在这里用于发送RPC请求和接收响应
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)                    throws Throwable {                    // 创建Socket客户端,并与服务端建立链接
                    Socket socket = new Socket(host, port);                    try {                        /* 客户端像服务端进行请求,并将请求参数写入流中*/
                        // 将对象写入到对象输出流,并将其发送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());                        try {                            // 发送请求
                            System.out.println("\nClient发送请求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));                            /* 客户端读取并返回服务端的响应*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());                            try {
                                Object result = input.readObject();                                if (result instanceof Throwable) {                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到响应 : ");
                                System.out.println("result : " + result);                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });        return proxy;
    }

  从该RPC框架的简易实现来看,RPC客户端逻辑是:首先创建Socket客户端并与服务端建立链接,然后使用Java原生的序列化/反序列化机制将调用请求发送给客户端,包括所调用方法的名称、参数列表将服务端的响应返回给用户即可。至此,一次简单PRC调用的客户端流程执行完毕。特别地,从代码实现来看,实现透明的PRC调用的关键就是 动态代理,这是RPC框架实现的灵魂所在。


  • RPC原型实现

public class RpcFramework {
    /**
     * 暴露服务
     *
     * @param service 服务实现
     * @param port    服务端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {        if (service == null) {            throw new IllegalArgumentException("service instance == null");
        }        if (port <= 0 || port > 65535) {            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);        // 建立Socket服务端
        ServerSocket server = new ServerSocket(port);        for (; ; ) {            try {                // 监听Socket请求
                final Socket socket = server.accept();                new Thread(new Runnable() {                    @Override
                    public void run() {                        try {                            try {                                /* 获取请求流,Server解析并获取请求*/
                                // 构建对象输入流,从源中读取对象到程序中
                                ObjectInputStream input = new ObjectInputStream(
                                    socket.getInputStream());                                try {

                                    System.out.println("\nServer解析请求 : ");
                                    String methodName = input.readUTF();
                                    System.out.println("methodName : " + methodName);                                    // 泛型与数组是不兼容的,除了通配符作泛型参数以外
                                    Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                    System.out.println(                                        "parameterTypes : " + Arrays.toString(parameterTypes));
                                    Object[] arguments = (Object[])input.readObject();
                                    System.out.println("arguments : " + Arrays.toString(arguments));                                    /* Server 处理请求,进行响应*/
                                    ObjectOutputStream output = new ObjectOutputStream(
                                        socket.getOutputStream());                                    try {                                        // service类型为Object的(可以发布任何服务),故只能通过反射调用处理请求
                                        // 反射调用,处理请求
                                        Method method = service.getClass().getMethod(methodName,
                                            parameterTypes);
                                        Object result = method.invoke(service, arguments);
                                        System.out.println("\nServer 处理并生成响应 :");
                                        System.out.println("result : " + result);
                                        output.writeObject(result);
                                    } catch (Throwable t) {
                                        output.writeObject(t);
                                    } finally {
                                        output.close();
                                    }
                                } finally {
                                    input.close();
                                }
                            } finally {
                                socket.close();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }    /**
     * 引用服务
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口类型
     * @param host           服务器主机名
     * @param port           服务器端口
     * @return 远程服务,返回代理对象
     * @throws Exception
     */
    @SuppressWarnings("unchecked")    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)        throws Exception {        if (interfaceClass == null) {            throw new IllegalArgumentException("Interface class == null");
        }        // JDK 动态代理的约束,只能实现对接口的代理
        if (!interfaceClass.isInterface()) {            throw new IllegalArgumentException(                "The " + interfaceClass.getName() + " must be interface class!");
        }        if (host == null || host.length() == 0) {            throw new IllegalArgumentException("Host == null!");
        }        if (port <= 0 || port > 65535) {            throw new IllegalArgumentException("Invalid port " + port);
        }
        System.out.println(            "Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);        // JDK 动态代理
        T proxy = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),            new Class<?>[] {interfaceClass}, new InvocationHandler() {                // invoke方法本意是对目标方法的增强,在这里用于发送RPC请求和接收响应
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)                    throws Throwable {                    // 创建Socket客户端,并与服务端建立链接
                    Socket socket = new Socket(host, port);                    try {                        /* 客户端像服务端进行请求,并将请求参数写入流中*/
                        // 将对象写入到对象输出流,并将其发送到Socket流中去
                        ObjectOutputStream output = new ObjectOutputStream(
                            socket.getOutputStream());                        try {                            // 发送请求
                            System.out.println("\nClient发送请求 : ");
                            output.writeUTF(method.getName());
                            System.out.println("methodName : " + method.getName());
                            output.writeObject(method.getParameterTypes());
                            System.out.println("parameterTypes : " + Arrays.toString(method
                                .getParameterTypes()));
                            output.writeObject(arguments);
                            System.out.println("arguments : " + Arrays.toString(arguments));                            /* 客户端读取并返回服务端的响应*/
                            ObjectInputStream input = new ObjectInputStream(
                                socket.getInputStream());                            try {
                                Object result = input.readObject();                                if (result instanceof Throwable) {                                    throw (Throwable)result;
                                }
                                System.out.println("\nClient收到响应 : ");
                                System.out.println("result : " + result);                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });        return proxy;
    }
}

  以上是简易RPC框架实现的简易完整代码。


四. 关于RPC框架的若干问题说明

(1).RPC框架如何做到透明化远程服务调用?

  如何封装通信细节才能让用户像以本地调用方式调用远程服务呢?就Java而言,动态代理恰是解决之道。Java动态代理有JDK动态代理和CGLIB动态代理两种方式。尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,因此RPC框架的大部分实现还是选择JDK动态代理的方式。在上面的例子中,RPCFramework实现中的invoke方法封装了与远端服务通信的细节,消费方首先从RPCFramework获得服务提供方的接口,当执行helloService.hi(“Panda”)方法时就会调用invoke方法。


(2).如何发布自己的服务?

  如何让别人使用我们的服务呢?难道就像我们上面的代码一样直接写死服务的IP以及端口就可以了吗?事实上,在实际生产实现中,使用人肉告知的方式是不现实的,因为实际生产中服务机器上/下线太频繁了。如果你发现一台机器提供服务不够,要再添加一台,这个时候就要告诉调用者我现在有两个IP了,你们要轮询调用来实现负载均衡;调用者咬咬牙改了,结果某天一台机器挂了,调用者发现服务有一半不可用,他又只能手动修改代码来删除挂掉那台机器的ip。这必然是相当痛苦的!

  有没有一种方法能实现自动告知,即机器的上线/下线对调用方透明,调用者不再需要写死服务提供方地址?当然可以,生产中的RPC框架都采用的是自动告知的方式,比如,阿里内部使用的RPC框架HSF是通过ConfigServer来完成这项任务的。此外,Zookeeper也被广泛用于实现服务自动注册与发现功能。不管具体采用何种技术,他们大都采用的都是 发布/订阅模式。


(3).序列化与反序列化

  我们知道,Java对象是无法直接在网络中进行传输的。那么,我们的RPC请求如何发给服务端,客户端又如何接收来自服务端的响应呢?答案是,在传输Java对象时,首先对其进行序列化,然后在相应的终端进行反序列化还原对象以便进行处理。事实上,序列化/反序列化技术也有很多种,比如Java的原生序列化方式、JSON、阿里的Hessian和ProtoBuff序列化等,它们在效率上存在差异,但又有各自的特点。


  除上面提到的三个问题外,生产中使用的RPC框架要考虑的东西还有很多,在此就不作探讨了。本文的目的就是为了让各位看官对RPC框架有一个感性的、较为深入的了解,如果达到了这一目的,笔者的目的基本就算达到了。


五. 总结

  本文阐述了远程调用的产生背景,然后介绍了RPC的基本概念和要解决的问题,之后手动实现了简易得RPC框架并佐以实例进行演示,使看官们对RPC有一个感性完整的认识,最后讨论了RPC框架的几个重要问题。总之,RPC框架的精髓在于动态代理和反射,通过它们使得远程调用“本地化”,对用户透明且友好。


六. 说明与致谢

  本文主要参考了梁飞前辈的RPC框架几行代码就够了和Hosee的RPC原理及RPC实例分析,这两篇博文都是学习RPC的好文章,感谢它们的无私分享。

  若各位看官想进一步了解Java动态代理,请移步笔者的深入理解代理模式:静态代理与JDK动态代理一文。


引用

RPC原理及RPC实例分析
RPC框架几行代码就够了

原文出处

0人推荐
随时随地看视频
慕课网APP