ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

NIO - 非阻塞式IO

2022-01-12 21:37:00  阅读:146  来源: 互联网

标签:IO NIO buffer 阻塞 System ByteBuffer println out


NIO, No blocking IO, 非阻塞式IO。指的是在发生IO操作时不会产生阻塞,利用NIO可以处理高并发和高访问的场景。和之相对的是BIO, Blocking IO,这是标准的阻塞IO。

BIO

一个经典的BIO的例子是一个最简单的C/S模型程序。以下为代码:


public class BIODemoServer {

    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket();
        ss.bind(new InetSocketAddress(9999));

        System.out.println("wait for connecting ...");

        Socket socket = ss.accept();    // 此处产生阻塞

        System.out.println("a connector is accessed!");

        InputStream input = socket.getInputStream();
        input.read();

        System.out.println("we had some data!");
    }
}

public class BIODemoClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        System.out.println("trying to connect ...");
        socket.connect(new InetSocketAddress("127.0.0.1", 9999));
        System.out.println("connect success!");
    }

}

accept()和connect()都是阻塞方法,在调用的时候,会启动一个阻塞。accept()方法在等到客户端连接上时会释放阻塞;connect()方法在连接上服务器的时候会释放阻塞。

同样,ServerSocket的getInputStream()获取的也是阻塞的IO流。调用read()方法之后,如果客户端迟迟没有向服务端输出数据,那么服务端将会一直卡这里,直到客户端向服务端写数据。

而实际上,客户端的OutputStream的write()方法也是阻塞的。如果服务端没有接收数据。那么write()方法会一直往外(一般是网卡设备的缓冲区)写数据,直到写出到一定量,没有任何一方接收,也就发生了阻塞。

NIO

和BIO相对,NIO就可以实现非阻塞式的IO。也就是说在IO发生的时候,不会产生任何阻塞。

NIO的核心是通道(Channel),IO就是在这个Channel中实现的。Channel的特点在于它可以打开或关闭。打开的Channel连接了某一个实体(设备,文件,网络套接字等),可以通过对Channel的IO操作来读写数据。并且可以配置整个读写过程是没有任何阻塞的。

Channel允许多个线程同时访问,并且可以保存整个过程是线程安全的。

Channel在java中是一个接口,用的最多的实现类有:ServerSocketChannel, SocketChannel。两个类底层是基于TCP协议的。

以下是最简单的NIO使用示例:


public class NIODemoServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 要想实现非阻塞,必须执行这条语句,否则仍为阻塞
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8888));
        System.out.println("a client has been connected!");
        ssc.accept();   // 此处不再阻塞
    }

}

public class NIODemoClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);   // 设置非阻塞
        sc.connect(new InetSocketAddress("127.0.0.1", 8888));  // 此处不再阻塞
        System.out.println("connect success!");
    }

}

Channel进行IO操作时,用到的元数据是ByteBuffer,这是一个字节缓存区。是一个特定的基本类型元素的有限序列。其本身是一个抽象类,调用allocate(int)静态方法返回对象。

其包含三个重要属性:

  • capacity: 缓存区能够保存的元素最大个数
  • limit: 限制,从缓存区最多能取的数据个数,从0开始计。如果超出则抛出java.nio.BufferUnderflowException异常。
  • position: 当前位置。指的是从哪个位置开始取出数据。

ByteBuffer实际上就是Channel中数据的载体。就好比隧道里的汽车一样,人是数据,汽车是ByteBuffer,隧道是Channel。

allocate(int)方法可以创建一个指定长度的ByteBuffer。

ByteBuffer有一个put(byte)方法,其接受一个字节的数据,指的就是把数据写入缓存区。要想取出数据,则调用get()方法。

每当向ByteBuffer中put一个byte数据之后,position就会递增1。下一次put或get就从position的位置继续,所以以下代码输出的是两个0而不是1.(缓存区中的所有数据默认是0)

public class ByteBufferDemo {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        byte a = 1;
        byte b = 2;
        buffer.put(a);
        buffer.put(b);
        System.out.println(buffer.get());
        System.out.println(buffer.get());
    }

}

如果想正确取出a和b,那么可以通过position(int)方法修改position位置,通过limit(int)方法可以限制取出数据的个数:

public class ByteBufferDemo {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        byte a = 1;
        byte b = 2;
        buffer.put(a);
        buffer.put(b);
        buffer.position(0);  // 从第一个数据开始取出
        buffer.limit(2);     // 只允许取出两个数据
        System.out.println(buffer.get());
        System.out.println(buffer.get());
//      System.out.println(buffer.get());   // 报异常
    }
}

