手记

dubbo剖析:三 网络通信之 -- Server实现

引子:

  • dubbo剖析:一 服务发布 中,我们讲到了      RegistryProtocol.export过程中有一个关键步骤,即调用doLocalExport(final Invoker<T> originInvoker)生成Exporter,其最终调用了DubboProtocolexport()方法。

  • DubboProtocolexport()方法:完成了 "启动并监听网络服务" 的工作,具体是通过HeaderExchangerbind()方法创建了一个HeaderExchangerServer实现的。

  • 本章我们就来介绍HeaderExchangerServer设计架构功能实现

一、入口流程

服务发布流程中,RegistryProtocol会调用到DubboProtocolexport()方法,用于完成网络服务的启动和监听。

1.1 入口代码

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();        //step1 export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);        //...省略部分代码...

        //step2 创建ExchangeServer
        openServer(url);        return exporter;
    }
    private void openServer(URL url) {
        String key = url.getAddress();        if (isServer) {
            ExchangeServer server = serverMap.get(key);            if (server == null) {                //调用createServer
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    private ExchangeServer createServer(URL url) {        //...省略部分代码,参数解析之类的...
        ExchangeServer server;        try {            //关键代码,使用HeaderExchanger.bind创建HeaderExchangeServer
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }        return server;
    }

1.2 流程图解

DubboProtocol.export()流程图


第一步:生成Exporter:

  • AbstractProxyInvoker作为构造参数,new出一个DubboExporter;

  • DubboExporter维护了AbstractProxyInvoker的生命周期;

  • 服务url、DubboExporter放入DubboProtocol的缓存map供消息接收处理使用;

第二步:创建ExchangeServer:

  • AbstractProxyInvoker中获取服务url;

  • 服务url作为入参、同时使用DubboProtocol包含的网络事件处理器requestHandler,调用HeaderExchangerbind()方法创建ExchangeServer

  • HeaderExchanger中又依赖了NettyTransporter,使用其bind()方法创建NettyServer

  • NettyServer是启动网络服务的核心类。HeaderExchangerServer使用NettyServer作为构造参数,扩展了它的功能;

二、server端网络层结构

server端网络层类图关系说明

2.1 网络传输层

  • EndPoint为网络端点的抽象接口,定义了获取网络端点地址、连接、及最原始的发送消息的方法。

  • ChannelHandler为网络事件处理器接口,定义了Server端监听到各种类型的网络事件时的处理方法(connected、disconnected、sent、received、caught),Netty中也有类似定义。

  • Server为网络服务端的抽象接口,继承了EndPoint的功能,并扩展了获取与服务端建连的通道Channel的方法。

  • Transporter为网络传输层的抽象接口,核心作用就是提供了创建ServerClient两个核心接口实现类的方法。

2.2 信息交换层

  • ExchangeHandler,在ChannelHandler接口基础上,添加了 响应请求 的方法。

  • ExchangeServer,在Server接口基础上,将获取Channel的方法扩展为获取ExchangeChannel的方法。

  • Exchanger为信息交换层的抽象接口,核心作用就是提供了创建ExchangeServerExchangeClient两个核心接口实现类的方法。

2.3 网络通道Channel

网络通道接口定义

  • Channel为网络通道的抽象接口,继承了EndPoint的功能,并扩展了绑定获取属性和获取通道对端地址的方法。

  • ExchangeChannel,在Channel接口的基础上,扩展了请求响应模式的功能,并能获取绑定在通道上的网络事件监听器。

三、HeaderExchangeServer & NettyServer实现详解

Server实现层次结构图

3.1 网络层

AbstractPeer类(网络事件处理器和网络节点的通用实现):

  • 定义了属性ChannelHandlerURL,作为构造方法入参注入;

  • 实现了ChannelHandlerEndPoint接口,ChannelHandler接口的相关方法依赖其channelHandler属性完成实现;

AbstractEndPoint类(加入编解码功能):

  • 定义了构造方法,入参包含属性ChannelHandlerURL

  • 定义了属性Codec2,用于编解码,通过SPI动态注入;

  • 定义了timeout/connectTimeout相关超时属性,由URL解析赋值;

  • 对外暴露了获取Codec2和超时相关属性的方法,供上层依赖调用;

AbstractServer类(网络服务端通用抽象,抽象出openclosesend的公共流程,并提供了doOpendoClose的实现扩展):

  • 定义了构造方法,入参包含属性ChannelHandlerURL,并触发doOpen()扩展;

  • 重写EndPoint接口的close()方法,触发doClose()扩展;

  • 实现EndPoint接口的send()方法,遍历并调用Channel.send()

NettyServer类(网络服务端Netty实现类,实现了doOpendoClosegetChannels三个具体扩展):

  • 实现了doOpen()扩展方法,使用Netty的ServerBootstrap完成服务启动监听,其网络世界处理器为this包装成的NettyHandler

  • 实现了doClose()扩展方法,调用Netty的boostrapchannel完成网络资源释放;

  • 实现了getChannels()方法,channels的值由网络事件处理器在connect、disconnect事件触发时变动维护;

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);                return pipeline;
            }
        });        // bind
        channel = bootstrap.bind(getBindAddress());
    }
    @Override
    protected void doClose() throws Throwable {        try {            if (channel != null) {                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();            if (channels != null && channels.size() > 0) {                for (com.alibaba.dubbo.remoting.Channel channel : channels) {                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }        try {            if (bootstrap != null) {                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }        try {            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

3.2 交换层

HeaderExchangeServer类(交换层服务端,将网络层的Channel扩展为交换层的ExchangeChannel,并加入心跳检测功能):

  • 定义了构造方法,入参包含属性Server,用于实现服务端网络层功能;

  • 定义了属性定时任务线程池scheduled,用于执行“定时心跳收发及心跳超时监测”任务;

  • 定义了hearbeat/heartbeatTieout相关心跳属性,由URL解析赋值;

  • 构造方法中启动“定时心跳收发及心跳超时监测”任务,超时时“Server断连、Client断连重连”;

  • 将网络层Channel扩展为交换层ExchangeChannel,具体实现后续另辟章节;



作者:益文的圈
链接:https://www.jianshu.com/p/1a1404ce2201


0人推荐
随时随地看视频
慕课网APP