谈谈java selector的机制
1)常用数据结构EPollSelectorImpl
维护了3个键set
keys【已经注册的键的集合】
selectedKeys【已选择的键的集合】
cancelledKeys【已取消的键的集合】
EPollArrayWrapper【真正的对linux epoll的封装】
包含了3个重要的native方法epollCreate、epollCtl、epollWait分别对应库函数epoll_create、epoll_ctl、epoll_wait
一个native实例pollArray模拟库中struct epoll_event
SelectionKeyImpl
表示了一个特定的通道对象和一个特定的选择器对象之间的注册 关系
包含两个以整数形式进行编码的比特掩码
关心的操作(interestOps)
表示通道准备好要执行的操作(readyOps)
SocketChannelImpl/ServerSocketChannelImpl: 分别是连接socket和监听socket
2)一段代码
有了上面的概念,接下来看一段简单的代码,采用select实现的echo server
[java]
<pre name="code" class="java">public class SelectTest {
public static int PORT_NUMBER = 1234;
public static void main(String args[]) throws Exception {
new SelectTest().go();
}
public void go() throws Exception {
int port = PORT_NUMBER;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
// Set the port the server channel will listen to
serverSocket.bind(new InetSocketAddress(port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
// Register the ServerSocketChannel with the Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the // selected
// set contains keys of the ready channels.
int n = selector.select();
if (n == 0) {
// nothing to do
continue;
}
// Get an iterator over the set of selected keys
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// Look at each key in the selected set while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
// step1 Is a new connection coming in?
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel, SelectionKey.OP_READ);
sayHello(channel);
}
// step2 Is there data to read on this channel?
if (key.isReadable()) {
readDataFromSocket(key);
}
// step3 Remove key from selected set; it's been handled
it.remove();
}
}
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception {
if (channel == null) {
return; // could happen
}
// Set the new channel nonblocking
channel.configureBlocking(false);
// Register it with the selector
channel.register(selector, ops);
}
// Use the same byte buffer for all channels. A single thread is //
// servicing all the channels, so no danger of concurrent acccess.
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
protected void readDataFromSocket(SelectionKey key) throws Exception {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
buffer.clear(); // Empty buffer
// Loop while data is available; channel is nonblocking
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip(); // Make buffer readable
// Send the data; don't assume it goes all at once
补充:软件开发 , Java ,