The previous article explained that, in the bossGroup's NioEventLoop, the processSelectedKey function calls unsafe.read() to perform the accept operation on the NioServerSocketChannel.
In the workerGroup, the processSelectedKey function of the NioEventLoop performs the actual reading of socket data. Let's take a look.
1. Reading data from the client
processSelectedKey
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        unsafe.close(unsafe.voidPromise());
    }
}

The read method used by NioSocketChannel (implemented in AbstractNioByteChannel.NioByteUnsafe):
public void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();

    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }
    if (!config.isAutoRead()) {
        removeReadOp();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int byteBufCapacity = allocHandle.guess();
        int totalReadAmount = 0;
        do {
            byteBuf = allocator.ioBuffer(byteBufCapacity);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);
            if (localReadAmount <= 0) {
                // nothing was read, release the buffer
                byteBuf.release();
                close = localReadAmount < 0;
                break;
            }

            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }

            totalReadAmount += localReadAmount;
            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    }
}
The logic is fairly clear. The core operations are these two statements: "int localReadAmount = doReadBytes(byteBuf);" and "pipeline.fireChannelRead(byteBuf);".
The former reads data from the underlying channel into a ByteBuf, and the latter hands that data over to the pipeline for processing.
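To see why the loop treats localReadAmount <= 0 specially, it helps to recall the return-value contract of a non-blocking JDK NIO read, which doReadBytes ultimately builds on. The following sketch is plain JDK NIO, not Netty code; the class and method names are invented for illustration:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NioReadContract {
    // SocketChannel.read() on a non-blocking channel returns:
    //   > 0  -> that many bytes were copied into the buffer
    //   == 0 -> nothing available right now
    //   < 0  -> the peer closed the connection (end of stream)
    static void readOnce(SocketChannel ch, ByteBuffer buf) throws IOException {
        int n = ch.read(buf);
        if (n > 0) {
            System.out.println("read " + n + " bytes");
        } else if (n == 0) {
            System.out.println("nothing to read yet; stay registered for OP_READ");
        } else {
            System.out.println("end of stream; the peer closed, so close our side too");
            ch.close();
        }
    }
}

This is the same distinction the read loop above makes: a negative return sets close = true so that closeOnRead(pipeline) runs, while zero simply ends this round of reading.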
The fireChannelRead implementation in DefaultChannelPipeline:
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}

ChannelPipeline
Every socket is bound to a pipeline. Internally, the pipeline maintains an iterable linked list of ChannelHandlerContexts used to process the I/O data, with head marking the front of the list and tail marking the back.
When we initialize the channel and call the pipeline's addLast method to add a ChannelHandler, each of the handler's methods is inspected for the @Skip annotation and the result is recorded; the handler is then wrapped in a ChannelHandlerContext and appended to the tail of the list.
When data arrives, the pipeline iterates from the head of the list to find a handler able to process it. If the current handler wants to hand the data on to the next handler, it must call the ChannelHandlerContext's fireXXX method to pass it along.
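As a concrete illustration of this traversal, here is a minimal sketch of two inbound handlers added with addLast (the class and handler names are invented for the example). The first handler passes the message on by calling ctx.fireChannelRead, which is what moves it to the next context in the list:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

public class PipelineDemoInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast("logger", new LoggingHandler())     // closer to head
          .addLast("business", new BusinessHandler()); // closer to tail
    }

    static class LoggingHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("saw a message: " + msg);
            // Without this call the message would stop here and never
            // reach BusinessHandler further down the pipeline.
            ctx.fireChannelRead(msg);
        }
    }

    static class BusinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("handling the message");
            // Last inbound handler: no further propagation here.
        }
    }
}

If LoggingHandler omitted the fireChannelRead call, BusinessHandler would never see the message.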
Let's take a closer look at DefaultChannelHandlerContext's fireChannelRead method.
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
    DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
    next.invoker.invokeChannelRead(next, msg);
    return this;
}

The next context here is found by locating the nearest node in the list whose handler method is not annotated with @Skip. Let's look further at the invokeChannelRead method.
public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    if (executor.inEventLoop()) {
        invokeChannelReadNow(ctx, msg);
    } else {
        safeExecuteInbound(new Runnable() {
            @Override
            public void run() {
                invokeChannelReadNow(ctx, msg);
            }
        }, msg);
    }
}

Since the current thread is the EventLoop thread of the workerGroup, execution proceeds into invokeChannelReadNow.
public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
    try {
        ctx.handler().channelRead(ctx, msg);
    } catch (Throwable t) {
        notifyHandlerException(ctx, t);
    }
}

At this point we finally reach user code: when data arrives from the client, the channelRead method of the ChannelHandler the user installed is invoked. This is the key hook. A minimal example of such a handler is sketched below; after that, we analyze some common ChannelHandlers.
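A user handler that consumes the raw ByteBuf itself might look like the following sketch (the class name is invented for the example). Because it does not pass the message further down the pipeline, it releases the buffer on its own:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class EchoLoggingHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg; // the raw bytes read in NioByteUnsafe.read()
        try {
            System.out.println("received: " + buf.toString(CharsetUtil.UTF_8));
        } finally {
            // This handler does not call ctx.fireChannelRead(msg),
            // so it is responsible for releasing the buffer itself.
            ReferenceCountUtil.release(buf);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}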
2. Common ChannelHandlers
Because handling the incoming data can be complex, Netty ships a series of ChannelHandlers that cover common application scenarios. We start with the subclasses of ByteToMessageDecoder.
2.1 ByteToMessageDecoder
2.1.1 LineBasedFrameDecoder
LineBasedFrameDecoder is used when frames are delimited by line breaks. Let's look at how it works. As the analysis above showed, when data arrives, channelRead is called first; for this decoder it is implemented in the parent class ByteToMessageDecoder:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                    expandCumulation(ctx, data.readableBytes());
                }
                cumulation.writeBytes(data);
                data.release();
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                cumulation.release();
                cumulation = null;
            }
            int size = out.size();
            decodeWasNull = size == 0;
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.get(i));
            }
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

Note: the Netty framework itself does not cache the data for us, so if a read leaves an unprocessed half packet, we have to store it ourselves.
As its name suggests, ByteToMessageDecoder only handles input of type ByteBuf. Internally it keeps a cumulation buffer for caching the half packet (more precisely, some number of complete packets that have already been read plus one half packet). If, after decoding, the incoming bytes happen to be consumed exactly and no half packet is left over, cumulation is cleared. Newly read data is appended to the tail of cumulation; if the space remaining in cumulation is not enough to write into, cumulation is reallocated, and the new size is exactly the number of bytes needed.
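To make the cumulation behaviour concrete, here is a tiny illustrative decoder (not part of Netty; the class name is invented). It emits a frame for every 4 bytes; when fewer than 4 readable bytes remain, decode returns without consuming anything, so ByteToMessageDecoder keeps the leftover half packet in cumulation until the next channelRead:

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

// Emits one frame for every 4 incoming bytes; anything shorter stays
// buffered in the decoder's cumulation until more data arrives.
public class FixedFourByteDecoder extends ByteToMessageDecoder {
    private static final int FRAME_LENGTH = 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < FRAME_LENGTH) {
            return; // half packet: leave it in the cumulation buffer
        }
        out.add(in.readBytes(FRAME_LENGTH)); // a complete frame for the next handler
    }
}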
Back in ByteToMessageDecoder, the accumulated data is then split into frames by callDecode; the detailed code is as follows:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (true) {
            if (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                this.decode(ctx, in, out);
                if (!ctx.isRemoved()) {
                    if (outSize == out.size()) {
                        if (oldInputLength != in.readableBytes()) {
                            continue;
                        }
                    } else {
                        if (oldInputLength == in.readableBytes()) {
                            throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() did not read anything but decoded a message.");
                        }
                        if (!this.isSingleDecode()) {
                            continue;
                        }
                    }
                }
            }
            return;
        }
    } catch (DecoderException var6) {
        throw var6;
    } catch (Throwable var7) {
        throw new DecoderException(var7);
    }
}

callDecode in turn calls the subclass's decode method to do the actual decoding. Let's look at LineBasedFrameDecoder's implementation:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    final int eol = findEndOfLine(buffer);
    if (!this.discarding) {
        if (eol >= 0) {
            // a complete line is available
            final int length = eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1; // "\r\n" or "\n"
            if (length > this.maxLength) {
                buffer.readerIndex(eol + delimLength);
                this.fail(ctx, length);
                return null;
            } else {
                ByteBuf frame;
                if (this.stripDelimiter) {
                    frame = buffer.readBytes(length);
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readBytes(length + delimLength);
                }
                return frame;
            }
        } else {
            // no line terminator yet; if the buffered data is already too long, switch to discarding
            final int length = buffer.readableBytes();
            if (length > this.maxLength) {
                this.discardedBytes = length;
                buffer.readerIndex(buffer.writerIndex());
                this.discarding = true;
                if (this.failFast) {
                    this.fail(ctx, "over " + this.discardedBytes);
                }
            }
            return null;
        }
    } else {
        // discarding mode: drop data until the next line terminator shows up
        if (eol >= 0) {
            final int length = this.discardedBytes + eol - buffer.readerIndex();
            final int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1;
            buffer.readerIndex(eol + delimLength);
            this.discardedBytes = 0;
            this.discarding = false;
            if (!this.failFast) {
                this.fail(ctx, length);
            }
        } else {
            this.discardedBytes = buffer.readableBytes();
            buffer.readerIndex(buffer.writerIndex());
        }
        return null;
    }
}
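Putting it together, LineBasedFrameDecoder is typically placed at the front of the pipeline, followed by a StringDecoder and the business handler, so that every handler behind it receives exactly one complete line per channelRead. A sketch under those assumptions (the initializer class name and the 1024-byte line limit are invented for the example):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

public class LineServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new LineBasedFrameDecoder(1024))            // split frames on \n or \r\n, at most 1024 bytes per line
          .addLast(new StringDecoder(CharsetUtil.UTF_8))       // ByteBuf -> String
          .addLast(new SimpleChannelInboundHandler<String>() { // business logic, one line per call
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, String line) {
                  System.out.println("got line: " + line);
              }
          });
    }
}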
Author: msrpp
Link: https://www.jianshu.com/p/20e9bb4c3d32