当前位置 : 主页 > 编程语言 > java >

Java BIO,NIO,AIO总结

来源:互联网 收集:自由互联 发布时间:2021-04-10
Java 中的 BIO、NIO和 AIO 理解为是 Java 语言对操作系统的各种 IO 模型的封装。程序员在使用这些 API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码

Java 中的 BIO、NIO和 AIO 理解为是 Java 语言对操作系统的各种 IO 模型的封装。程序员在使用这些 API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码。只需要使用Java的API就可以了。

在讲 BIO,NIO,AIO 之前先来回顾一下这样几个概念:同步与异步,阻塞与非阻塞。 同步与异步

  • 同步: 同步就是发起一个调用后,被调用者未处理完请求之前,调用不返回。
  • 异步: 异步就是发起一个调用后,立刻得到被调用者的回应表示已接收到请求,但是被调用者并没有返回结果,此时我们可以处理其他的请求,被调用者通常依靠事件,回调等机制来通知调用者其返回结果。 同步和异步的区别最大在于异步的话调用者不需要等待处理结果,被调用者会通过回调等机制来通知调用者其返回结果。

阻塞和非阻塞

  • 阻塞: 阻塞就是发起一个请求,调用者一直等待请求结果返回,也就是当前线程会被挂起,无法从事其他任务,只有当条件就绪才能继续。
  • 非阻塞: 非阻塞就是发起一个请求,调用者不用一直等着结果返回,可以先去干其他事情。 举个生活中简单的例子,你妈妈让你烧水,小时候你比较笨啊,在那里傻等着水开(同步阻塞)。等你稍微再长大一点,你知道每次烧水的空隙可以去干点其他事,然后只需要时不时来看看水开了没有(同步非阻塞)。后来,你们家用上了水开了会发出声音的壶,这样你就只需要听到响声后就知道水开了,在这期间你可以随便干自己的事情,你需要去倒水了(异步非阻塞)。

BIO (Blocking I/O)

同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。

传统 BIO

BIO通信(一请求一应答)模型图如下(图源网络,原出处不明):

采用 BIO 通信模型 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在while(true) 循环中服务端会调用 accept() 方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接,如上图所示。

