如何自定义编解码器
1. 前言
上一节我们一节了解了什么是编码解码、序列化和反序列化了,并且留有一道思考题,本节内容主要是深入解析该思考题。
思考题:能否把我们的编码和解码封装成独立的 Handler 呢?那么应该如何去封装呢?
2. 为什么要封装独立 Handler?
即使我们把编码和解码封装成了方法,但是还是需要在 Handler 业务逻辑里面进行手工调用,虽然看似不怎么影响,但是业务 Handler 不够纯粹,应该让 Handler 只是专心的负责处理业务逻辑就好。
实例:
ch.pipeline().addLast(new MyEncoderHandler());//解码Handler
ch.pipeline().addLast(new MyDecoderHandler());//编码Handler
ch.pipeline().addLast(new MyBusiHandler());//业务Handler
public class MyBusiHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1.接受参数,可以直接强转
UserReq userReq=(UserReq)msg;
//2.相应数据,直接写对象
UserRes res=new UserRes();
res.setCode(0);
res.setMsg("接受成功");
ctx.writeAndFlush(res);
}
}
通过以上的代码,我们把编码和解码封装成两个独立的 Handler,并且加入到 ChannelPipeline 里面进行管理。在我们的业务 Handler 里面就可以直接操作实体数据,无需手工转换成字节数组了。
思考:那么如何进行封装 Handler 呢?
3. StringDecoder 和 StringEncoder
3.1 简单使用
StringDecoder 和 StringEncoder 是 Netty 为我们提供的专门针对普通字符串的解码和编码器,使用起来非常的简单。
客户端直接发送字符串。
实例:
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ClientTestHandler());
public class ClientTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//客户端直接写字符串,没有任何的数据加工
ctx.channel().writeAndFlush("hello world");
}
}
服务端直接强转字符串。
实例:
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerTestHandler());
public class ServerTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//直接把msg转换成String类型
String str=msg.toString();
System.out.println("str="+str);
}
}
总结,这种模式开发起来实在太方便了,无需做数据的加工,我们还是按照我们熟悉的方式去写代码,非常的方便。
但是,它只是支持普通的字符串类型进行编码和解码而已,对于复杂的引用类型则无效。
3.2 大体流程
其实原理是非常的简单的,请看下图。
执行流程说明:
- StringDecoder 必须放在业务 Handler 之前,因为都是 InboundHandler,需要按顺序执行;
- StringEncoder 放在业务 Handler 之前,则可以使用 ctx.writeAndFlush () 输出数据,也可以使用 ctx.channel ().writeAndFlus () 输出数据(ChannelHandler 已经讲过原理了);
- StringEncoder 放在业务 Handler 之后,则只能使用
ctx.channel().writeAndFlush()
输出数据。
3.3 源码阅读
思考:StringDecoder 和 StringEncoder 到底怎么实现的呢?
StringDecoder 源码:
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//直接msg.toString()
out.add(msg.toString(this.charset));
}
}
发现 StringDecoder 的源码非常的简单,直接.toString()
转换即可。
StringEncoder 源码:
@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() != 0) {
//继续跟进源码
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), this.charset));
}
}
}
public static ByteBuf encodeString(ByteBufAllocator alloc, CharBuffer src, Charset charset) {
//继续跟进源码
return encodeString0(alloc, false, src, charset, 0);
}
保留核心源码
static ByteBuf encodeString0(ByteBufAllocator alloc, boolean enforceHeap, CharBuffer src, Charset charset, int extraCapacity) {
CharsetEncoder encoder = CharsetUtil.encoder(charset);
int length = (int)((double)src.remaining() * (double)encoder.maxBytesPerChar()) + extraCapacity;
boolean release = true;
//1.创建ByteBuf分配器
ByteBuf dst;
if (enforceHeap) {
dst = alloc.heapBuffer(length);
} else {
dst = alloc.buffer(length);
}
ByteBuf var12;
try {
//2.得到NIO的ByteBuffer【跟进Netty的ByteBuf基本上一样】
ByteBuffer dstBuf = dst.internalNioBuffer(0, length);
int pos = dstBuf.position();
//3.把内容写得NIO的ByteBuffer
CoderResult cr = encoder.encode(src, dstBuf, true);
cr = encoder.flush(dstBuf);
//4.更新ByteBuf的写指针writeIndex
dst.writerIndex(dst.writerIndex() + dstBuf.position() - pos);
//5.给var12赋值
var12 = dst;
} catch (CharacterCodingException var16) {
throw new IllegalStateException(var16);
}
return var12;
}
大致流程就是把字符串内容转换成 NIO 的 ByteBuffer,这里大致知道整个流程即可,不用深究每行代码的意思,其实 Netty 的 ByteBuf 底层就是基于 ByteBuffer 进行封装的。
3. 自定义编解码器
通过上面 Demo 的学习,以及 StringDecoder 和 StringEncoder 两个类的学习,相信大家更加能理解编解码器了,毕竟 StringDecoder 和 StringEncoder 从字面意思也能理解它们是针对字符串格式的,如果我们想要传递一个实体那么怎么办呢?
主要解决方案有两种:
方案一: 把实体转换成 json 格式字符串,然后依然使用 StringDecoder 和 StringEncoder 编解码器,但是每次手工转换和解析,非常的麻烦;
方案二: 自定义针对实体的编解码器,并且加入到双向链表里面,这样就可以传递自定义实体了。
下面主要讲解如何实现针对实体的编解码器:
3.1 实体
实例:
@Data
public class User {
private String name;
private Integer age;
}
3.2 编码器
核心步骤:
- 继承 MessageToByteEncoder,重写 encode 方法;
- 把 User 对象转换成 byte [];
- 把 byte [] 写到 ByteBuf。
实例:
public class MyEncoder extends MessageToByteEncoder<User> {
protected void encode(ChannelHandlerContext channelHandlerContext,
User user,
ByteBuf byteBuf) throws Exception {
//1.对象流
ByteArrayOutputStream os = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(user);
byte[] bytes=os.toByteArray();
//2.关闭流
oos.close();
os.close();
//3.写到ByteBuf容器
byteBuf.writeBytes(bytes);
}
}
3.3 解码器
核心步骤:
- 继承 ByteToMessageDecoder,重写 decode 方法;
- 自定义一个 byte [] 数组,长度是 ByteBuf 的可读长度;
- 把 ByteBuf 转换成 User 实体。
实例:
public class MyDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf,
List<Object> list) throws Exception {
//1.定义byte[],长度为ByteBuf可读长度
byte[] bytes=new byte[byteBuf.readableBytes()];
//2.往byte[]读取数据
byteBuf.readBytes(bytes);
//3.对象流
ByteArrayInputStream is=new ByteArrayInputStream(bytes);
ObjectInputStream iss=new ObjectInputStream(is);
User user=(User)iss.readObject();
//4.关闭流
is.close();
iss.close();
//5.添加到集合
list.add(user);
}
}
3.4 添加到 Pipeline
实例:
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
//1.解码器
ch.pipeline().addLast(new MyDecoder());
//2.编码器
ch.pipeline().addLast(new MyEncoder());
//3.业务Handler
ch.pipeline().addLast(new ServerTestHandler());
}
});
4. 小结
通常情况下,需要把编解码器分别独立封装成 Handler,并且加入到 ChannelPipeline 进行管理,主要目的是简化繁琐的编码和解码的步骤,让业务 Handler 更加专注去处理业务逻辑,更加的符合开发人员的习惯。
本节主要掌握以下两点内容
- 如果针对字符串,那么可以使用 Netty 内置的编解码器,分别是 StringEncoder 和 StringDecoder;
- 如果是其它引用类型,主要有两种方式,①转换成字符串格式;②自定义编解码器。