手记

netty分析(二) -- 数据接收及报文处理

上篇讲到在bossGroup的NioEventLoop中的processSelectedKey函数中会调用unsafe.read()来执行NioServerSocketChannel的的accept操作。
在workerGroup中,NioEventLoop的processSelectedKey函数中会执行socket的数据读取操作,让我们来看一下。

1.读取客户端数据过程

processSelectedKey

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final NioUnsafe unsafe = ch.unsafe();        if (!k.isValid()) {            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());            return;
        }        try {            int readyOps = k.readyOps();            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();                if (!ch.isOpen()) {                    // Connection already closed - no need to handle write.
                    return;
                }
            }            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException e) {
            unsafe.close(unsafe.voidPromise());
        }
    }

NioSocketChannel的read方法

        public void read() {            final ChannelConfig config = config();            final ChannelPipeline pipeline = pipeline();            final ByteBufAllocator allocator = config.getAllocator();            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            RecvByteBufAllocator.Handle allocHandle = this.allocHandle;            if (allocHandle == null) {                this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
            }            if (!config.isAutoRead()) {
                removeReadOp();
            }

            ByteBuf byteBuf = null;            int messages = 0;            boolean close = false;            try {   
                int byteBufCapacity = allocHandle.guess();                int totalReadAmount = 0;                do {
                    byteBuf = allocator.ioBuffer(byteBufCapacity);                    int writable = byteBuf.writableBytes();                    int localReadAmount = doReadBytes(byteBuf);                    if (localReadAmount <= 0) {                        // not was read release the buffer
                        byteBuf.release();
                        close = localReadAmount < 0;                        break;
                    }

                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {                        // Avoid overflow.
                        totalReadAmount = Integer.MAX_VALUE;                        break;
                    }

                    totalReadAmount += localReadAmount;                    if (localReadAmount < writable) {                        // Read less than what the buffer can hold,
                        // which might mean we drained the recv buffer completely.
                        break;
                    }
                } while (++ messages < maxMessagesPerRead);

                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            }
        }
    }

逻辑还是比较清晰的,核心操作是这两句"int localReadAmount = doReadBytes(byteBuf);"以及"pipeline.fireChannelReadComplete();"。
前者负责将数据从底层读入ByteBuf,后者负责将数据转交pipline处理。

DefaultChannelPipeline的fireChannelRead实现

    public ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);        return this;
    }

ChannelPipeline

每个socket绑定了一个pipline,pipline内部维护了一个可迭代的ChannelHandlerContext链表,用于处理io数据,以head标识头部,tail标识尾部。
当我们初始化通道,调用pipline的addLast方法塞入一个ChannelHandler时,实际对handler各个方法是否有@Skip注解做了标记,封装成一个ChannelHandlerContext放入链表的尾部。
这样有数据触发的时候,pipline会从链表首部开始迭代,找到对应能处理相应逻辑的handler进行处理。如果当前的handler需要将数据丢给下一层handler进行处理,需要调用ChannelHandlerContext的fireXXX方法将数据传递下去。

进一步查看DefaultChannelHandlerContext的fireChannelRead方法。

    @Override
    public ChannelHandlerContext fireChannelRead(Object msg) {
        DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
        next.invoker.invokeChannelRead(next, msg);        return this;
    }

这里的next上下文是根据handler中未携带@Skip的注解来查找最近的链表节点。进一步查看invokeChannelRead方法。

    public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {        if (msg == null) {            throw new NullPointerException("msg");
        }        if (executor.inEventLoop()) {
            invokeChannelReadNow(ctx, msg);
        } else {
            safeExecuteInbound(new Runnable() {                @Override
                public void run() {
                    invokeChannelReadNow(ctx, msg);
                }
            }, msg);
        }
    }

由于当前是workerGroup中的EventLoop线程,走进invokeChannelReadNow。

    public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {        try {
            ctx.handler().channelRead(ctx, msg);
        } catch (Throwable t) {
            notifyHandlerException(ctx, t);
        }
    }

走到这里,终于和用户代码联系起来了。当接收到客户端数据会调用用户设置的ChannelHandler的channelRead方法。这个很重要。下面我们分析下常见的ChannelHandler。

2 常见的ChannelHandler

由于数据处理的复杂性,Netty针对常见的应用场景给我们封装了一系列的ChannelHandler。先介绍ByteToMessageDecoder的子类。

2.1 ByteToMessageDecoder

2.1.1.LineBasedFrameDecoder

LineBasedFrameDecoder用于以换行符做解码分割符的场景。我们来查看其实现。根据上面的分析,当数据来了首先调用到channelRead方法。我们查看其实现

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;                if (first) {
                    cumulation = data;
                } else {                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);
                    data.release();
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {                throw e;
            } catch (Throwable t) {                throw new DecoderException(t);
            } finally {                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }                int size = out.size();
                decodeWasNull = size == 0;                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

注意:Netty框架并不缓存数据,所以当有未处理完的半包,我们需要自己存起来。
ByteToMessageDecoder如其名,只处理ByteBuf类型的输入数据,内部有个cumulation用于缓存半包(确切的说是若干个已读的全包加一个半包).如果每次解码以后,恰好处理完读入的字节没有剩余半包,那么清空cumulation。读入的新数据会附加到cumulation的尾部,如果cumulation剩下的空间不够写入了,则会对cumulation重新分配内存,新的内存大小正好是需要的字节数。
接着对读入的数据进行分包,详细代码如下:

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {        try {            while(true) {                if (in.isReadable()) {
                    int outSize = out.size();
                    int oldInputLength = in.readableBytes();                    this.decode(ctx, in, out);                    if (!ctx.isRemoved()) {                        if (outSize == out.size()) {                            if (oldInputLength != in.readableBytes()) {                                continue;
                            }
                        } else {                            if (oldInputLength == in.readableBytes()) {                                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
                            }                            if (!this.isSingleDecode()) {                                continue;
                            }
                        }
                    }
                }                return;
            }
        } catch (DecoderException var6) {            throw var6;
        } catch (Throwable var7) {            throw new DecoderException(var7);
        }
    }

接下来会调用子类的decode方法进行进一步解码,查看LineBasedFrameDecoder的实现:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {        int eol = findEndOfLine(buffer);        int length;        int length;        if (!this.discarding) {            if (eol >= 0) {
                length = eol - buffer.readerIndex();                int delimLength = buffer.getByte(eol) == 13 ? 2 : 1;                if (length > this.maxLength) {
                    buffer.readerIndex(eol + delimLength);                    this.fail(ctx, length);                    return null;
                } else {
                    ByteBuf frame;                    if (this.stripDelimiter) {
                        frame = buffer.readBytes(length);
                        buffer.skipBytes(delimLength);
                    } else {
                        frame = buffer.readBytes(length + delimLength);
                    }                    return frame;
                }
            } else {
                length = buffer.readableBytes();                if (length > this.maxLength) {                    this.discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());                    this.discarding = true;                    if (this.failFast) {                        this.fail(ctx, "over " + this.discardedBytes);
                    }
                }                return null;
            }
        } else {            if (eol >= 0) {
                length = this.discardedBytes + eol - buffer.readerIndex();
                length = buffer.getByte(eol) == 13 ? 2 : 1;
                buffer.readerIndex(eol + length);                this.discardedBytes = 0;                this.discarding = false;                if (!this.failFast) {                    this.fail(ctx, length);
                }
            } else {                this.discardedBytes = buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }            return null;
        }
    }



作者:msrpp
链接:https://www.jianshu.com/p/20e9bb4c3d32


0人推荐
随时随地看视频
慕课网APP