IO流中「线程」模型总结

打印 上一主题 下一主题

主题 927|帖子 927|积分 2781

目录

IO流模块:经常看、经常用、经常忘;
一、基础简介

在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;

客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式;
Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」;
二、同步阻塞

1、模型图解

BIO即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束;

这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源;
2、参考案例

服务端】启动ServerSocket接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的10秒休眠;
  1. public class SocketServer01 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 1、创建Socket服务端
  4.         ServerSocket serverSocket = new ServerSocket(8080);
  5.         // 2、方法阻塞等待,直到有客户端连接
  6.         Socket socket = serverSocket.accept();
  7.         // 3、输入流,输出流
  8.         InputStream inStream = socket.getInputStream();
  9.         OutputStream outStream = socket.getOutputStream();
  10.         // 4、数据接收和响应
  11.         int readLen = 0;
  12.         byte[] buf = new byte[1024];
  13.         if ((readLen=inStream.read(buf)) != -1){
  14.             // 接收数据
  15.             String readVar = new String(buf, 0, readLen) ;
  16.             System.out.println("readVar======="+readVar);
  17.         }
  18.         // 响应数据
  19.         Thread.sleep(10000);
  20.         outStream.write("sever-8080-write;".getBytes());
  21.         // 5、资源关闭
  22.         IoClose.ioClose(outStream,inStream,socket,serverSocket);
  23.     }
  24. }
复制代码
客户端】Socket连接,先向ServerSocket发送请求,再接收其响应,由于Server端模拟耗时,Client处于长时间阻塞状态;
  1. public class SocketClient01 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 1、创建Socket客户端
  4.         Socket socket = new Socket(InetAddress.getLocalHost(), 8080);
  5.         // 2、输入流,输出流
  6.         OutputStream outStream = socket.getOutputStream();
  7.         InputStream inStream = socket.getInputStream();
  8.         // 3、数据发送和响应接收
  9.         // 发送数据
  10.         outStream.write("client-hello".getBytes());
  11.         // 接收数据
  12.         int readLen = 0;
  13.         byte[] buf = new byte[1024];
  14.         if ((readLen=inStream.read(buf)) != -1){
  15.             String readVar = new String(buf, 0, readLen) ;
  16.             System.out.println("readVar======="+readVar);
  17.         }
  18.         // 4、资源关闭
  19.         IoClose.ioClose(inStream,outStream,socket);
  20.     }
  21. }
复制代码
三、同步非阻塞

1、模型图解

NIO即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升;

这种模式下客户端的请求连接都会注册到Selector多路复用器上,多路复用器会进行轮询,对请求连接的IO流进行处理;
2、参考案例

服务端】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有IO请求;
  1. public class SocketServer01 {
  2.     public static void main(String[] args) throws Exception {
  3.         try {
  4.             //启动服务开启监听
  5.             ServerSocketChannel socketChannel = ServerSocketChannel.open();
  6.             socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));
  7.             // 设置非阻塞,接受客户端
  8.             socketChannel.configureBlocking(false);
  9.             // 打开多路复用器
  10.             Selector selector = Selector.open();
  11.             // 服务端Socket注册到多路复用器,指定兴趣事件
  12.             socketChannel.register(selector, SelectionKey.OP_ACCEPT);
  13.             // 多路复用器轮询
  14.             ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
  15.             while (selector.select() > 0){
  16.                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
  17.                 Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator();
  18.                 while (selectionKeyIter.hasNext()){
  19.                     SelectionKey selectionKey = selectionKeyIter.next() ;
  20.                     selectionKeyIter.remove();
  21.                     if(selectionKey.isAcceptable()) {
  22.                         // 接受新的连接
  23.                         SocketChannel client = socketChannel.accept();
  24.                         // 设置读非阻塞
  25.                         client.configureBlocking(false);
  26.                         // 注册到多路复用器
  27.                         client.register(selector, SelectionKey.OP_READ);
  28.                     } else if (selectionKey.isReadable()) {
  29.                         // 通道可读
  30.                         SocketChannel client = (SocketChannel) selectionKey.channel();
  31.                         int len = client.read(buffer);
  32.                         if (len > 0){
  33.                             buffer.flip();
  34.                             byte[] readArr = new byte[buffer.limit()];
  35.                             buffer.get(readArr);
  36.                             System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));
  37.                             buffer.clear();
  38.                         }
  39.                     }
  40.                 }
  41.             }
  42.         } catch (Exception e) {
  43.             e.printStackTrace();
  44.         }
  45.     }
  46. }
