手记

Motan中transport模块客户端心跳管理

客户端心跳

在Motan中客户端会定期发送心跳包到服务端,用以检查服务是否可用。并且修改当前channel连接的状态isAliveState,是否活着。

客户端负载均衡

Motan中使用了客户端的负载均衡,大概实现如下:
每次增加一个server服务注册中心就会通知客户端,客户端会就创建一个Referer(相当于客户端和服务端的连接,有几个服务就会有几个Referer,每一个具体的provider服务都被抽象成一个Referer接口),然后加入服务列表中,客户端请求服务的时候通过指定的算法(轮训,随机等)从服务列表中选择一个进行请求。

当心跳检查到某个服务不可用的时候,同时也会将服务列表中的Referer过滤掉,不会再去请求。

transport模块源码

1-入口类:
NettyEndpointFactory extends AbstractEndpointFactory

2-创建server和client的类
AbstractEndpointFactory.createServer
具体实现在子类NettyEndpointFactory中的方法:
protected Server innerCreateServer(URL url, MessageHandler messageHandler) {
        return new NettyServer(url, messageHandler);
}

客户端也类似
AbstractEndpointFactory.createClient
具体实现在子类NettyEndpointFactory中的方法:
protected Client innerCreateClient(URL url) {
        return new NettyClient(url);
}

3-客户端心跳发送
相比server客户端多了一个心跳的机制,每创建一个客户端连接都会将其加入到心跳管理的HeartbeatClientEndpointManager中:
private Client createClient(URL url, EndpointManager endpointManager) {
        Client client = innerCreateClient(url);
        //将创建的客户端进行统一管理,每创建一个客户端连接,就将其加入到心跳管理
        endpointManager.addEndpoint(client);
        return client;
 }
 HeartbeatClientEndpointManager对心跳进行管理,并且初始化后会定时向服务端发送心跳请求:
   public void init() {
        executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Client, HeartbeatFactory> entry : endpoints.entrySet()) {
                    Client endpoint = entry.getKey();
                    try {
                        // 如果节点是存活状态,那么没必要走心跳
                        if (endpoint.isAvailable()) {
                            continue;
                        }
                        HeartbeatFactory factory = entry.getValue();
                        //1创建心跳request,2-发送心跳请求
                        endpoint.heartbeat(factory.createRequest());
                    } catch (Exception e) {
                        LoggerUtil.error("HeartbeatEndpointManager send heartbeat Error: url=" + endpoint.getUrl().getUri() + ", " + e.getMessage());
                    }
                }
            }
        }, MotanConstants.HEARTBEAT_PERIOD, MotanConstants.HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);
        ShutDownHook.registerShutdownHook(new Closable() {
            @Override
            public void close() {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
            }
        });
    }
 具体心跳请求封装为了HeartbeatRequest
 public static Request getDefaultHeartbeatRequest(long requestId){
        HeartbeatRequest request = new HeartbeatRequest();

        request.setRequestId(requestId);
        request.setInterfaceName(MotanConstants.HEARTBEAT_INTERFACE_NAME);
        request.setMethodName(MotanConstants.HEARTBEAT_METHOD_NAME);
        request.setParamtersDesc(MotanConstants.HHEARTBEAT_PARAM);
        return request;
    }   
    
 实际调用的代码在NettyClient中
 public void heartbeat(Request request) {
        // 如果节点还没有初始化或者节点已经被close掉了,那么heartbeat也不需要进行了
        if (state.isUnInitState() || state.isCloseState()) {
            LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri());
            return;
        }
        LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri());
        try {
            // async request后,如果service is  异步
            // available,那么将会自动把该client设置成可用
            request(request, true);
        } catch (Exception e) {
            LoggerUtil.error("NettyClient heartbeat Error: url={}, {}", url.getUri(), e.getMessage());
        }
    }
               


4-NettyServer创建
public NettyServer(URL url, MessageHandler messageHandler) {
        //初始化 Codec  DefaultRpcCodec
        super(url);
        this.messageHandler = messageHandler;
    }
主要代码如下        
//server 处理任务的线程池创建和预先启动核心线程
standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
                : new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize, new DefaultThreadFactory("NettyServer-" + url.getServerPortStr(), true));
        standardThreadExecutor.prestartAllCoreThreads();
设置当前server最大的连接数
        //默认当前server最大的连接数100000  all clients conn  server支持的最大连接数TCP
     channelManage = new NettyServerChannelManage(maxServerConnection);
    
     serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("channel_manage", channelManage);
                        pipeline.addLast("decoder", new NettyDecoder(codec, NettyServer.this, maxContentLength));
                        pipeline.addLast("encoder", new NettyEncoder());
                        pipeline.addLast("handler", new NettyChannelHandler(NettyServer.this, messageHandler, standardThreadExecutor));
                    }
                });  
                    
5-NettyServer处理消息
   @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        if (msg instanceof NettyMessage) {
            if (threadPoolExecutor != null) {
                try {
                    threadPoolExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            processMessage(ctx, ((NettyMessage) msg));
                        }
                    });
                } catch (RejectedExecutionException rejectException) {
                    if (((NettyMessage) msg).isRequest()) {
                        rejectMessage(ctx, (NettyMessage) msg);
                    } else {
                        LoggerUtil.warn("process thread pool is full, run in io thread, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}",
                                threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(), threadPoolExecutor.getCorePoolSize(),
                                threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), ((NettyMessage) msg).getRequestId());
                        processMessage(ctx, (NettyMessage) msg);
                    }
                }
            } else {
                processMessage(ctx, (NettyMessage) msg);
            }
        } else {
            LoggerUtil.error("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
            throw new MotanFrameworkException("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
        }
    }  
                   
6-那服务端是如何处理心跳的呢?
  重点在AbstractEndpointFactory.createServer中。
  AbstractEndpointFactory.createServer创建server的时候做了什么呢?直接看关键代码:
  messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);
  它把我们的消息处理器messageHandler 进行了包装
  最终传递给我们NettyServer的messageHandler (***pipeline.addLast("handler", new 
 NettyChannelHandler(NettyServer.this,messageHandler,standardThreadExecutor));
 ***) 是HeartMessageHandleWrapper:
    private class HeartMessageHandleWrapper implements MessageHandler {
        private MessageHandler messageHandler;

        public HeartMessageHandleWrapper(MessageHandler messageHandler) {
            this.messageHandler = messageHandler;
        }

        @Override
        public Object handle(Channel channel, Object message) {
            //判断是否是HeartbeatRequest,如果是则直接从这里进行返回
            if (isHeartbeatRequest(message)) {
                return getDefaultHeartbeatResponse(((Request)message).getRequestId());
            }
            //如果不是则直接处理业务的请求
            return messageHandler.handle(channel, message);
        }
    }      

可以看到如果request 是HeartbeatRequest 则会直接返回HeartbeatResponse。    
0人推荐
随时随地看视频
慕课网APP