Java 网络 IO(BIO、NIO、AIO)

打印 上一主题 下一主题

主题 911|帖子 911|积分 2733

简介

Java 支持三种网络 IO 模子:BIO、NIO、AIO。


  • Java BIO 是同步阻塞模子,一个连接对应一个线程,客户端有连接请求时服务端就启动一个线程,纵然这个连接不做任何事变也会占用线程资源。
  • Java NIO 是同步非阻塞模子,一个线程可以处置惩罚多个连接,客户端连接请求会注册到多路复用器(Selector),多路复用器检测到连接有 IO 时间就会处置惩罚。
  • Java AIO 是异步非阻塞模子,AIO 引入了异步通道的概念,读写异步通道会立刻返回,读写的数据由 Future 或 CompletionHandler 进一步处置惩罚。
BIO 实用于连接数少的场景,程序编写比较简单,对服务器的资源要求比较高,JDK1.4之前的唯一选择。NIO 实用于连接数多的场景,例如谈天服务器、服务器间通讯等,程序编写比较复杂,JDK1.4开始支持。AIO 也实用于连接数多的场景,但更加偏向于异步操作多的场景。
Java BIO

模子示例


客户端代码示例

  1. import java.io.*;
  2. import java.net.InetSocketAddress;
  3. import java.net.Socket;
  4. import java.net.SocketAddress;
  5. public class BIOClient {
  6.     public static void main(String[] args) {
  7.         new BIOClient().start("localhost", 6666);
  8.     }
  9.     public void start(String host, int port) {
  10.         // 初始化 socket
  11.         Socket socket = new Socket();
  12.         try {
  13.             // 设置 socket 连接
  14.             SocketAddress remote = new InetSocketAddress(host, port);
  15.             socket.setSoTimeout(5000);
  16.             socket.connect(remote);
  17.             // 发送数据
  18.             PrintWriter writer = getWriter(socket);
  19.             writer.write("hello server");
  20.             writer.flush();
  21. //            // 发起请求
  22. //            PrintWriter writer = getWriter(socket);
  23. //            writer.write(compositeRequest(host));
  24. //            writer.flush();
  25. //
  26. //            // 读取响应
  27. //            String msg;
  28. //            BufferedReader reader = getReader(socket);
  29. //            while ((msg = reader.readLine()) != null) {
  30. //                System.out.println(msg);
  31. //            }
  32.         } catch (IOException e) {
  33.             e.printStackTrace();
  34.         } finally {
  35.             try {
  36.                 socket.close();
  37.             } catch (IOException e) {
  38.                 e.printStackTrace();
  39.             }
  40.         }
  41.     }
  42.     private BufferedReader getReader(Socket socket) throws IOException {
  43.         InputStream in = socket.getInputStream();
  44.         return new BufferedReader(new InputStreamReader(in));
  45.     }
  46.     private PrintWriter getWriter(Socket socket) throws IOException {
  47.         OutputStream out = socket.getOutputStream();
  48.         return new PrintWriter(new OutputStreamWriter(out));
  49.     }
  50.     private String compositeRequest(String host) {
  51.         return "GET / HTTP/1.1\r\n" +
  52.                 "Host: " + host + "\r\n" +
  53.                 "User-Agent: curl/7.43.0\r\n" +
  54.                 "Accept: */*\r\n\r\n";
  55.     }
  56. }
复制代码
服务端代码示例

  1. import java.io.InputStream;
  2. import java.net.ServerSocket;
  3. import java.net.Socket;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. public class BIOServer {
  7.     public static void main(String[] args) throws Exception {
  8.         // 创建一个线程池
  9.         ExecutorService pool = Executors.newCachedThreadPool();
  10.         // 创建 ServerSocket
  11.         ServerSocket serverSocket = new ServerSocket(6666);
  12.         while (true) {
  13.             // 等待客户端连接
  14.             final Socket socket = serverSocket.accept();
  15.             // 接收到一个客户端连接 放入线程池进行处理
  16.             pool.execute(() -> process(socket));
  17.         }
  18.     }
  19.     static void process(Socket socket) {
  20.         try {
  21.             byte[] bytes = new byte[1024];
  22.             // 通过 socket 获取输入流
  23.             InputStream inputStream = socket.getInputStream();
  24.             // 循环读取客户端发送的数据
  25.             while (true) {
  26.                 // 没有数据的时候这里会阻塞等待
  27.                 int read = inputStream.read(bytes);
  28.                 if (read == -1) break;
  29.                 // 输出客户端发送的数据
  30.                 System.out.println(new String(bytes, 0, read));
  31.             }
  32.         } catch (Exception e) {
  33.             e.printStackTrace();
  34.         } finally {
  35.             try {
  36.                 socket.close();
  37.             } catch (Exception e) {
  38.                 e.printStackTrace();
  39.             }
  40.         }
  41.     }
  42. }
