手记

用SpringBoot集成Netty开发一个基于WebSocket的聊天室

前言

基于SpringBoot,借助Netty控制长链接,使用WebSocket协议做一个实时的聊天室。

项目效果

项目统一登录路径: http://localhost:8080/chat/netty

用户名随机生成,离线调用异步方法,数据写操作,登录显示历史聊天消息

GitHub

项目名:InChat

项目地址:https://github.com/UncleCatMy...

项目介绍:基于Netty4与SpringBoot,聊天室WebSocket(文字图片)加API调用Netty长链接执行发送消息(在线数、用户列表)、Iot物联网-MQTT协议、TCP/IP协议单片机通信,异步存储聊天数据

代码实操讲解

随机命名工具类

publicclassRandomNameUtil{privatestaticRandom ran =newRandom();privatefinalstaticintdelta =0x9fa5-0x4e00+1;publicstaticchargetName(){return(char)(0x4e00+ ran.nextInt(delta));    }}

配置文件yml

spring:datasource:driver-class-name:com.mysql.jdbc.Driverusername:rootpassword:rooturl:jdbc:mysql://localhost:3306/nettychat?characterEncoding=utf-8&useSSL=falsejpa:show-sql:truenetty:port:8090#监听端口bossThread:2#线程数workerThread:2#线程数keepalive:true#保持连接backlog:100

数据库准备

