java nio网络编程的一点心得

Posted on

java nio网络编程的一点心得

前几日用java nio写了一个tcp端口转发小工具,还颇费周折,其中一个原因在于网上资料很混乱,不少还是错误的。这篇文章中我会以一个EchoServer作为例子。先看《Java网络编程》中的写法,这也是在网上颇为常见的一个写法。 Java代码 收藏代码

  1. public class EchoServer {
  2. public static int DEFAULT_PORT = 7777;
  3. public static void main(String[] args) throws IOException {
  4. System.out.println("Listening for connection on port " + DEFAULT_PORT);
  5. Selector selector = Selector.open();
  6. initServer(selector);
  7. while (true) {
  8. selector.select();
  9. for (Iterator itor = selector.selectedKeys().iterator(); itor.hasNext();) {
  10. SelectionKey key = (SelectionKey) itor.next();
  11. itor.remove();
  12. try {
  13. if (key.isAcceptable()) {
  14. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  15. SocketChannel client = server.accept();
  16. System.out.println("Accepted connection from " + client);
  17. client.configureBlocking(false);
  18. SelectionKey clientKey = client.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ);
  19. ByteBuffer buffer = ByteBuffer.allocate(100);
  20. clientKey.attach(buffer);
  21. }
  22. if (key.isReadable()) {
  23. SocketChannel client = (SocketChannel) key.channel();
  24. ByteBuffer buffer = (ByteBuffer) key.attachment();
  25. client.read(buffer);
  26. }
  27. if (key.isWritable()) {
  28. // System.out.println("is writable...");
  29. SocketChannel client = (SocketChannel) key.channel();
  30. ByteBuffer buffer = (ByteBuffer) key.attachment();
  31. buffer.flip();
  32. client.write(buffer);
  33. buffer.compact();
  34. }
  35. } catch (IOException e) {
  36. key.cancel();
  37. try { key.channel().close(); } catch (IOException ioe) { }
  38. }
  39. }
  40. }
  41. }
  42. private static void initServer(Selector selector) throws IOException,
  43. ClosedChannelException {
  44. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  45. ServerSocket ss = serverChannel.socket();
  46. ss.bind(new InetSocketAddress(DEFAULT_PORT));
  47. serverChannel.configureBlocking(false);
  48. serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  49. }
  50. }

public class EchoServer {

public static int DEFAULT_PORT = 7777;


public static void main(String[] args) throws IOException {
    System.out.println("Listening for connection on port " + DEFAULT_PORT);


    Selector selector = Selector.open();

    initServer(selector);


    while (true) {
        selector.select();


        for (Iterator<SelectionKey> itor = selector.selectedKeys().iterator(); itor.hasNext();) {

            SelectionKey key = (SelectionKey) itor.next();
            itor.remove();

            try {
                if (key.isAcceptable()) {

                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();

                    System.out.println("Accepted connection from " + client);
                    client.configureBlocking(false);

                    SelectionKey clientKey = client.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ);
                    ByteBuffer buffer = ByteBuffer.allocate(100);

                    clientKey.attach(buffer);
                }

                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();

                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    client.read(buffer);

                }
                if (key.isWritable()) {

                    // System.out.println("is writable...");
                    SocketChannel client = (SocketChannel) key.channel();

                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    buffer.flip();

                    client.write(buffer);
                    buffer.compact();

                }
            } catch (IOException e) {

                key.cancel();
                try { key.channel().close(); } catch (IOException ioe) { }

            }
        }

    }
}


private static void initServer(Selector selector) throws IOException,

        ClosedChannelException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();

    ServerSocket ss = serverChannel.socket();
    ss.bind(new InetSocketAddress(DEFAULT_PORT));

    serverChannel.configureBlocking(false);
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

}

} 上面的代码很典型,运行结果似乎也是正确的。 Java代码 收藏代码

  1. marlon$ java EchoServer&
  2. --> Listening for connection on port 7777
  3. marlon$ telnet localhost 7777
  4. --> Accepted connection from java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:65030]
  5. hello
  6. --> hello
  7. world
  8. -->world

marlon$ java EchoServer&

--> Listening for connection on port 7777 marlon$ telnet localhost 7777

--> Accepted connection from java.nio.channels.SocketChannel[connected local=/127.0.0.1:7777 remote=/127.0.0.1:65030] hello

--> hello world