复制代码
Java NIO

NIO 接纳 Reactor 模式,属于 IO 多路复用模子,可以用一个线程处置惩罚多个请求。NIO 有三大核心模块,通道(Channel)、缓冲区(Buffer)、选择器(Selector)。NIO 的非阻塞模式,使主线程在未发生数据读写事件时无需阻塞,可以继承做其他事变,这就大大增强了服务器的并发处置惩罚能力。
模子示例


Selector 对应一个线程,一个 Selector 可以对应多个 Channel,一个 Channel 对应一个 Buffer。程序切换到哪个 Channel 是由事件决定的,Selector 会根据不同的事件切换不同的 Channel。下图描述了 Channel、Buffer 和 Selector 的关系。

MappedByteBuffer 简介

NIO 提供的 MappedByteBuffer 支持支持在内存(堆外内存)中修改文件,可以淘汰一次数据拷贝。文件同步的部分,由 NIO 本身完成。
代码示例
  1. import java.io.RandomAccessFile;
  2. import java.nio.MappedByteBuffer;
  3. import java.nio.channels.FileChannel;
  4. /**
  5. * 说明 1.MappedByteBuffer 可让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
  6. */
  7. public class MappedByteBufferTest {
  8.     public static void main(String[] args) throws Exception {
  9.         RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
  10.         //获取对应的通道
  11.         FileChannel channel = randomAccessFile.getChannel();
  12.         /**
  13.          * 参数 1:FileChannel.MapMode.READ_WRITE 使用的读写模式
  14.          * 参数 2:0:可以直接修改的起始位置
  15.          * 参数 3:5: 是映射到内存的大小(不是索引位置),即将 1.txt 的多少个字节映射到内存
  16.          * 可以直接修改的范围就是 0-5
  17.          * 实际类型 DirectByteBuffer
  18.          */
  19.         MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
  20.         mappedByteBuffer.put(0, (byte) 'H');
  21.         mappedByteBuffer.put(3, (byte) '9');
  22.         mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException
  23.         randomAccessFile.close();
  24.         System.out.println("修改成功~~");
  25.     }
  26. }
复制代码
 NIO 编程代码原理分析图

关于 NIO 非阻塞网络编程相关的(Selector、SelectionKey、ServerScoketChannel 和 SocketChannel)关系梳理图

服务端代码示例