如果要让 BIO 通信模型 能够同时处理多个客户端请求,就必须使用多线程(主要原因是socket.accept()、socket.read()、socket.write() 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过 线程池机制 改善,线程池还可以让线程的创建和回收成本相对较低。使用FixedThreadPool 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N(客户端请求数量):M(处理客户端请求的线程数量)的伪异步I/O模型(N 可以远远大于 M),下面一节"伪异步 BIO"中会详细介绍到。

我们再设想一下当客户端并发访问量增加后这种模型会出现什么问题?

在 Java 虚拟机中,线程是宝贵的资源,线程的创建和销毁成本很高,除此之外,线程的切换成本也是很高的。尤其在 Linux 这样的操作系统中,线程本质上就是一个进程,创建和销毁线程都是重量级的系统函数。如果并发访问量增加会导致线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或者僵死,不能对外提供服务。

伪异步 IO

为了解决同步阻塞I/O面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化一一一后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N.通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

伪异步IO模型图(图源网络,原出处不明):

采用线程池和任务队列可以实现一种叫做伪异步的 I/O 通信框架,它的模型图如上图所示。当有新的客户端接入时,将客户端的 Socket 封装成一个Task(该任务实现java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。不过因为它的底层仍然是同步阻塞的BIO模型,因此无法从根本上解决问题。

代码示例

下面代码中演示了BIO通信(一请求一应答)模型。我们会在客户端创建多个线程依次连接服务端并向其发送"当前时间+:hello world",服务端会为每个客户端线程创建一个线程来处理。代码示例出自闪电侠的博客,原地址如下:

客户端

/**
 * 
 * @author 闪电侠
 * @date 2018年10月14日
 * @Description:客户端
 */
public class IOClient {

 public static void main(String[] args) {
  // TODO 创建多个线程,模拟多个客户端连接服务端
  new Thread(() -> {
   try {
    Socket socket = new Socket("127.0.0.1", 3333);
    while (true) {
     try {
      socket.getOutputStream().write((new Date() + ": hello world").getBytes());
      Thread.sleep(2000);
     } catch (Exception e) {
     }
    }
   } catch (IOException e) {
   }
  }).start();

 }

}

服务端

/**
 * @author 闪电侠
 * @date 2018年10月14日
 * @Description: 服务端
 */
public class IOServer {

 public static void main(String[] args) throws IOException {
  // TODO 服务端处理客户端连接请求
  ServerSocket serverSocket = new ServerSocket(3333);

  // 接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理
  new Thread(() -> {
   while (true) {
    try {
     // 阻塞方法获取新的连接
     Socket socket = serverSocket.accept();

     // 每一个新的连接都创建一个线程,负责读取数据
     new Thread(() -> {
      try {
       int len;
       byte[] data = new byte[1024];
       InputStream inputStream = socket.getInputStream();
       // 按字节流方式读取数据
       while ((len = inputStream.read(data)) != -1) {
        System.out.println(new String(data, 0, len));
       }
      } catch (IOException e) {
      }
     }).start();

    } catch (IOException e) {
    }

   }
  }).start();

 }

}

总结

在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

NIO (no blocking io 也叫 new io)

NIO 即非阻塞IO,是JDK 1.4 更新的api, 核心内容是 将建立连接、数据可读、可写等事件交给了操作系统来维护, 通过调用操作系统的 api (如:select、epoll等),来判断当前是否支持:可读、可写,如果当前不可操作,那么直接返回,从而实现了非阻塞。 而不需要像 BIO 那样每次去轮询等待连接的建立以及数据的准备是否完成。主要核心的模块分以下几类:

1. 缓冲区Buffer

一个特定基类(byte、short、int、long 等)的数据容器,用作在建立socket 连接之后的数据传输。
通过 capacity, limit, position,mark 指针来实现数据的读写

get()、put() 方法为每个子类都具有的读、写数据的api方法,当从当前的 position 读或写的同时,position会增加 相应读写的数据的长度。当position 达到limit 之后,再次 get、put则会抛出异常

2. Channel 连接通道

一个 channel 代表一个与“实体”的连接通道,如:硬件设备、文件、网络 socket 。通过连接通道可以使得客户端-服务器互相传输数据,因此通道也是全双工的(因为是建立在TCP 传输层的协议上,因此具备全双工的能力)。

JDK 中 channel 可以分为以下几类:

SelectableChannel 用于 阻塞和非阻塞 socket 连接的通道
FileChannel 用于文件操作,包括:reading, writing, mapping, and manipulating a file

3.Selector 多路复用选择器

用于 SelectableChannel 的多路复用器,当使用非阻塞的 socket 时,需要将监听的通道 SelectableChannel 感兴趣的事件注册到 selector 多路复用器上(selector 实际上是通过调用操作系统层面的 select、epoll 方法来获取当前可用的时间)

与之对应的感兴趣的事件用 SelectionKey 来表示

  • OP_READ = 1 << 0; 可读
  • OP_WRITE = 1 << 2; 可写
  • OP_CONNECT = 1 << 3; // 完成连接
  • OP_ACCEPT = 1 << 4; // 接收连接

处理流程图:

代码示例:

  1. 通过 ServerSocketChannel 监听 8082 端口
  2. 设置为非阻塞
  3. 选择与操作系统适配的选择器,serverSocketChannel 的 OP_ACCEPT 事件注册到 selector 选择器上
  4. 当OP_ACCEPT 事件触发时,将所有建立好的Socketchannel 连接的感兴趣的事件(这里为 read事件)再次注册到Selector 上
    // 1.根据操作系统选择适当的底层 io复用方法
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(8082));
    //2.设置为非阻塞
    serverSocketChannel.configureBlocking(false);
    //3.选择与操作系统适配的选择器
    Selector selector = Selector.open();
    //将 serverSocket 的OP_ACCEPT 事件注册到 selector 选择器上
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
      // 4.监听当前连接建立情况
      int select = selector.select();
      if (select > 0) {
        //判断连接业务类型
        Set<SelectionKey> set = selector.selectedKeys();
        Iterator<SelectionKey> iterator = set.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          //建立连接
          if (key.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //通过 accept 方法获取与 server端 已经创建好的 socket连接
            SocketChannel sc = ssc.accept();
            //设置为非阻塞
            sc.configureBlocking(false);
            //注册感兴趣的事件为 READ
            sc.register(selector, SelectionKey.OP_READ);
          }
          //可读
          else if (key.isReadable()) {
            SocketChannel socket = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socket.read(byteBuffer);
            System.out.println(new String(byteBuffer.array(), StandardCharsets.UTF_8));
            key.interestOps(SelectionKey.OP_WRITE);
          }
          //可写
          else if (key.isWritable()) {
            SocketChannel socket = (SocketChannel) key.channel();
            socket.write(ByteBuffer.wrap("I'm receive your message".getBytes(StandardCharsets.UTF_8)));
            socket.close();
            System.out.println("连接关闭成功!");
          }
        }
      }
    }

AIO(asynchronous io)

NIO 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。
异步的套接字通道时真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。

代码示例

  private static void server() throws IOException {
    //根据操作系统建立对应的底层操作类
    AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
    channel.bind(new InetSocketAddress(8082));
    while (true) {
      Future<AsynchronousSocketChannel> future = channel.accept();
      try {
        AsynchronousSocketChannel asc = future.get();
        System.out.println("建立连接成功");
        Future<Integer> write = asc.write(ByteBuffer.wrap("Now let's exchange datas".getBytes(StandardCharsets.UTF_8)));
        while (!write.isDone()) {
          TimeUnit.SECONDS.sleep(2);
        }
        System.out.println("发送数据完成");
        asc.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private static void client() throws Exception {
    AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
    Future<Void> future = socketChannel.connect(new InetSocketAddress(8082));
    while (!future.isDone()) {
      TimeUnit.SECONDS.sleep(2);
    }
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Future<Integer> read = socketChannel.read(buffer);
    while (!read.isDone()) {
      TimeUnit.SECONDS.sleep(2);
    }
    System.out.println("接收服务器数据:" + new String(buffer.array(), 0, read.get()));
  }

以上就是Java BIO,NIO,AIO总结的详细内容,更多关于Java BIO,NIO,AIO的资料请关注易盾网络其它相关文章!

网友评论