手记

netty高级应用

简介

Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty构建,比如RPC框架、zookeeper等。

netty  快速开发基于tcp udp等协议开发,就是快速的socket开发,而且是NIO的,

建立连接的流程大家都了解--三次握手,它如何与accept交互呢?下面以一个不太精确却通俗易懂的图来说明之:

netty服务器编写

步骤

Ø  使用启动类ServerBootstrap

Ø  需要两组线程接收客户端accept事件(默认一条线程)和read,write 事件(默认cpu核数的两倍)的请求server.group(parentGroup, childGroup)

Ø  添加参数erver.option(ChannelOption.SO_BACKLOG, 128)  假如客户端非常多 服务器处理不过来,可以有128的排队等候处理

Ø  绑定channel server.channel(NioServerSocketChannel.class)

Ø  添加Handler处理读写事件  并且需要编码和解码

Ø  最后server绑定端口异步接收信息并且关闭

ublic static void main(String[] args) throws InterruptedException {
  
   ServerBootstrap server = new ServerBootstrap();
   //accept,read write
   EventLoopGroup parentGroup = new NioEventLoopGroup();
   EventLoopGroup childGroup =new NioEventLoopGroup();
   server.group(parentGroup, childGroup);
   server.option(ChannelOption.SO_BACKLOG, 128);
  
   server.channel(NioServerSocketChannel.class);
  
  
   server.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
         ch.pipeline().addLast(new SimpleServerHander());
        
      }
   });
  
  
   ChannelFuture future = server.bind(8080).sync();
   future.channel().closeFuture().sync();
  
  
}

netty客户端

Ø  启动类Bootstrap

Ø  只需要设置一个线程(客户端不需要处理accept事件)

Ø  设置channel

Ø  设置handler处理服务器返回数据的处理

Ø  连接服务器

Ø  向管道中写数据

Ø  等待通道关闭获取里面的数据

public static void main(String[] args) throws InterruptedException {
       
       Bootstrap client = new Bootstrap();
       //没有了accept事件
       EventLoopGroup group = new NioEventLoopGroup();
       client.group(group );
       client.channel(NioSocketChannel.class);
       client.handler(new ChannelInitializer<NioSocketChannel>() {
          @Override
          protected void initChannel(NioSocketChannel ch) throws Exception {
             ch.pipeline().addLast(new StringEncoder());
             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer., Delimiters.()[0]));
             ch.pipeline().addLast(new SimpleClientHandler());
          }
       });
       
       ChannelFuture future = client.connect("192.168.10.155", 4560).sync();
       User user = new User();
       user.setAge(12);
       user.setId(1);
       user.setName("sssss");
       future.channel().writeAndFlush(JSONObject.(user)+"\r\n");
 //    for(int i=0;i<100;i++){
 //        String msg = "ssss"+i+"\r\n";
 //        future.channel().writeAndFlush(msg);
 //       
 //    }
       future.channel().closeFuture().sync();
       Object result = future.channel().attr(AttributeKey.("ServerData")).get();
       System..println(result.toString());
       
    }

netty实现一套springmvc框架

服务器端:

l  有个UserController类里有login方法,客户端想直接调用到login方法,我们需要在SimpleServerHander里接收客户端请求时直接调到UserController里面来

l  SimpleServerHander的msg需要制定访问那一个controller和方法

l  通过java反射 调到方法

l  首先所有Controller都有Controller注解,我们利用spring容器将这些注解类缓存到本地缓存中去

l  根据客户端的Controller名通过本地缓存直接找到对象,在通过java反射去执行里面的具体方法

l  启动spring容器

写个类监听spring容器(Ordered  是用于排序 值越小越早初始化)

l  需要用到中介这模式创建类(和业务代码分离)InitMediator 初始化和netty一样

l  完成onApplicationEvent方法(他能获得所有spring中的bean),可以通过注解获取bean

l  遍历map 里面是类的名字和对象

l  获得里面所有带Action的注解 并保存到Mediator类的Map中

l  目前已经将controller和方法都保存到本地缓存中去了

l  下面再用java反射执行需要调用的方法

netty服务器处理通道读

l  首先获取msg转化客户端消息,其中包括,调用类和方法名,和数据

l  在Mediator类(方法缓存类)构建process方法(调用方法 获取结果)

l  从缓存中获取调用方法bean

l  根据java反射 通过bean 获得方法和参数去执行

客户端

l  首先启动代码静态化

l  创建单独发送方法send

l  重写NettyClientHandler方法并实现channelRead方法

长连接

之前写的netty客户端是一个端连接,每次请求完之后需要对通道进行关闭,主线程才能获取到服务器端相应的数据,如果我们不关闭通道,如何获取NettyServerHandler返回的数据哪,这个时候我们需要jdk里面的多线程交互

l  首先每次请求的id在整个系统中需要唯一。所以在请求数据封装上用AtomicLong来保证id唯一

l  我们创建一个得到响应结果的类DefaultFuture,里面需要一个构造方法和get方法(其实就是在获取结果是线程等待,直到有数据返回在唤醒线程获取数据)

l  在NettyClientHandler 对线程进行赋值,是根据请求id 找到对应的DefaultFuture,并将数据赋值给他

l  所以我们需要将所有DefaultFuture 方到ConcurrentHashMap中去,并在初始化时将放入map中

l  我们可以通过response.getId()获得对应的DefaultFuture之后进行处理

l  之后需要加锁处理,将response赋值给DefaultFuture中,并通知等待的主线程处理数据。

l  在获取数据的get方法中,首先直接循环获取DefaultFuture中的response值,没有就等待,直到被唤醒直接  返回response

l  当然可以设置等待时间

Netty+Zookeeper+spring实现类似于dubbo分布式RPC框架

服务器注册到zk上

服务器注册到zk上其实就是创建一个临时节点,并且把信息存到临时节点上

Ø  首先创建zk客户端

Ø  CuratorFramework client = ZookeeperFactory.create();

Ø  zk创建节点(临时节点),路径是netty+ip

netty长连接心跳包设置

服务器要不断检查客户端的连接是否可用,假如不可用断掉连接

Ø  在客户端hander处理类前添加

Ø  ch.pipeline().addLast(new IdleStateHandler(60, 45, 20, TimeUnit.SECONDS));

Ø  之后再SimpleServerHandler 继承空闲检查方法

Ø   @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if(evt instanceof IdleStateEvent){
       IdleStateEvent event = (IdleStateEvent)evt;
       if(event.state().equals(IdleState.)){
          System..println("读空闲===");
          ctx.channel().close();
       }else if(event.state().equals(IdleState.)){
          System..println("写空闲=====");
       }else if(event.state().equals(IdleState.)){
          System..println("读写空闲");
          ctx.channel().writeAndFlush("ping\r\n");
       }
       
       
    }

Ø  客户端在接收到ping指令  并处理

netty客户端加上zk监听服务器

如果netty 宕机了 zk没有加监听 会一直认为客户端存才,,所以需要加监听

Ø  添加zk监听(监听路径下面的子路径变化)

Ø  实现监听类

总结

用netty+zk实现dubbo(rpc)  用netty实现服务器端和客户端 并且将服务器端注册到zk上,就是在zk上创建临时节点  格式是/netty+ip+port+权重,所有服务器都注册上,客户端可以从zk上获取服务端列表 拿到地址进行连接(可以轮询也可以参考权重),客户端向服务器端发消息(自定义协议)就是消息id,消息内容,处理消息的类和方法名,服务器端通过类和方法名反射得到处理的方法去调用处理,返回结果。

 


1人推荐
随时随地看视频
慕课网APP