-->world 但是如果你这时top用看一下发现服务器进程CPU占用到95%以上,如果取消掉32行的注释,服务器会不断地输出"is writable...",这是为什么呢?让我们来分析当第一个客户端连接上时发生什么情况。

  1. 在连接之前,服务器第11行:selector.select()处阻塞。当阻塞时,内核会将这个进程调度至休眠状态,此时基本不耗CPU。
  2. 当客户端发起一个连接时,服务器检测到客户端连接,selector.select()返回。selector.selectedKeys()返回已就绪的SelectionKey的集合,在这种情况下,它只包含一个key,也就是53行注册的acceptable key。服务器开始运行17-25行的代码,server.accept()返回代码客户端连接的socket,第22行在socket上注册OP_READ和OP_WRITE,表示当socket可读或者可写时就会通知selector。
  3. 接着服务器又回到第11行,尽管这时客户端还没有任何输入,但这时selector.select()不会阻塞,因为22行在socket注册了写操作,而socket只要send buffer不满就可以写,刚开始send buffer为空,socket总是可以写,于是server.select()立即返回,包含在22行注册的key。由于这个key可写,所以服务器会运行31-38行的代码,但是这时buffer为空,client.write(buffer)没有向socket写任何东西,立即返回0。
  4. 接着服务器又回到第11行,由于客户端连接socket可以写,这时selector.select()会立即返回,然后运行31-38行的代码,像步骤3一样,由于buffer为空,服务器没有干任何事又返回到第11行,这样不断循环,服务器却实际没有干事情,却耗大量的CPU。 从上面的分析可以看出问题在于我们在没有数据可写时就在socket上注册了OP_WRITE,导致服务器浪费大量CPU资源,解决办法是只有数据可以写时才注册OP_WRITE操作。上面的版本还不只浪费CPU那么简单,它还可能导致潜在的死锁。虽然死锁在我的机器上没有发生,对于这个简单的例子似乎也不大可能发生在别的机器上,但是在对于复杂的情况,比如我写的端口转发工具中就发生了,这还依赖于jdk的实现。对于上面的EchoServer,出现死锁的场景是这样的:

  5. 假设服务器已经启动,并且已经有一个客户端与它相连,此时正如上面的分析,服务器在不断地循环做无用功。这时用户在客户端输入"hello"。

  6. 当服务器运行到第11行:selector.select()时,这时selector.selectedKeys()会返回一个代表客户端连接的key,显然这时客户端socket是既可读又可写,但jdk却并不保证能够检测到两种状态。如果它检测到key既可读又可写,那么服务器会执行26-38行的代码。如果只检测到可读,那么服务器会执行26-30行的代码。如果只检测到可写,那么会执行31-38行的代码。对于前两种情况,不会造成死锁,因为当执行完29行,buffer会读到用户输入的内容,下次再运行到36行就可以将用户输入内容echo回。但是对最后一种情况,服务器完全忽略了客户端发过来的内容,如果每次selector.select()都只能检测到socket可写,那么服务器永远不能将echo回客户端输入的内容。 避免死锁的一个简单方法就是不要在同一个socket同时注册多个操作。对于上面的EchoServer来说就是不要同时注册OP_READ和OP_WRITE,要么只注册OP_READ,要么只注册OP_WRITE。下面的EchoServer修正了以上的错误: Java代码 收藏代码

  7. public static void main(String[] args) throws IOException {

  8. System.out.println("Listening for connection on port " + DEFAULT_PORT);
  9. Selector selector = Selector.open();
  10. initServer(selector);
  11. while (true) {
  12. selector.select();
  13. for (Iterator itor = selector.selectedKeys().iterator(); itor.hasNext();) {
  14. SelectionKey key = (SelectionKey) itor.next();
  15. itor.remove();
  16. try {
  17. if (key.isAcceptable()) {
  18. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  19. SocketChannel client = server.accept();
  20. System.out.println("Accepted connection from " + client);
  21. client.configureBlocking(false);
  22. SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ);
  23. ByteBuffer buffer = ByteBuffer.allocate(100);
  24. clientKey.attach(buffer);
  25. } else if (key.isReadable()) {
  26. SocketChannel client = (SocketChannel) key.channel();
  27. ByteBuffer buffer = (ByteBuffer) key.attachment();
  28. int n = client.read(buffer);
  29. if (n > 0) {
  30. buffer.flip();
  31. key.interestOps(SelectionKey.OP_WRITE); // switch to OP_WRITE
  32. }
  33. } else if (key.isWritable()) {
  34. System.out.println("is writable...");
  35. SocketChannel client = (SocketChannel) key.channel();
  36. ByteBuffer buffer = (ByteBuffer) key.attachment();
  37. client.write(buffer);
  38. if (buffer.remaining() == 0) { // write finished, switch to OP_READ
  39. buffer.clear();
  40. key.interestOps(SelectionKey.OP_READ);
  41. }
  42. }
  43. } catch (IOException e) {
  44. key.cancel();
  45. try { key.channel().close(); } catch (IOException ioe) { }
  46. }
  47. }
  48. }
  49. }

    public static void main(String[] args) throws IOException {

     System.out.println("Listening for connection on port " + DEFAULT_PORT);
    
    Selector selector = Selector.open();
    initServer(selector);


    while (true) {

        selector.select();


        for (Iterator<SelectionKey> itor = selector.selectedKeys().iterator(); itor.hasNext();) {
            SelectionKey key = (SelectionKey) itor.next();

            itor.remove();
            try {

                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();

                    SocketChannel client = server.accept();
                    System.out.println("Accepted connection from " + client);

                    client.configureBlocking(false);
                    SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ);

                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    clientKey.attach(buffer);

                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();

                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int n = client.read(buffer);

                    if (n > 0) {
                        buffer.flip();

                        key.interestOps(SelectionKey.OP_WRITE);        // switch to OP_WRITE
                    }

                } else if (key.isWritable()) {
                    System.out.println("is writable...");

                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();

                    client.write(buffer);
                    if (buffer.remaining() == 0) {    // write finished, switch to OP_READ

                        buffer.clear();
                        key.interestOps(SelectionKey.OP_READ);

                    }
                }

            } catch (IOException e) {
                key.cancel();

                try { key.channel().close(); } catch (IOException ioe) { }
            }

        }
    }

}

