手记

Netty源码-一分钟掌握4种tcp粘包解决方案

目录:
  • TCP报文的传输过程

  • TCP粘包产生的原因

  • TCP粘包的解决方案

  • Netty粘包的4种落地实现

  • 实战-自定义协议

TCP报文的传输过程

每个tcp socket连接在内核中都有一个recv缓冲区和send缓冲区。

发送端:数据包先发送到send缓冲区中,然后经Nagle算法决定是不是立即发送。

接收端:数据包先接收到recv缓冲区中,然后从内核拷贝到用户空间。

TCP粘包产生的原因

  • 由Nagle算法造成的发送端的粘包:Nagle算法是减少必须发送包的个数来提高网络传输效率的。很多小的数据包先发送到send缓冲区中,积累到一定量或超时后一起发送。

  • 接收端接收不及时导致的粘包:TCP会先接收数据包到recv缓冲区中,应用程序由于某种原因未能及时获取导致多个包粘在一起。

粘包

粘包产生的根本原因:没有界限,解决方案:就是如何划定报文边界了。

TCP粘包的解决方案

  • 定长法:每次只能发送固定长度的报文,不足填充。(需考虑长度多大合适)

  • 分界符法:每个报文后添加指定分界符来区分。(需考虑消息中是否存在分界符)

  • 消息头长度法:每个报文前添加个消息头,消息头中添加个长度字段,用来标识当前消息体的长度。(最常用)

Netty粘包的4种落地实现

  • 定长法实现类:FixedLengthFrameDecoder

  • 分界符法实现类:LineBasedFrameDecoder和DelimiterBasedFrameDecoder

  • 消息头length字段法实现类:LengthFieldBasedFrameDecoder(解码)和LengthFieldPrepender(编码)

  • 自定义协议:继承ByteToMessageDecoder实现自定义协议

Netty粘包实现的原理:收到的报文先暂存到一个容器中,等所有报文到来后拆包处理。

Netty粘包实现的原理

容器:ByteToMessageDecoder中的cumulation;拆包处理:ByteToMessageDecoder中待实现的抽象方法decode。

FixedLengthFrameDecoder(定长法):参数frameLength是设置的字节长度!字节长度!字节长度!不是报文长度!

LineBasedFrameDecoder(分界符法):此实现类的分界符是换行符(\n),不是\r!不是\r!不是\r!

LengthFieldPrepender(消息头length字段法-编码):参数lengthFieldLength是设置长度字段占几个字节,非常简单的对报文编码。


LengthFieldBasedFrameDecoder(消息头length字段法-解码):可以称为万能解码器,可以解密任意格式的编码。

参数介绍

lengthFieldOffset:长度字段的起始位置offset。

lengthFieldLength:长度字段占几个字节。

lengthAdjustment:调整长度。

initialBytesToStrip:解码后的消息,跳过多少字节

// 从长度字段获取到 消息体长度frameLengthlong frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);// 根据长度字段结束为止lengthFieldEndOffset + 消息体长度frameLength + 长度调整lengthAdjustment,来计算整个报文长度frameLength += lengthAdjustment + lengthFieldEndOffset;// frameLengthInt 是整个完整报文长度int frameLengthInt = (int) frameLength;// 读取到容器中的字节是否是完整的报文,没有继续读取,反之解码处理if (in.readableBytes() < frameLengthInt) {  return null;
}
...........

下面例子:消息头总共4个字节,HDR1占1个字节,长度字段起始值:1,长度字段占2字节
,长度字段值0x0010为16表示整个报文长度。
lengthFieldOffset     =  1lengthFieldLength    =  2lengthAdjustment    =  -3(16(报文总长度) = 3(lengthFieldEndOffset) + 16(lengthFieldLength) - 3(lengthAdjustment))
initialBytesToStrip    =  3(解码后跳过3个字节)

 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
 * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+

实战-自定义协议(SelfEncoder)

public class SelfEncoder extends MessageToMessageEncoder<ByteBuf> {    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {

        String reqId = DateUtil.formatDate(new Date(),"yyyyMMddHHmmss") + RandomUtil.randomNumbers(4);
        System.out.println("client reqId:" + reqId);
        out.add(ctx.alloc().buffer(18).writeBytes(reqId.getBytes()));        int length = msg.readableBytes();
        System.out.println("client length:" + length);

        out.add(ctx.alloc().buffer(3).writeMedium(length));
        out.add(msg.retain());
    }
}

server端:

public class SelfFrameServer extends AbstractNettyServer {    @Override
    public void initChannelInitializer(SocketChannel ch) {
        ch.pipeline().addLast(new SelfEncoder());
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,18,3,0,0));
        ch.pipeline().addLast(new CommonHandler());
    }    public static void main(String[] args) {
        Server server = new SelfFrameServer();
        server.start();
    }    @ChannelHandler.Sharable    class CommonHandler extends SimpleChannelInboundHandler<ByteBuf> {        @Override
        public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

            System.out.println("server recv msg:" + msg.toString());
            ByteBuf reqIdByte = msg.readBytes(18);
            System.out.println("reqId:" + reqIdByte.toString(CharsetUtil.UTF_8));            int lenByte = msg.readMedium();
            System.out.println("length:" + lenByte);

            ByteBuf contentByte = msg.readBytes(lenByte);
            System.out.println("content:" + contentByte.toString(CharsetUtil.UTF_8));

        }        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

client端:

public class SelfFrameClient extends AbstractNettyClient {    @Override
    public void initChannelInitializer(SocketChannel ch) {
        ch.pipeline().addLast(new SelfEncoder());
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,18,3,0,0));
        ch.pipeline().addLast(new CommonHandler());
    }    public static void main(String[] args) {
        Client client = new SelfFrameClient();
        client.start();
    }    @ChannelHandler.Sharable    class CommonHandler extends ChannelInboundHandlerAdapter {        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {            final ChannelFuture f = ctx.channel().writeAndFlush(Unpooled.wrappedBuffer("HELLO, WORLD".getBytes()));
            f.addListener(new ChannelFutureListener() {                @Override
                public void operationComplete(ChannelFuture future) {
                    System.out.println("发送消息完成!");
                }
            });
        }        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(msg.toString());
        }        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
0人推荐
随时随地看视频
慕课网APP