简介
Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty构建,比如RPC框架、zookeeper等。
netty 快速开发基于tcp udp等协议开发,就是快速的socket开发,而且是NIO的,
建立连接的流程大家都了解--三次握手,它如何与accept交互呢?下面以一个不太精确却通俗易懂的图来说明之:
netty服务器编写
步骤
Ø 使用启动类ServerBootstrap
Ø 需要两组线程接收客户端accept事件(默认一条线程)和read,write 事件(默认cpu核数的两倍)的请求server.group(parentGroup, childGroup)
Ø 添加参数erver.option(ChannelOption.SO_BACKLOG, 128) 假如客户端非常多 服务器处理不过来,可以有128的排队等候处理
Ø 绑定channel server.channel(NioServerSocketChannel.class)
Ø 添加Handler处理读写事件 并且需要编码和解码
Ø 最后server绑定端口异步接收信息并且关闭
ublic static void main(String[] args) throws InterruptedException {
ServerBootstrap server = new ServerBootstrap();
//accept,read write
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup =new NioEventLoopGroup();
server.group(parentGroup, childGroup);
server.option(ChannelOption.SO_BACKLOG, 128);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
ch.pipeline().addLast(new SimpleServerHander());
}
});
ChannelFuture future = server.bind(8080).sync();
future.channel().closeFuture().sync();
}
netty客户端
Ø 启动类Bootstrap
Ø 只需要设置一个线程(客户端不需要处理accept事件)
Ø 设置channel
Ø 设置handler处理服务器返回数据的处理
Ø 连接服务器
Ø 向管道中写数据
Ø 等待通道关闭获取里面的数据
public static void main(String[] args) throws InterruptedException { Bootstrap client = new Bootstrap(); //没有了accept事件 EventLoopGroup group = new NioEventLoopGroup(); client.group(group ); client.channel(NioSocketChannel.class); client.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer., Delimiters.()[0])); ch.pipeline().addLast(new SimpleClientHandler()); } }); ChannelFuture future = client.connect("192.168.10.155", 4560).sync(); User user = new User(); user.setAge(12); user.setId(1); user.setName("sssss"); future.channel().writeAndFlush(JSONObject.(user)+"\r\n"); // for(int i=0;i<100;i++){ // String msg = "ssss"+i+"\r\n"; // future.channel().writeAndFlush(msg); // // } future.channel().closeFuture().sync(); Object result = future.channel().attr(AttributeKey.("ServerData")).get(); System..println(result.toString()); }
netty实现一套springmvc框架
服务器端:
l 有个UserController类里有login方法,客户端想直接调用到login方法,我们需要在SimpleServerHander里接收客户端请求时直接调到UserController里面来
l SimpleServerHander的msg需要制定访问那一个controller和方法
l 通过java反射 调到方法
l 首先所有Controller都有Controller注解,我们利用spring容器将这些注解类缓存到本地缓存中去
l 根据客户端的Controller名通过本地缓存直接找到对象,在通过java反射去执行里面的具体方法
l 启动spring容器
写个类监听spring容器(Ordered 是用于排序 值越小越早初始化)
l 需要用到中介这模式创建类(和业务代码分离)InitMediator 初始化和netty一样
l 完成onApplicationEvent方法(他能获得所有spring中的bean),可以通过注解获取bean
l 遍历map 里面是类的名字和对象
l 获得里面所有带Action的注解 并保存到Mediator类的Map中
l 目前已经将controller和方法都保存到本地缓存中去了
l 下面再用java反射执行需要调用的方法
netty服务器处理通道读
l 首先获取msg转化客户端消息,其中包括,调用类和方法名,和数据
l 在Mediator类(方法缓存类)构建process方法(调用方法 获取结果)
l 从缓存中获取调用方法bean
l 根据java反射 通过bean 获得方法和参数去执行
客户端
l 首先启动代码静态化
l 创建单独发送方法send
l 重写NettyClientHandler方法并实现channelRead方法
长连接
之前写的netty客户端是一个端连接,每次请求完之后需要对通道进行关闭,主线程才能获取到服务器端相应的数据,如果我们不关闭通道,如何获取NettyServerHandler返回的数据哪,这个时候我们需要jdk里面的多线程交互
l 首先每次请求的id在整个系统中需要唯一。所以在请求数据封装上用AtomicLong来保证id唯一
l 我们创建一个得到响应结果的类DefaultFuture,里面需要一个构造方法和get方法(其实就是在获取结果是线程等待,直到有数据返回在唤醒线程获取数据)
l 在NettyClientHandler 对线程进行赋值,是根据请求id 找到对应的DefaultFuture,并将数据赋值给他
l 所以我们需要将所有DefaultFuture 方到ConcurrentHashMap中去,并在初始化时将放入map中
l 我们可以通过response.getId()获得对应的DefaultFuture之后进行处理
l 之后需要加锁处理,将response赋值给DefaultFuture中,并通知等待的主线程处理数据。
l 在获取数据的get方法中,首先直接循环获取DefaultFuture中的response值,没有就等待,直到被唤醒直接 返回response
l 当然可以设置等待时间
Netty+Zookeeper+spring实现类似于dubbo分布式RPC框架
服务器注册到zk上
服务器注册到zk上其实就是创建一个临时节点,并且把信息存到临时节点上
Ø 首先创建zk客户端
Ø CuratorFramework client = ZookeeperFactory.create();
Ø zk创建节点(临时节点),路径是netty+ip
netty长连接心跳包设置
服务器要不断检查客户端的连接是否可用,假如不可用断掉连接
Ø 在客户端hander处理类前添加
Ø ch.pipeline().addLast(new IdleStateHandler(60, 45, 20, TimeUnit.SECONDS));
Ø 之后再SimpleServerHandler 继承空闲检查方法
Ø @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if(event.state().equals(IdleState.)){ System..println("读空闲==="); ctx.channel().close(); }else if(event.state().equals(IdleState.)){ System..println("写空闲====="); }else if(event.state().equals(IdleState.)){ System..println("读写空闲"); ctx.channel().writeAndFlush("ping\r\n"); } }
Ø 客户端在接收到ping指令 并处理
netty客户端加上zk监听服务器
如果netty 宕机了 zk没有加监听 会一直认为客户端存才,,所以需要加监听
Ø 添加zk监听(监听路径下面的子路径变化)
Ø 实现监听类
总结
用netty+zk实现dubbo(rpc) 用netty实现服务器端和客户端 并且将服务器端注册到zk上,就是在zk上创建临时节点 格式是/netty+ip+port+权重,所有服务器都注册上,客户端可以从zk上获取服务端列表 拿到地址进行连接(可以轮询也可以参考权重),客户端向服务器端发消息(自定义协议)就是消息id,消息内容,处理消息的类和方法名,服务器端通过类和方法名反射得到处理的方法去调用处理,返回结果。