springboot整合netty搭建的websocket服务端,握手阻塞

//springboot(version2.0.2)整合netty(version4.1.25)搭建的websocket服务端,握手阻塞,握手成功(handlerAdded调用)但是未返回握手成功信息给前端,
//前端websocket握手阻塞,连接超时
@Component
public class NettySocket {

    private static class SingletionSocketServer {
        static final NettySocket INSTANCE = new NettySocket();
    }

    public static NettySocket getInstance() {
        return SingletionSocketServer.INSTANCE;
    }

    private EventLoopGroup parentGroup;
    private EventLoopGroup childGroup;
    private ServerBootstrap serverBootstrap;
    private ChannelFuture future;

    public NettySocket() {
        parentGroup = new NioEventLoopGroup();
        childGroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new NettySocketInitializer());
    }

    public void start() {
        this.future = serverBootstrap.bind(8065);
        System.out.println("netty WebSocket start ok");
    }

}

初始化器

public class NettySocketInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline channelPipeline = ch.pipeline();

        ////////////////http协议的支持    ///////////////

        //Http编解码器
        channelPipeline.addLast(new HttpServerCodec());
        //对写大数据流的支持
        channelPipeline.addLast(new ChunkedWriteHandler());
        //Http对象聚合器,,参数:消息的最大长度
        //几乎在Netty中的编程都会使用到这个handler
        channelPipeline.addLast(new HttpObjectAggregator(1024 * 64));

        ////////////////http协议的支持 END///////////////

        channelPipeline.addLast(new WebSocketServerProtocolHandler("/nt-ws"));

        channelPipeline.addLast(new ChatHandler());

    }
}

消息处理

@Slf4j
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 记录和管理所有客户端
     */
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String content = msg.text();
        log.info("accept: {}", content);

        for (Channel channel : clients) {
            channel.writeAndFlush(new TextWebSocketFrame("服务器消息[" + ctx.channel().id().asLongText() + "]: " + content));
        }

//            clients.writeAndFlush(new TextWebSocketFrame("服务器消息[" + ctx.channel().id().asLongText() + "]: " + content));
    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
        log.info("客户端连接  ——   channel id : {}", ctx.channel().id().asLongText());
        log.info("剩余客户端:{}", clients.size());
//            ctx.channel().writeAndFlush(new TextWebSocketFrame("连接成功!当前在线人数:" + clients.size()));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //触发该方法ChannelGroup会自动移除对应客户端的channel,所以不需要专门移除
//            clients.remove(ctx.channel());
        log.info("客户端连接断开  ——   channel id : {}", ctx.channel().id().asLongText());
        log.info("剩余客户端:{}", clients.size());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
        ctx.channel().close();
        clients.remove(ctx.channel());
    }
}

Booter启动类

@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null) {
            try {
                NettySocket.getInstance().start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}


成佛的小兔几
浏览 6391回答 1
1回答
打开App,查看更多内容
随时随地看视频慕课网APP