主要变化,在第19行接受客户端连接时只注册OP_READ操作,第28行当读到数据时才切换到OP_WRITE操作,第35-38行,当写操作完成时再切换到OP_READ操作。由于一个key同时只能执行一个操作,我将原来三个并行if换成了if...else。 上面的代码不够优雅,它将处理服务器Socket和客户连接Socket的代码搅在一起,对于简单的EchoServer这样做没什么问题,当服务器变得复杂,使用命令模式将它们分开变显得非常必要。首先创建一个接口来抽象对SelectionKey的处理。 Java代码 收藏代码

  1. interface Handler {
  2. void execute(Selector selector, SelectionKey key);
  3. }

    interface Handler {

     void execute(Selector selector, SelectionKey key);
    

    } 再来看main函数: Java代码 收藏代码

  4. public static void main(String[] args) throws IOException {

  5. System.out.println("Listening for connection on port " + DEFAULT_PORT);
  6. Selector selector = Selector.open();
  7. initServer(selector);
  8. while (true) {
  9. selector.select();
  10. for (Iterator itor = selector.selectedKeys().iterator(); itor.hasNext();) {
  11. SelectionKey key = (SelectionKey) itor.next();
  12. itor.remove();
  13. Handler handler = (Handler) key.attachment();
  14. handler.execute(selector, key);
  15. }
  16. }
  17. }
  18. private static void initServer(Selector selector) throws IOException,
  19. ClosedChannelException {
  20. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  21. ServerSocket ss = serverChannel.socket();
  22. ss.bind(new InetSocketAddress(DEFAULT_PORT));
  23. serverChannel.configureBlocking(false);
  24. SelectionKey serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  25. serverKey.attach(new ServerHandler());
  26. }

    public static void main(String[] args) throws IOException {

     System.out.println("Listening for connection on port " + DEFAULT_PORT);
    
    Selector selector = Selector.open();
    initServer(selector);


    while (true) {

        selector.select();


        for (Iterator<SelectionKey> itor = selector.selectedKeys().iterator(); itor.hasNext();) {
            SelectionKey key = (SelectionKey) itor.next();

            itor.remove();
            Handler handler = (Handler) key.attachment();

            handler.execute(selector, key);
        }

    }
}


private static void initServer(Selector selector) throws IOException,

        ClosedChannelException {
    ServerSocketChannel serverChannel = ServerSocketChannel.open();

    ServerSocket ss = serverChannel.socket();
    ss.bind(new InetSocketAddress(DEFAULT_PORT));

    serverChannel.configureBlocking(false);
    SelectionKey serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);

    serverKey.attach(new ServerHandler());
}

