使用Netty实现远程方法调用(RPC)
很多情况下,我们可能需要用到调用远程方法的时候。比如,我们有统一的布隆过滤器,其它服务需要调用布隆过滤器进行判重;比如,我们需要调用统一的缓存数据;比如我们需要跨机器调用一些服务方法等等。这些时候都可以使用远程方法调用。
接下来,开始讲解使用Netty实现远程方法调用的步骤。
代码目录结构为:
nettynetty/client//RPC消息回调类netty/client/MessageCallBack.java//Rpc客户端管道初始化netty/client/MessageSendChannelInitializer.java//RPC客户端消息发送执行(动态代理)类netty/client/MessageSendExecutor.java//RPC客户端消息发送处理类netty/client/MessageSendHandler.java//Rpc客户端线程任务处理netty/client/MessageSendInitializeTask.java//Rpc客户端代理netty/client/MessageSendProxy.java//RPC客户端消息序列化协议框架netty/client/RpcSendSerializeFrame.java//rpc客户端服务器配置加载netty/client/RpcServerLoader.javanetty/interfnetty/interf.impl//接口实现类netty/interf/impl/CalculateImpl.java//接口netty/interf/Calculate.javanetty/serializenetty/serialize.kryo//Kryo RPC消息进行编码、解码类netty/serialize/kryo/KryoCodecUtil.java//Kryo ×××netty/serialize/kryo/KryoDecoder.java//Kryo 编码器netty/serialize/kryo/KryoEncoder.java//Kryo 工厂类netty/serialize/kryo/KryoPoolFactory.java//Kryo RPC序列化类netty/serialize/kryo/KryoSerialize.java//RPC消息进行编码、解码接口netty/serialize/MessageCodecUtil.java//消息×××netty/serialize/MessageDecoder.java//消息编码器netty/serialize/MessageEncoder.java//RPC消息序列化/反序列化接口定义netty/serialize/RpcSerialize.java//RPC消息序序列化协议选择器接口netty/serialize/RpcSerializeFrame.java//RPC消息序序列化协议类型netty/serialize/RpcSerializeProtocol.javanetty/server//服务端: 线程池异常策略netty/server/AbortPolicyWithReport.java//Rpc服务器执行模块netty/server/MessageRecvChannelInitializer.java//服务器执行模块netty/server/MessageRecvExecutor.java//Rpc服务器消息处理netty/server/MessageRecvHandler.java//Rpc服务器消息线程任务处理netty/server/MessageRecvInitializeTask.java//线程工厂:实际上就是对Runable进行一个包装,对线程设置一些信息和监控信息netty/server/NamedThreadFactory.java//RPC服务端消息序列化协议框架netty/server/RpcRecvSerializeFrame.java//自定义的线程池netty/server/RpcThreadPool.java//消息的请求体netty/MessageRequest.java//响应的请求体netty/MessageResponse.java//客户端netty/NettyClient.java//服务端netty/NettyServer.java
启动NettyServer服务端:
public static void main(String[] args) { MessageRecvExecutor executor = new MessageRecvExecutor("127.0.0.1:8686", RpcSerializeProtocol.KRYOSERIALIZE.name()); try { executor.startServer(); } catch (Exception e) { e.printStackTrace(); } }
控制台打印:
RPC Server start success!ip:127.0.0.1port:8686protocol:RpcSerializeProtocol[serializeProtocol=kryo,name=KRYOSERIALIZE,ordinal=1]
启动NettyClient客户端,并远程调用Calculate接口中的add方法:
final static MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:8686",RpcSerializeProtocol.KRYOSERIALIZE.name()); final static Calculate cal = executor.execute(Calculate.class); public static void main(String[] args) throws Exception { for(int i = 1;i< 100000;i++) ThreadExecuteUtil.submitTaskBlock(ThreadExecuteUtil.CLIENT, new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); int result = cal.add(10, 20); System.out.println("result:"+result); long end = System.currentTimeMillis(); System.out.println(end-start); } }, 1000); }