small_925_ant
2020-02-02 22:59:29浏览 1586
客户端心跳
在Motan中客户端会定期发送心跳包到服务端,用以检查服务是否可用。并且修改当前channel连接的状态isAliveState,是否活着。
客户端负载均衡
Motan中使用了客户端的负载均衡,大概实现如下:
每次增加一个server服务注册中心就会通知客户端,客户端会就创建一个Referer(相当于客户端和服务端的连接,有几个服务就会有几个Referer,每一个具体的provider服务都被抽象成一个Referer接口),然后加入服务列表中,客户端请求服务的时候通过指定的算法(轮训,随机等)从服务列表中选择一个进行请求。
当心跳检查到某个服务不可用的时候,同时也会将服务列表中的Referer过滤掉,不会再去请求。
transport模块源码
1-入口类:
NettyEndpointFactory extends AbstractEndpointFactory
2-创建server和client的类
AbstractEndpointFactory.createServer
具体实现在子类NettyEndpointFactory中的方法:
protected Server innerCreateServer(URL url, MessageHandler messageHandler) {
return new NettyServer(url, messageHandler);
}
客户端也类似
AbstractEndpointFactory.createClient
具体实现在子类NettyEndpointFactory中的方法:
protected Client innerCreateClient(URL url) {
return new NettyClient(url);
}
3-客户端心跳发送
相比server客户端多了一个心跳的机制,每创建一个客户端连接都会将其加入到心跳管理的HeartbeatClientEndpointManager中:
private Client createClient(URL url, EndpointManager endpointManager) {
Client client = innerCreateClient(url);
endpointManager.addEndpoint(client);
return client;
}
HeartbeatClientEndpointManager对心跳进行管理,并且初始化后会定时向服务端发送心跳请求:
public void init() {
executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
for (Map.Entry<Client, HeartbeatFactory> entry : endpoints.entrySet()) {
Client endpoint = entry.getKey();
try {
if (endpoint.isAvailable()) {
continue;
}
HeartbeatFactory factory = entry.getValue();
endpoint.heartbeat(factory.createRequest());
} catch (Exception e) {
LoggerUtil.error("HeartbeatEndpointManager send heartbeat Error: url=" + endpoint.getUrl().getUri() + ", " + e.getMessage());
}
}
}
}, MotanConstants.HEARTBEAT_PERIOD, MotanConstants.HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);
ShutDownHook.registerShutdownHook(new Closable() {
@Override
public void close() {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
}
});
}
具体心跳请求封装为了HeartbeatRequest
public static Request getDefaultHeartbeatRequest(long requestId){
HeartbeatRequest request = new HeartbeatRequest();
request.setRequestId(requestId);
request.setInterfaceName(MotanConstants.HEARTBEAT_INTERFACE_NAME);
request.setMethodName(MotanConstants.HEARTBEAT_METHOD_NAME);
request.setParamtersDesc(MotanConstants.HHEARTBEAT_PARAM);
return request;
}
实际调用的代码在NettyClient中
public void heartbeat(Request request) {
if (state.isUnInitState() || state.isCloseState()) {
LoggerUtil.warn("NettyClient heartbeat Error: state={} url={}", state.name(), url.getUri());
return;
}
LoggerUtil.info("NettyClient heartbeat request: url={}", url.getUri());
try {
request(request, true);
} catch (Exception e) {
LoggerUtil.error("NettyClient heartbeat Error: url={}, {}", url.getUri(), e.getMessage());
}
}
4-NettyServer创建
public NettyServer(URL url, MessageHandler messageHandler) {
super(url);
this.messageHandler = messageHandler;
}
主要代码如下
standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
: new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize, new DefaultThreadFactory("NettyServer-" + url.getServerPortStr(), true));
standardThreadExecutor.prestartAllCoreThreads();
设置当前server最大的连接数
channelManage = new NettyServerChannelManage(maxServerConnection);
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("channel_manage", channelManage);
pipeline.addLast("decoder", new NettyDecoder(codec, NettyServer.this, maxContentLength));
pipeline.addLast("encoder", new NettyEncoder());
pipeline.addLast("handler", new NettyChannelHandler(NettyServer.this, messageHandler, standardThreadExecutor));
}
});
5-NettyServer处理消息
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof NettyMessage) {
if (threadPoolExecutor != null) {
try {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
processMessage(ctx, ((NettyMessage) msg));
}
});
} catch (RejectedExecutionException rejectException) {
if (((NettyMessage) msg).isRequest()) {
rejectMessage(ctx, (NettyMessage) msg);
} else {
LoggerUtil.warn("process thread pool is full, run in io thread, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}",
threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(), threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getTaskCount(), ((NettyMessage) msg).getRequestId());
processMessage(ctx, (NettyMessage) msg);
}
}
} else {
processMessage(ctx, (NettyMessage) msg);
}
} else {
LoggerUtil.error("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
throw new MotanFrameworkException("NettyChannelHandler messageReceived type not support: class=" + msg.getClass());
}
}
6-那服务端是如何处理心跳的呢?
重点在AbstractEndpointFactory.createServer中。
AbstractEndpointFactory.createServer创建server的时候做了什么呢?直接看关键代码:
messageHandler = getHeartbeatFactory(url).wrapMessageHandler(messageHandler);
它把我们的消息处理器messageHandler 进行了包装
最终传递给我们NettyServer的messageHandler (***pipeline.addLast("handler", new
NettyChannelHandler(NettyServer.this,messageHandler,standardThreadExecutor));
***) 是HeartMessageHandleWrapper:
private class HeartMessageHandleWrapper implements MessageHandler {
private MessageHandler messageHandler;
public HeartMessageHandleWrapper(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
}
@Override
public Object handle(Channel channel, Object message) {
if (isHeartbeatRequest(message)) {
return getDefaultHeartbeatResponse(((Request)message).getRequestId());
}
return messageHandler.handle(channel, message);
}
}
可以看到如果request 是HeartbeatRequest 则会直接返回HeartbeatResponse。