考虑为什么第三个输出会抛异常。因为limit限制了只能取出position为0-1两个byte的数据。第三个输出试图取出position为2的数据,自然抛出异常。

ByteBuffer还有一些很有意思的方法:

  • flip(): 反转缓存区,把limit设置为position,把position置为0。
  • clear(): 清空缓存区,把position置为0,limit置为capacity。可见这个方法并没有真正地清空缓存区,而是把缓存区的状态设置为初始值。
  • hasRemaining(): 返回缓存区中是否还剩余有效的数据。其本质是判断position是否小于limit。

那么,遍历缓存区有效数据的代码如下:

public class ByteBufferDemo {

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put((byte) 1);
        buffer.put((byte) 2);
        buffer.put((byte) 3);

        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.println(buffer.get());
        }

    }

}

我们可以把一个byte数组直接封装到ByteBuffer里头。调用静态方法warp(byte[])返回一个大小刚刚好为该byte数组的ByteBuffer。

现在我们有了车(ByteBuffer),并且知道了如何把数据装到车里(put方法),下一步就是如何让车通过隧道了。

ServerSocketChannel的accept()方法会返回一个SocketChannel实例。调用这个实例的read(ByteBuffer)就可以读取数据了。

注意这个返回的实例默认是非阻塞,还是需要调用configureBlocking(boolean)来设置其是否阻塞。

但是因为这个aceept()是不阻塞的,所以如果没有客户端连接服务器,那么将会返回null。可以利用这个来判断是否有客户端连接。

或者是我们可以在accept()处人为设置一个阻塞,来确保有客户端连接上服务端。

public class NIODemoServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8888));
        // 要想实现非阻塞,必须执行这条语句,否则仍为阻塞
        ssc.configureBlocking(false);

        System.out.println("wait for client...");

        SocketChannel sc = null;
        while (sc == null) {   // 手动进行阻塞
            sc = ssc.accept();
        }

        System.out.println("a client has been connected!");

        ByteBuffer buffer = ByteBuffer.allocate(10);
        sc.configureBlocking(false);   // 将通道设置为非阻塞
        sc.read(buffer);

        System.out.println("We have read some data!");
        System.out.println("data: " + new String(buffer.array()));

    }

}

public class NIODemoClient {

    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);   // 设置非阻塞

        System.out.println("trying to connect...");

        sc.connect(new InetSocketAddress("127.0.0.1", 8888));
        if (!sc.isConnected()) {   // 如果连接没有成功,继续进行连接...
            sc.finishConnect();
        }

        System.out.println("connect Server success!");

        // 开始向服务器写入数据
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        sc.write(buffer);

        System.out.println("write success!");

        while (true);  // 保持连接
    }

}

Selector

一旦使用NIO,稍有不慎就会产生各种异常。因此我们需要一种设计模式能够应对NIO的情况。

最简单的做法是每有一个客户端连接上服务端时,服务端启动一个单独的线程来处理客户端的请求。

这个模型有如下的缺点:

  • 如果并发量大,会导致服务端线程过多,很可能导致其宕机。
  • 如果客户端在连接服务端之后不进行任何操作,那么连接将会一直保持着,会占用过多无用的CPU调度。
  • 接上,还会导致真正需要被处理的线程无法及时地被服务。

为了解决闲置线程的问题,我们可以将闲置的线程设置为阻塞状态。这样CPU在调度的时候不会调度这种线程。所以需要引入事件的监听机制。

这里引入Selector(多路复用选择器)。它起到的就是事件监听的作用。

Selector会实时监听所有连接上服务器的线程。如果线程没有做任何事情,Selector会让线程沉睡。如果线程发生了accept(),connect(),write(),read()行为,Selector会让线程醒来并执行操作。

Selector从一定程度上减少了线程调度器的压力,让CPU在大部分时间内可以做有意义的事情。

在处理短请求的情况下,为了解决线程过多的情况,可以使用一个线程来处理所有的客户端请求。使用NIO技术可以实现多用户的并发,使用Selector来判断事件的发生。

java已经提供好Selector了,实例化Selector使用Selector的open()静态方法。