可以结合上面的原理图观察代码实现细节
  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.*;
  5. import java.util.Iterator;
  6. public class GroupChatServer {
  7.     // 定义属性
  8.     private Selector selector;
  9.     private ServerSocketChannel listenChannel;
  10.     private static final int PORT = 6667;
  11.     // 构造器执行初始化工作
  12.     public GroupChatServer() {
  13.         try {
  14.             // 得到选择器
  15.             selector = Selector.open();
  16.             // 监听端口的主线程
  17.             listenChannel = ServerSocketChannel.open();
  18.             // 绑定端口
  19.             listenChannel.socket().bind(new InetSocketAddress(PORT));
  20.             // 设置非阻塞模式
  21.             listenChannel.configureBlocking(false);
  22.             // 将该 listenChannel 注册到 selector
  23.             listenChannel.register(selector, SelectionKey.OP_ACCEPT);
  24.         } catch (IOException e) {
  25.             e.printStackTrace();
  26.         }
  27.     }
  28.     public void listen() {
  29.         try {
  30.             // 循环处理
  31.             while (true) {
  32.                 int count = selector.select();
  33.                 // 有事件处理
  34.                 if (count > 0) {
  35.                     // 遍历得到 selectionKey 集合
  36.                     Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  37.                     while (iterator.hasNext()) {
  38.                         // 取出 selectionKey
  39.                         SelectionKey key = iterator.next();
  40.                         // 监听到 accept
  41.                         if (key.isAcceptable()) {
  42.                             SocketChannel sc = listenChannel.accept();
  43.                             sc.configureBlocking(false);
  44.                             // 将该 sc 注册到 selector
  45.                             sc.register(selector, SelectionKey.OP_READ);
  46.                             // 提示
  47.                             System.out.println(sc.getRemoteAddress() + " 上线 ");
  48.                         }
  49.                         if (key.isReadable()) {// 通道发送read事件,即通道是可读的状态
  50.                             // 处理读(专门写方法..)
  51.                             readData(key);
  52.                         }
  53.                         // 当前的 key 删除,防止重复处理
  54.                         iterator.remove();
  55.                     }
  56.                 } else {
  57.                     System.out.println("等待....");
  58.                 }
  59.             }
  60.         } catch (Exception e) {
  61.             e.printStackTrace();
  62.         } finally {
  63.             // 发生异常处理....
  64.         }
  65.     }
  66.     // 读取客户端消息
  67.     public void readData(SelectionKey key) {
  68.         SocketChannel channel = null;
  69.         try {
  70.             // 得到 channel
  71.             channel = (SocketChannel) key.channel();
  72.             // 创建 buffer
  73.             ByteBuffer buffer = ByteBuffer.allocate(1024);
  74.             int count = channel.read(buffer);// NIO这里不会阻塞 因为事件触发时必然已经有数据了 所以叫非阻塞IO
  75.             // 根据 count 的值做处理
  76.             if (count > 0) {
  77.                 // 把缓存区的数据转成字符串
  78.                 String msg = new String(buffer.array());
  79.                 // 输出该消息
  80.                 System.out.println("form客户端:" + msg);
  81.                 // 向其它的客户端转发消息(去掉自己),专门写一个方法来处理
  82.                 sendInfoToOtherClients(msg, channel);
  83.             }
  84.         } catch (IOException e) {
  85.             try {
  86.                 System.out.println(channel.getRemoteAddress() + "离线了..");
  87.                 // 取消注册
  88.                 key.cancel();
  89.                 // 关闭通道
  90.                 channel.close();
  91.             } catch (IOException e2) {
  92.                 e2.printStackTrace();
  93.             }
  94.         }
  95.     }
  96.     // 转发消息给其它客户(通道)
  97.     private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
  98.         System.out.println("服务器转发消息中...");
  99.         // 遍历所有注册到 selector 上的 SocketChannel,并排除 self
  100.         for (SelectionKey key : selector.keys()) {
  101.             // 通过 key 取出对应的 SocketChannel
  102.             Channel targetChannel = key.channel();
  103.             // 排除自己
  104.             if (targetChannel instanceof SocketChannel && targetChannel != self) {
  105.                 // 转型
  106.                 SocketChannel dest = (SocketChannel) targetChannel;
  107.                 // 将 msg 存储到 buffer
  108.                 ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
  109.                 // 将 buffer 的数据写入通道
  110.                 dest.write(buffer);
  111.             }
  112.         }
  113.     }
  114.     public static void main(String[] args) {
  115.         // 创建服务器对象
  116.         GroupChatServer groupChatServer = new GroupChatServer();
  117.         groupChatServer.listen();
  118.     }
  119. }
复制代码
Java AIO

AIO 是异步非阻塞的,引入了异步通道的概念,接纳 Proactor 模式,操作系统完成数据拷贝操作后才会关照服务端线程。AIO 本质上还是 IO 多路复用模子,与 NIO 比起来,AIO 只是在非阻塞的前提下增加了异步功能,详细则表现在代码编写以及数据传输两个层面。


  • 从代码编写角度来说,原来的同步方法会阻塞等候接口返回,而现在可以异步等候返回结果。
  • 从数据传输角度来说,每个请求都须要传输数据,NIO 固然好坏阻塞的,但是事件到达后,NIO 须要本身把数据从内核空间复制到用户空间。AIO 引入异步逻辑后,事件到达后系统不会立刻关照服务端线程,而是会本身把数据从内核空间复制到用户空间,完成这个操作后,才会关照服务端线程行止理。
AIO 的使用场景还是比较少,现在大部分开源框架中应该还是以使用 NIO 为主,AIO 在性能方面的提升还是比较有限,紧张的变化还是增加了异步功能。
如何明白 Reactor 和 Proactor 的区别?

Reactor 可以明白为「来了事件操作系统关照应用进程,让应用进程来处置惩罚」,而Proactor 可以明白为「来了事件操作系统来处置惩罚,处置惩罚完再关照应用进程」。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

西河刘卡车医

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表