复制代码
客户端】每隔3秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据;
  1. public class SocketClient01 {
  2.     public static void main(String[] args) throws Exception {
  3.         try {
  4.             // 连接服务端
  5.             SocketChannel socketChannel = SocketChannel.open();
  6.             socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
  7.             ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
  8.             String conVar = "client-hello";
  9.             writeBuffer.put(conVar.getBytes());
  10.             writeBuffer.flip();
  11.             // 每隔3S发送一次数据
  12.             while (true) {
  13.                 Thread.sleep(3000);
  14.                 writeBuffer.rewind();
  15.                 socketChannel.write(writeBuffer);
  16.                 writeBuffer.clear();
  17.             }
  18.         } catch (Exception e) {
  19.             e.printStackTrace();
  20.         }
  21.     }
  22. }
复制代码
四、异步非阻塞

1、模型图解

AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的;

这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:
2、参考案例

服务端】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的结果;
  1. public class SocketServer01 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 启动服务开启监听
  4.         AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;
  5.         socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));
  6.         // 指定30秒内获取客户端连接,否则超时
  7.         Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept();
  8.         AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);
  9.         if (asyChannel != null && asyChannel.isOpen()){
  10.             // 读数据
  11.             ByteBuffer inBuffer = ByteBuffer.allocate(1024);
  12.             Future<Integer> readResult = asyChannel.read(inBuffer);
  13.             readResult.get();
  14.             System.out.println("read:"+new String(inBuffer.array()));
  15.             // 写数据
  16.             inBuffer.flip();
  17.             Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));
  18.             writeResult.get();
  19.         }
  20.         // 关闭资源
  21.         asyChannel.close();
  22.     }
  23. }
复制代码
客户端】相关「connect」、「read」、「write」方法调用是异步的,通过Future来获取计算的结果;
  1. public class SocketClient01 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 连接服务端
  4.         AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
  5.         Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
  6.         result.get();
  7.         // 写数据
  8.         String conVar = "client-hello";
  9.         ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());
  10.         Future<Integer> writeFuture = socketChannel.write(reqBuffer);
  11.         writeFuture.get();
  12.         // 读数据
  13.         ByteBuffer inBuffer = ByteBuffer.allocate(1024);
  14.         Future<Integer> readFuture = socketChannel.read(inBuffer);
  15.         readFuture.get();
  16.         System.out.println("read:"+new String(inBuffer.array()));
  17.         // 关闭资源
  18.         socketChannel.close();
  19.     }
  20. }
复制代码
五、Reactor模型

1、模型图解

这部分内容,可以参考「Doug Lea的《IO》」文档,查看更多细节;
1.1 Reactor设计原理

Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理;

Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」;
1.2 单Reactor单线程


【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;
【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;
【4】在Handler中,会完成相应的业务流程;
这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题;
1.3 单Reactor多线程


【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;
【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;
【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;
【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;
这种模式将业务从Reactor单线程分离处理,可以让其更专注于事件的分发和调度,Handler使用多线程也充分的利用cpu的处理能力,导致逻辑变的更加复杂,Reactor单线程依旧存在高并发的性能问题;
1.4 主从Reactor多线程


【1】 MainReactor主线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,之后MainReactor将连接分配给SubReactor;
【3】如果不是连接请求事件,则MainReactor将连接分配给SubReactor,SubReactor调用当前连接的Handler来处理;
【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;
【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;
这种模式Reactor线程分工明确,MainReactor负责接收新的请求连接,SubReactor负责后续的交互业务,适应于高并发的处理场景,是Netty组件通信框架的所采用的模式;
2、参考案例