main函数非常简单,迭代SelectionKey,对每个key的attachment为Handler,调用它的execute的方法,不用管它是服务器Socket还是客户Socket。注意initServer方法将serverKey附加了一个ServerHandler。下面是ServerHandler的代码: Java代码 收藏代码

  1. class ServerHandler implements Handler {
  2. public void execute(Selector selector, SelectionKey key) {
  3. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  4. SocketChannel client = null;
  5. try {
  6. client = server.accept();
  7. System.out.println("Accepted connection from " + client);
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. return;
  11. }
  12. SelectionKey clientKey = null;
  13. try {
  14. client.configureBlocking(false);
  15. clientKey = client.register(selector, SelectionKey.OP_READ);
  16. clientKey.attach(new ClientHandler());
  17. } catch (IOException e) {
  18. if (clientKey != null)
  19. clientKey.cancel();
  20. try { client.close(); } catch (IOException ioe) { }
  21. }
  22. }
  23. }

    class ServerHandler implements Handler {

     public void execute(Selector selector, SelectionKey key) {
         ServerSocketChannel server = (ServerSocketChannel) key.channel();
    
         SocketChannel client = null;
         try {
    
             client = server.accept();
             System.out.println("Accepted connection from " + client);
    
         } catch (IOException e) {
             e.printStackTrace();
    
             return;
         }
    
        SelectionKey clientKey = null;

        try {
            client.configureBlocking(false);

            clientKey = client.register(selector, SelectionKey.OP_READ);
            clientKey.attach(new ClientHandler());

        } catch (IOException e) {
            if (clientKey != null)

                clientKey.cancel();
            try { client.close(); } catch (IOException ioe) { }

        }
    }

}

ServerHandler接收连接,为每个客户Socket注册OP_READ操作,返回的clientKey附加上ClientHandler。 Java代码 收藏代码

  1. class ClientHandler implements Handler {
  2. private ByteBuffer buffer;
  3. public ClientHandler() {
  4. buffer = ByteBuffer.allocate(100);
  5. }
  6. public void execute(Selector selector, SelectionKey key) {
  7. try {
  8. if (key.isReadable()) {
  9. readKey(selector, key);
  10. } else if (key.isWritable()) {
  11. writeKey(selector, key);
  12. }
  13. } catch (IOException e) {
  14. key.cancel();
  15. try { key.channel().close(); } catch (IOException ioe) { }
  16. }
  17. }
  18. private void readKey(Selector selector, SelectionKey key) throws IOException {
  19. SocketChannel client = (SocketChannel) key.channel();
  20. int n = client.read(buffer);
  21. if (n > 0) {
  22. buffer.flip();
  23. key.interestOps(SelectionKey.OP_WRITE); // switch to OP_WRITE
  24. }
  25. }
  26. private void writeKey(Selector selector, SelectionKey key) throws IOException {
  27. // System.out.println("is writable...");
  28. SocketChannel client = (SocketChannel) key.channel();
  29. client.write(buffer);
  30. if (buffer.remaining() == 0) { // write finished, switch to OP_READ
  31. buffer.clear();
  32. key.interestOps(SelectionKey.OP_READ);
  33. }
  34. }
  35. }

    class ClientHandler implements Handler {

     private ByteBuffer buffer;
    
    public ClientHandler() {
        buffer = ByteBuffer.allocate(100);

    }


    public void execute(Selector selector, SelectionKey key) {
        try {

            if (key.isReadable()) {
                readKey(selector, key);

            } else if (key.isWritable()) {
                writeKey(selector, key);

            }
        } catch (IOException e) {

            key.cancel();
            try { key.channel().close(); } catch (IOException ioe) { }

        }
    }


    private void readKey(Selector selector, SelectionKey key) throws IOException {

        SocketChannel client = (SocketChannel) key.channel();
        int n = client.read(buffer);

        if (n > 0) {
            buffer.flip();

            key.interestOps(SelectionKey.OP_WRITE);        // switch to OP_WRITE
        }

    }


    private void writeKey(Selector selector, SelectionKey key) throws IOException {
        // System.out.println("is writable...");

        SocketChannel client = (SocketChannel) key.channel();
        client.write(buffer);

        if (buffer.remaining() == 0) {    // write finished, switch to OP_READ
            buffer.clear();

            key.interestOps(SelectionKey.OP_READ);
        }

    }
}

这个代码没有什么新内容,只是将根据key是可读还可写拆分为两个方法,代码结构显得更清晰。对于EchoServer,这么做确实有些过度工程,对于稍微复杂一点的服务器这么做是很值得的。 代码:EchoServer.java, EchoServer2.java, EchoServer3.java 参考:

  1. The Rox Java NIO Tutorial
  2. Architecture of a Highly Scalable NIO-Based Server
希望本站内容对您有点用处,有什么疑问或建议请在后面留言评论
转载请注明作者(RobinChia)和出处 It so life ,请勿用于任何商业用途