//springboot(version2.0.2)整合netty(version4.1.25)搭建的websocket服务端,握手阻塞,握手成功(handlerAdded调用)但是未返回握手成功信息给前端, //前端websocket握手阻塞,连接超时 @Component public class NettySocket { private static class SingletionSocketServer { static final NettySocket INSTANCE = new NettySocket(); } public static NettySocket getInstance() { return SingletionSocketServer.INSTANCE; } private EventLoopGroup parentGroup; private EventLoopGroup childGroup; private ServerBootstrap serverBootstrap; private ChannelFuture future; public NettySocket() { parentGroup = new NioEventLoopGroup(); childGroup = new NioEventLoopGroup(); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new NettySocketInitializer()); } public void start() { this.future = serverBootstrap.bind(8065); System.out.println("netty WebSocket start ok"); } }
初始化器
public class NettySocketInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline channelPipeline = ch.pipeline(); ////////////////http协议的支持 /////////////// //Http编解码器 channelPipeline.addLast(new HttpServerCodec()); //对写大数据流的支持 channelPipeline.addLast(new ChunkedWriteHandler()); //Http对象聚合器,,参数:消息的最大长度 //几乎在Netty中的编程都会使用到这个handler channelPipeline.addLast(new HttpObjectAggregator(1024 * 64)); ////////////////http协议的支持 END/////////////// channelPipeline.addLast(new WebSocketServerProtocolHandler("/nt-ws")); channelPipeline.addLast(new ChatHandler()); } }
消息处理
@Slf4j public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 记录和管理所有客户端 */ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String content = msg.text(); log.info("accept: {}", content); for (Channel channel : clients) { channel.writeAndFlush(new TextWebSocketFrame("服务器消息[" + ctx.channel().id().asLongText() + "]: " + content)); } // clients.writeAndFlush(new TextWebSocketFrame("服务器消息[" + ctx.channel().id().asLongText() + "]: " + content)); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { clients.add(ctx.channel()); log.info("客户端连接 —— channel id : {}", ctx.channel().id().asLongText()); log.info("剩余客户端:{}", clients.size()); // ctx.channel().writeAndFlush(new TextWebSocketFrame("连接成功!当前在线人数:" + clients.size())); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //触发该方法ChannelGroup会自动移除对应客户端的channel,所以不需要专门移除 // clients.remove(ctx.channel()); log.info("客户端连接断开 —— channel id : {}", ctx.channel().id().asLongText()); log.info("剩余客户端:{}", clients.size()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除 ctx.channel().close(); clients.remove(ctx.channel()); } }
Booter启动类
@Component public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() == null) { try { NettySocket.getInstance().start(); } catch (Exception e) { e.printStackTrace(); } } } }