服务端】提供两个EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即Reactor多线程模型;
  1. @Slf4j
  2. public class NettyServer {
  3.     public static void main(String[] args) {
  4.         // EventLoop组,处理事件和IO
  5.         EventLoopGroup parentGroup = new NioEventLoopGroup();
  6.         EventLoopGroup childGroup = new NioEventLoopGroup();
  7.         try {
  8.             // 服务端启动引导类
  9.             ServerBootstrap serverBootstrap = new ServerBootstrap();
  10.             serverBootstrap.group(parentGroup, childGroup)
  11.                     .channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());
  12.             // 异步IO的结果
  13.             ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
  14.             channelFuture.channel().closeFuture().sync();
  15.         } catch (Exception e){
  16.             e.printStackTrace();
  17.         } finally {
  18.             parentGroup.shutdownGracefully();
  19.             childGroup.shutdownGracefully();
  20.         }
  21.     }
  22. }
  23. class ServerChannelInit extends ChannelInitializer<SocketChannel> {
  24.     @Override
  25.     protected void initChannel(SocketChannel socketChannel) {
  26.         // 获取管道
  27.         ChannelPipeline pipeline = socketChannel.pipeline();
  28.         // 编码、解码器
  29.         pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
  30.         pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
  31.         // 添加自定义的handler
  32.         pipeline.addLast("serverHandler", new ServerHandler());
  33.     }
  34. }
  35. class ServerHandler extends ChannelInboundHandlerAdapter {
  36.     /**
  37.      * 通道读和写
  38.      */
  39.     @Override
  40.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  41.         System.out.println("Server-Msg【"+msg+"】");
  42.         TimeUnit.MILLISECONDS.sleep(2000);
  43.         String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
  44.         ctx.channel().writeAndFlush("hello-client;time:" + nowTime);
  45.         ctx.fireChannelActive();
  46.     }
  47.     @Override
  48.     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
  49.         cause.printStackTrace();
  50.         ctx.close();
  51.     }
  52. }
复制代码
客户端】通过Bootstrap类,与服务器建立连接,服务端通过ServerBootstrap启动服务,绑定在8989端口,然后服务端和客户端进行通信;
  1. public class NettyClient {
  2.     public static void main(String[] args) {
  3.         // EventLoop处理事件和IO
  4.         NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  5.         try {
  6.             // 客户端通道引导
  7.             Bootstrap bootstrap = new Bootstrap();
  8.             bootstrap.group(eventLoopGroup)
  9.                     .channel(NioSocketChannel.class).handler(new ClientChannelInit());
  10.             // 异步IO的结果
  11.             ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();
  12.             channelFuture.channel().closeFuture().sync();
  13.         } catch (Exception e){
  14.             e.printStackTrace();
  15.         } finally {
  16.             eventLoopGroup.shutdownGracefully();
  17.         }
  18.     }
  19. }
  20. class ClientChannelInit extends ChannelInitializer<SocketChannel> {
  21.     @Override
  22.     protected void initChannel(SocketChannel socketChannel) {
  23.         // 获取管道
  24.         ChannelPipeline pipeline = socketChannel.pipeline();
  25.         // 编码、解码器
  26.         pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
  27.         pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
  28.         // 添加自定义的handler
  29.         pipeline.addLast("clientHandler", new ClientHandler());
  30.     }
  31. }
  32. class ClientHandler extends ChannelInboundHandlerAdapter {
  33.     /**
  34.      * 通道读和写
  35.      */
  36.     @Override
  37.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  38.         System.out.println("Client-Msg【"+msg+"】");
  39.         TimeUnit.MILLISECONDS.sleep(2000);
  40.         String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
  41.         ctx.channel().writeAndFlush("hello-server;time:" + nowTime);
  42.     }
  43.     @Override
  44.     public void channelActive(ChannelHandlerContext ctx) throws Exception {
  45.         ctx.channel().writeAndFlush("channel...active");
  46.     }
  47.     @Override
  48.     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
  49.         cause.printStackTrace();
  50.         ctx.close();
  51.     }
  52. }
复制代码
六、参考源码
  1. 编程文档:
  2. https://gitee.com/cicadasmile/butte-java-note
  3. 应用仓库:
  4. https://gitee.com/cicadasmile/butte-flyer-parent
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

半亩花草

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表