SETFOREIGN_KEY_CHECKS=0;-- ------------------------------ Table structure for user_msg-- ----------------------------DROPTABLEIFEXISTS`user_msg`;CREATETABLE`user_msg`(`id`int(11)NOTNULLAUTO_INCREMENT,`name`varchar(255)DEFAULTNULL,`msg`varchar(255)DEFAULTNULL,`create_time`timestampNOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,`update_time`timestampNOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,  PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=19DEFAULTCHARSET=utf8mb4;-- ------------------------------ Records of user_msg-- ----------------------------INSERTINTO`user_msg`VALUES('1','亪','今天不开心','2018-08-14 14:26:02','2018-08-14 14:26:02');INSERTINTO`user_msg`VALUES('2','祐','不错呀','2018-08-14 15:09:40','2018-08-14 15:09:40');INSERTINTO`user_msg`VALUES('3','搈','开心 开心','2018-08-14 15:09:40','2018-08-14 15:09:40');INSERTINTO`user_msg`VALUES('4','兇','可以的,后面再做个深入一点的','2018-08-14 15:18:35','2018-08-14 15:18:35');INSERTINTO`user_msg`VALUES('5','倎','开源这个项目','2018-08-14 15:18:35','2018-08-14 15:18:35');INSERTINTO`user_msg`VALUES('6','蝡','1-someting','2018-08-14 15:24:28','2018-08-14 15:24:28');INSERTINTO`user_msg`VALUES('7','弔','不行呀','2018-08-14 15:24:29','2018-08-14 15:24:29');INSERTINTO`user_msg`VALUES('8','習','可以的','2018-08-14 15:26:03','2018-08-14 15:26:03');INSERTINTO`user_msg`VALUES('9','蔫','开源这个项目','2018-08-14 15:26:03','2018-08-14 15:26:03');

dataObject与JPA数据DAO

@Data@Entity@DynamicUpdatepublicclassUserMsgimplementsSerializable{privatestaticfinallongserialVersionUID =4133316147283239759L;@Id@GeneratedValue(strategy = GenerationType.IDENTITY)privateInteger id;privateString name;privateString msg;privateDate createTime;privateDate updateTime;}

publicinterfaceUserMsgRepositoryextendsJpaRepository{//本次未使用到自定义方法,JPA原生即可}

NoSQL模拟环境

我没有去配置虚拟机环境,就本地模拟了

保存用户名称与链接随机ID

@ComponentpublicclassLikeRedisTemplate{privateMap RedisMap =newConcurrentHashMap<>();publicvoidsave(Object id,Object name){        RedisMap.put(id,name);    }publicvoiddelete(Object id){        RedisMap.remove(id);    }publicObjectget(Object id){returnRedisMap.get(id);    }}

聊天内容临时存储

@ComponentpublicclassLikeSomeCacheTemplate{privateSet SomeCache =newLinkedHashSet<>();publicvoidsave(Object user,Object msg){        UserMsg userMsg =newUserMsg();        userMsg.setName(String.valueOf(user));        userMsg.setMsg(String.valueOf(msg));        SomeCache.add(userMsg);    }publicSetcloneCacheMap(){returnSomeCache;    }publicvoidclearCacheMap(){        SomeCache.clear();    }}

异步任务处理

@ComponentpublicclassMsgAsyncTesk{@AutowiredprivateLikeSomeCacheTemplate cacheTemplate;@AutowiredprivateUserMsgRepository userMsgRepository;@AsyncpublicFuturesaveChatMsgTask()throwsException{//        System.out.println("启动异步任务");Set set = cacheTemplate.cloneCacheMap();for(UserMsg item:set){//保存用户消息userMsgRepository.save(item);        }//清空临时缓存cacheTemplate.clearCacheMap();returnnewAsyncResult<>(true);    }}

netty核心

配置类

@Data@Component@ConfigurationProperties(prefix ="netty")publicclassNettyAccountConfig{privateintport;privateintbossThread;privateintworkerThread;privatebooleankeepalive;privateintbacklog;}

核心消息处理类

@Component@Qualifier("textWebSocketFrameHandler")@ChannelHandler.SharablepublicclassTextWebSocketFrameHandlerextendsSimpleChannelInboundHandler{publicstaticChannelGroup channels =newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);@AutowiredprivateLikeRedisTemplate redisTemplate;@AutowiredprivateLikeSomeCacheTemplate cacheTemplate;@AutowiredprivateMsgAsyncTesk msgAsyncTesk;@OverrideprotectedvoidchannelRead0(ChannelHandlerContext ctx,

                                TextWebSocketFrame msg)throwsException{        Channel incoming = ctx.channel();        String uName = String.valueOf(redisTemplate.get(incoming.id()));for(Channel channel : channels) {//将当前每个聊天内容进行存储System.out.println("存储数据:"+uName+"-"+msg.text());            cacheTemplate.save(uName,msg.text());if(channel != incoming){                channel.writeAndFlush(newTextWebSocketFrame("["+ uName +"]"+ msg.text()));            }else{                channel.writeAndFlush(newTextWebSocketFrame("[you]"+ msg.text() ));            }        }    }@OverridepublicvoidhandlerAdded(ChannelHandlerContext ctx)throwsException{        System.out.println(ctx.channel().remoteAddress());        String uName = String.valueOf(RandomNameUtil.getName());//用来获取一个随机的用户名,可以用其他方式代替//新用户接入Channel incoming = ctx.channel();for(Channel channel : channels) {            channel.writeAndFlush(newTextWebSocketFrame("[新用户] - "+ uName +" 加入"));        }        redisTemplate.save(incoming.id(),uName);//存储用户channels.add(ctx.channel());    }@OverridepublicvoidhandlerRemoved(ChannelHandlerContext ctx)throwsException{        Channel incoming = ctx.channel();        String uName = String.valueOf(redisTemplate.get(incoming.id()));//用户离开for(Channel channel : channels) {            channel.writeAndFlush(newTextWebSocketFrame("[用户] - "+ uName +" 离开"));        }        redisTemplate.delete(incoming.id());//删除用户channels.remove(ctx.channel());    }@OverridepublicvoidchannelActive(ChannelHandlerContext ctx)throwsException{        Channel incoming = ctx.channel();        System.out.println("用户:"+redisTemplate.get(incoming.id())+"在线");    }@OverridepublicvoidchannelInactive(ChannelHandlerContext ctx)throwsException{        Channel incoming = ctx.channel();        System.out.println("用户:"+redisTemplate.get(incoming.id())+"掉线");        msgAsyncTesk.saveChatMsgTask();    }@OverridepublicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)throwsException{        Channel incoming = ctx.channel();        System.out.println("用户:"+ redisTemplate.get(incoming.id()) +"异常");        cause.printStackTrace();        ctx.close();    }}

定义Initializer

@Component@Qualifier("somethingChannelInitializer")publicclassNettyWebSocketChannelInitializerextendsChannelInitializer{@AutowiredprivateTextWebSocketFrameHandler textWebSocketFrameHandler;@OverridepublicvoidinitChannel(SocketChannel ch)throwsException{        ChannelPipeline pipeline = ch.pipeline();        pipeline.addLast(newHttpServerCodec());        pipeline.addLast(newHttpObjectAggregator(65536));        pipeline.addLast(newChunkedWriteHandler());        pipeline.addLast(newWebSocketServerProtocolHandler("/ws"));        pipeline.addLast(textWebSocketFrameHandler);//这里不能使用new,不然在handler中不能注入依赖}}

启动创建Netty基本组件

@ComponentpublicclassNettyConfig{@AutowiredprivateNettyAccountConfig nettyAccountConfig;@Bean(name ="bossGroup", destroyMethod ="shutdownGracefully")publicNioEventLoopGroupbossGroup(){returnnewNioEventLoopGroup(nettyAccountConfig.getBossThread());    }@Bean(name ="workerGroup", destroyMethod ="shutdownGracefully")publicNioEventLoopGroupworkerGroup(){returnnewNioEventLoopGroup(nettyAccountConfig.getWorkerThread());    }@Bean(name ="tcpSocketAddress")publicInetSocketAddresstcpPost(){returnnewInetSocketAddress(nettyAccountConfig.getPort());    }@Bean(name ="tcpChannelOptions")publicMap, Object> tcpChannelOptions(){        Map, Object> options =newHashMap, Object>();        options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive());        options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog());returnoptions;    }@Autowired@Qualifier("somethingChannelInitializer")privateNettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;@Bean(name ="serverBootstrap")publicServerBootstrapbootstrap(){        ServerBootstrap b =newServerBootstrap();        b.group(bossGroup(), workerGroup())                .channel(NioServerSocketChannel.class)                .handler(newLoggingHandler(LogLevel.DEBUG))                .childHandler(nettyWebSocketChannelInitializer);        Map, Object> tcpChannelOptions = tcpChannelOptions();        Set> keySet = tcpChannelOptions.keySet();for(@SuppressWarnings("rawtypes") ChannelOption option : keySet) {            b.option(option, tcpChannelOptions.get(option));        }returnb;    }}

服务启动协助类

@Data@ComponentpublicclassTCPServer{@Autowired@Qualifier("serverBootstrap")privateServerBootstrap serverBootstrap;@Autowired@Qualifier("tcpSocketAddress")privateInetSocketAddress tcpPort;privateChannel serverChannel;publicvoidstart()throwsException{        serverChannel =  serverBootstrap.bind(tcpPort).sync().channel().closeFuture().sync().channel();    }@PreDestroypublicvoidstop()throwsException{        serverChannel.close();        serverChannel.parent().close();    }}

项目启动

@SpringBootApplication@EnableScheduling//启动异步任务publicclassNettychatApplication{publicstaticvoidmain(String[] args)throwsException{        ConfigurableApplicationContext context = SpringApplication.run(NettychatApplication.class, args);//注入NettyConfig 获取对应BeanNettyConfig nettyConfig = context.getBean(NettyConfig.class);//注入TCPServer 获取对应BeanTCPServer tcpServer = context.getBean(TCPServer.class);//启动websocket的服务tcpServer.start();    }}

GitHub

项目名:InChat

项目地址:https://github.com/UncleCatMy...

项目介绍:基于Netty4与SpringBoot,聊天室WebSocket(文字图片)加API调用Netty长链接执行发送消息(在线数、用户列表)、Iot物联网-MQTT协议、TCP/IP协议单片机通信,异步存储聊天数据



作者:Java邵先生
链接:https://www.jianshu.com/p/fe30806af6fa


2人推荐
随时随地看视频
慕课网APP