Channel
Buffer
Selector
Channel故名思义就是一个渠道,一条链接,实际物理意义映射着我们平时说的tcp链接。
在java的tcp服务端中,主要有两种Channel,ServerSocketChannel和 SocketChannel,工作模式都是全双工的。ServerSocketChannel是用于监听服务端端口的,假设我们的程序监听了8080端口,那么ServerSocketChannel就扮演者监听者的角色,如果有客户端握手成功了,将第一时间被这个Channel发现到。发现到以后,ServerSocketChannel会调用accept()来创建一个小弟ServerSocketChannel,对这个客户端链接独立服务。从这以后,ServerSocketChannel就不在关注这个客户端链接了。和客户端之间的数据传输,均是交由该SocketChannel去干。
由此看来。在一个服务端程序中,ServerSocketChannel就像是一个总机的角色,有客户端连进来了,由总机首次发觉,然后就交给一个新的分机去处理具体的业务数据流动。
客户端程序相比于服务端,因其是靶向连接到固定地址,所以不需要ServerSocketChannel的角色。
Buffer,服务端socket和客户端socket通讯时,存储数据的缓冲区,这个不展开讲了。网络编程中,我们常用ByteBuffer。
一个服务端程序,如果仅仅使用Channel和Buffer,我们也许会这样做:
一个线程去检测ServerSocketChannel的状态,然后每accept()一个客户端之后,起一个新的线程带着新的SocketChannel去玩,循环检测这个Channel上是否有数据传进来,读入Buffer,处理,用Buffer塞入Channel。。。
这样我们需要开启1+n个线程。当然,我们也可以使用1个线程轮寻所有Channel,但是这种方法效率低下,不被使用。为了解决这个问题,jdk引入了NIO的第三大组件 -- Selector。
Selector 复用选择器,可以register方法将所有需要事件监听的Channel都放入其中,监听的事件主要有两种,可读和可写。 可读又进一步区分成accept和read事件。
我们只需要开启一个线程循环调用Selector的select方法,即可一个返回发生事件的Channel迭代器,使用上更加方便了。
下面我们借用<<netty实战>>中的代码我们展示下如何使用Selector创建一个服务端程序 。
/* * Copyright 2013-2018 Lilinfeng. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package com.phei.netty.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketOption;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;/** * @author Administrator * @date 2014年2月16日 * @version 1.0 */public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路复用器、绑定监听端口 * * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 处理新接入的请求消息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); //sc.setOption(SocketOption) ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else ; // 读到0字节,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
如果有c的编程经验的话,可以看出来,java中的nio在c中都是有对应原型的,selector对象对应着c中的nio句柄(不同平台下有着不同的实现),Channel对应着socket句柄,可以调用setoption设置socket参数。而Channel应该就是socket句柄调用read等方法读取出来的数据做了缓存。下面我们来验证一下我们的猜想。
selector = Selector.open();
这里打开了一个selector对象,查看实现。
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
看下provider的实现。
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
重要的是这一句"provider = sun.nio.ch.DefaultSelectorProvider.create();",这里下层的代码,各个平台的jdk版本有了差异,我这里是mac,生成的是一个KQueueSelectorProvider对象。一步步跟进,最后调用到了这里
KQueueArrayWrapper() { int var1 = SIZEOF_KEVENT * 128; this.keventArray = new AllocatedNativeObject(var1, true); this.keventArrayAddress = this.keventArray.address(); this.kq = this.init(); }
其中init是一个native方法,这个方法的jni层命名应该是包名+对象名+函数名,拼凑出来是这样的。"sun.nio.ch"+ "KQueueArrayWrapper"+"init",结果是"Java_sun_nio_ch_KQueueArrayWrapper_init"找到jvm的实现代码,位于jdk/src/macosx/native/sun/nio/ch/KQueueArrayWrapper.c 。
JNIEXPORT jint JNICALLJava_sun_nio_ch_KQueueArrayWrapper_init(JNIEnv *env, jobject this){ int kq = kqueue(); if (kq < 0) { JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue() failed"); } return kq; }
确实是新建了一个kqueue句柄。由此可以得到结论,selector在macos下的底层实现是kqueue。
同时附上register和event的实现。
JNIEXPORT void JNICALLJava_sun_nio_ch_KQueueArrayWrapper_register0(JNIEnv *env, jobject this, jint kq, jint fd, jint r, jint w){ struct kevent changes[2]; struct kevent errors[2]; struct timespec dontBlock = {0, 0}; // if (r) then { register for read } else { unregister for read } // if (w) then { register for write } else { unregister for write } // Ignore errors - they're probably complaints about deleting non- // added filters - but provide an error array anyway because // kqueue behaves erratically if some of its registrations fail. EV_SET(&changes[0], fd, EVFILT_READ, r ? EV_ADD : EV_DELETE, 0, 0, 0); EV_SET(&changes[1], fd, EVFILT_WRITE, w ? EV_ADD : EV_DELETE, 0, 0, 0); kevent(kq, changes, 2, errors, 2, &dontBlock); }JNIEXPORT jint JNICALLJava_sun_nio_ch_KQueueArrayWrapper_kevent0(JNIEnv *env, jobject this, jint kq, jlong kevAddr, jint kevCount, jlong timeout){ struct kevent *kevs = (struct kevent *)jlong_to_ptr(kevAddr); struct timespec ts; struct timespec *tsp; int result; // Java timeout is in milliseconds. Convert to struct timespec. // Java timeout == -1 : wait forever : timespec timeout of NULL // Java timeout == 0 : return immediately : timespec timeout of zero if (timeout >= 0) { ts.tv_sec = timeout / 1000; ts.tv_nsec = (timeout % 1000) * 1000000; //nanosec = 1 million millisec tsp = &ts; } else { tsp = NULL; } result = kevent(kq, NULL, 0, kevs, kevCount, tsp); if (result < 0) { if (errno == EINTR) { // ignore EINTR, pretend nothing was selected result = 0; } else { JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue failed"); } } return result; }
那么我们可以假设程序运行到这一步到时候
servChannel.register(selector, SelectionKey.OP_ACCEPT);
应该会调用到底层的函数Java_sun_nio_ch_KQueueArrayWrapper_register0。
我们来看一下:
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) { if (!(var1 instanceof SelChImpl)) { throw new IllegalSelectorException(); } else { SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this); var4.attach(var3); Set var5 = this.publicKeys; synchronized(this.publicKeys) { this.implRegister(var4); } var4.interestOps(var2); return var4; } }
进一步看impRegister.
protected void implRegister(SelectionKeyImpl var1) { if (this.closed) { throw new ClosedSelectorException(); } else { int var2 = IOUtil.fdVal(var1.channel.getFD()); this.fdMap.put(var2, new KQueueSelectorImpl.MapEntry(var1)); ++this.totalChannels; this.keys.add(var1); } }
发现并没有,这里仅仅是将Channel封装成SelectionKeyImpl塞入一个HashSet了,那么证明我们刚才的推断是错误的,那NIO是如何将注册的Channel和底层的kqueue关联起来呢。
我们在看这一步。
selector.select(1000);
查看jdk源码。
private int lockAndDoSelect(long var1) throws IOException { synchronized(this) { if (!this.isOpen()) { throw new ClosedSelectorException(); } else { Set var4 = this.publicKeys; int var10000; synchronized(this.publicKeys) { Set var5 = this.publicSelectedKeys; synchronized(this.publicSelectedKeys) { var10000 = this.doSelect(var1); } } return var10000; } } }
看看doSelect
protected int doSelect(long var1) throws IOException { boolean var3 = false; if (this.closed) { throw new ClosedSelectorException(); } else { this.processDeregisterQueue(); int var7; try { this.begin(); var7 = this.kqueueWrapper.poll(var1); } finally { this.end(); } this.processDeregisterQueue(); return this.updateSelectedKeys(var7); } }
查看poll
int poll(long var1) { this.updateRegistrations(); int var3 = this.kevent0(this.kq, this.keventArrayAddress, 128, var1); return var3; } void updateRegistrations() { LinkedList var1 = this.updateList; synchronized(this.updateList) { KQueueArrayWrapper.Update var2 = null; while((var2 = (KQueueArrayWrapper.Update)this.updateList.poll()) != null) { SelChImpl var3 = var2.channel; if (var3.isOpen()) { this.register0(this.kq, var3.getFDVal(), var2.events & Net.POLLIN, var2.events & Net.POLLOUT); } } } }
作者:msrpp
链接:https://www.jianshu.com/p/2cccad527509