要想让Selector提供监听,需要先注册事件,这些事件定义在SelectionKey中,均是int型常数。常见的事件有:

  • OP_ACCEPT: 表示接收事件
  • OP_CONNCT: 表示连接事件
  • OP_WRITE: 表示写事件
  • OP_READ: 表示读事件

通过ServerSocketChannel的register(Selector,int)方法可以注册Selector。

调用Selector的select()方法,会产生一个阻塞,直到有注册的监听事件被触发。

当有任何一个事件触发后,我们可以通过selectedKeys()获取触发事件的集合。这表示所有连接上服务端的客户集合。那么遍历这个集合就达到了遍历所有已经连上的客户端的作用。

这个集合保存的是SelectionKey对象。每个对象可以判断是否发生了某个事件(上述4个事件)。通过这样的判断我们可以对不同的事件进行处理。

在accept()事件发生之后,需要调用SelectionKey的channel()方法取得当前会话的ServerSocketChannel对象(需要强制转换),随后调用accept()。

其它事件的处理技巧类似,不过channel()方法将返回SocketChannel,这个SocketChannel就是accept()后通过ServerSocketChannel取得的socketChannel,所以这里可以不用设置非阻塞了。

在本示例中,在客户端连接上服务器之后(也就是ACCEPT事件发生之后),服务器就可以向客户端写数据了。也就是保持连接的客户端的WRITE事件是一直发生的。

而因为NIO,服务端可以持续不断向客户端写数据。所以在写数据的时候,我们需要进行一些处理,防止服务端向客户端过度写数据。

可以通过一些简单的二进制计算,在WRITE或READ事件发生之后,就把发生过的事件给去掉。这里可以通过SelectionKey的interestOps()方法取得现有事件,然后与当前事件的取反结果进行一次按位与计算即可。

public class Server {

    // implement a Selector Server
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);   // use no-blocking.
        ssc.bind(new InetSocketAddress(6666));

        // create a selector
        Selector selector = Selector.open();
        // register the selector and event ACCEPT.
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();  // block here ... wait for client connect ...

            // get all events' set.
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> ite = set.iterator();
            while (ite.hasNext()) {

                SelectionKey key = ite.next();
                if (key.isAcceptable()) {      // ACCEPT event.

                    // the serverSocketChannel for new client.
                    ServerSocketChannel serverSocketChannel =
                            (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = null;
                    while (socketChannel == null) {
                        socketChannel = serverSocketChannel.accept();
                    }
                    socketChannel.configureBlocking(false);

                    System.out.println("Thread " + Thread.currentThread().getId()
                        + " is now servicing a client ...");

                    // register WRITE and READ event for socketChannel.
                    socketChannel.register(selector, SelectionKey.OP_READ
                            | SelectionKey.OP_WRITE);

                }

                if (key.isReadable()) {  // READ event.

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(10);
                    socketChannel.read(buffer);
                    System.out.println("Server has read some data: "
                            + new String(buffer.array()));

                    // remove current event.
                    socketChannel.register(selector, key.interestOps() &
                            ~SelectionKey.OP_WRITE);
                }

                // WRITE event, this will happen after connecting success.
                if (key.isWritable()) {

                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.wrap("nihao!".getBytes());
                    socketChannel.write(buffer);
                    System.out.println("Server has wrote some data to client.");

                    // remove current event.
                    socketChannel.register(selector, key.interestOps() &
                            ~SelectionKey.OP_WRITE);

                }

                // delete this event, to prevent repeated calls to the event.
                ite.remove();
            }
        }
    }
}

public class Client {

    public static void main(String[] args) throws IOException,
            InterruptedException {

        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);

        sc.connect(new InetSocketAddress(
                "127.0.0.1", 6666));

        while (!sc.isConnected()) {
            sc.finishConnect();
        }

        Thread.sleep(2000);

        // write data to Server.
        System.out.println("Write some data to Server...");
        ByteBuffer buffer = ByteBuffer.wrap("hello!".getBytes());
        sc.write(buffer);

        Thread.sleep(2000);

        // read data from Server.
        ByteBuffer receive = ByteBuffer.allocate(20);
        sc.read(receive);

        System.out.println("Client received some data: "
                + new String(receive.array()));

        while (true);
    }

}

NIO因为非阻塞的特性,要实现正常的数据交互代价是很大的。上面的代码通过sleep()方法延迟获取数据,但在实际中是不可能这么用的

标签:IO,NIO,buffer,阻塞,System,ByteBuffer,println,out
来源: https://www.cnblogs.com/lazycat7706/p/15795380.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有