大纲
1.Buffer缓冲区
2.Channel通道
3.BIO编程
4.伪异步IO编程
5.改造程序以支持长毗连
6.NIO三大核心组件
7.NIO服务端的创建流程
8.NIO客户端的创建流程
9.NIO优点总结
10.NIO标题总结
4.伪异步IO编程
(1)BIO的主要标题
(2)BIO编程模子的改进
(3)伪异步IO编程
(4)伪异步IO的标题
(5)伪异步IO可能引起的级联故障
(1)BIO的主要标题
BIO的主要标题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理新接入的客户端链路,一个线程只能处理一个客户端毗连。
比如在上述BIO编程的服务端程序中:SocketServer和一个客户端创建毗连后,服务端会一直执行in.read(buf)进行壅闭等待,从而导致服务端只能和这个客户端进行通信。如果有一个新的客户端想和服务端创建毗连,那么服务端是不能和这个新的客户端创建TCP毗连的,由于服务端还在壅闭执行in.read(buf)而没能执行serverSocket.accept()。
(2)BIO编程模子的改进
为了改进一个线程处理一个毗连的BIO模子,可以通过线程池或者消息队列,实现一个或者多个线程处理N个客户端的伪异步IO模子。
此时由于底层通信机制依然使用同步壅闭IO,所以被称为伪异步IO。服务端线程池的最大线程数N和客户端并发访问数M呈N : M的比例关系。改进后的BIO模子能有效防止海量并发接入导致线程资源耗尽。
(3)伪异步IO编程
当有新的客户端接入时,将客户端的Socket封装成一个使命Task(实现Runnable接口)投递到后端的线程池中进行处理。这个线程池中会维护一个消息队列和N个生动的线程来对消息队列中的使命Task进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此线程资源的占用是可控的,不会导致线程耗尽而宕机。
- //服务端(为简洁省略try catch)
- public class SocketServer {
- public static void main(String[] args) throws Exception {
- ServerSocket serverSocket = new ServerSocket(9000);
- ServerHandlerExecutePool singleExcutor = new ServerHandlerExecutePool(50, 1000);
- while (true) {
- //在这里会阻塞住,一直等待客户端来建立连接
- Socket socket = serverSocket.accept();
- //获取客户端的连接后,将socket提交给线程池处理
- singleExcutor.execute(new ServerHandler(socket));
- }
- ...
- }
- }
- public class ServerHandlerExecutePool {
- private ExecutorService executor;
- public ServerHandlerExecutePool(int maxPoolSize, int queueSize) {
- executor = new ThreadPoolExecutor(
- Runtime.getRuntime().availableProcessors(),
- maxPoolSize,
- 120L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize)
- );
- }
-
- public void execute(Runnable task) {
- executor.execute(task);
- }
- }
- public class ServerHandler implements Runnable {
- private Socket socket;
- public ServerHandler(Socket socket) {
- this.socket = socket;
- }
-
- @Override
- public void run() {
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
- char[] buf = new char[1024 * 1024];
- int len = in.read(buf);
- while (len != -1) {
- String request = new String(buf, 0, len);
- System.out.println("[" + Thread.currentThread().getName() + "]服务端接收到了请求:" + request);
- out.write("收到,收到".getBytes());
- len = in.read(buf);
- }
- //释放资源
- out.close();
- in.close();
- //通过TCP四次挥手断开连接
- socket.close();
- }
- }
复制代码 (4)伪异步IO的标题
一.当对Socket的输入流进行读操作时
也就是调用Java输入流InputStream的read()方法读输入流时,会一直壅闭下去直到可用数据已经读取完毕。这意味着当一方发送请求或者应答消息比较缓慢或者网络传输比较缓慢时,读取输入流的一方的通信线程将被长时间壅闭。如果发送方要60s才能够将数据发送完毕,读取方的IO线程在执行read()方法时会被同步壅闭60s。在此期间,其他的毗连使命只能在消息队列中排队。
二.当对Socket的输出流进行写操作时
也就是调用Java输出流OutputStream的write()方法写输出流时,也会一直壅闭下去直到所有要发送的字节全部写入完毕。当消息的吸收方处理缓慢时,将不能及时从TCP缓冲区中读取数据。这将会导致发送方的TCP Window Size不停减小,直到为0。末了消息发送方将不能再向TCP缓冲区写入消息,于是write()方法将被壅闭直到TCP Window Size大于0。所以输入流和输出流的读和写操作都是同步壅闭的,壅闭时间取决于对方IO线程的处理速度和网络IO的传输速度。
(5)伪异步IO可能引起的级联故障
一.服务端处理缓慢,返回应答消息泯灭60s,平时只需10ms。
二.此时客户端在读取服务端响应,由于读取输入流是壅闭的,客户端将会被同步壅闭60s。
三.如果所有可用线程都被该服务端壅闭,那后续的所有IO消息都将在队列中排队。
四.由于线程池采用壅闭队列实现,当队列积满后,后续入队的操作将会被壅闭。
五.由于只有一个Acceptor线程吸收客户端接入,线程池的壅闭队列满了之后,它也会被壅闭。于是新的客户端请求消息将会被拒绝,从而客户端发生大量的毗连超时。
因此BIO模子显然无法支持百万并发请求,由于一个线程只能处理一个请求。改进BIO后的伪异步IO模子即便可以通过一个线程处理多个请求,也存在级联故障的标题。
5.改造程序以支持长毗连
(1)什么是长毗连和短毗连
(2)长毗连和短毗连的代码
(1)什么是长毗连和短毗连
一.短毗连
客户端每次创建一个毗连,发送一个请求,获取一个响应,然后就断开毗连,就是所谓的短毗连。
二.长毗连
客户端每次创建一个毗连,可以发送很多个请求,一直连续维持这个TCP毗连,不停开毗连。客户端连续通过这个毗连与服务端进行通信,不停地发送数据和请求。服务端也长期维持这个毗连,不停地继承请求返回响应。这个就是所谓的长毗连,毗连存在的时间很长的。所以只要客户端不停地发送请求不释放毗连,那么就是长毗连了。
(2)长毗连和短毗连的代码
- //客户端(短连接)(为简洁省略try catch)
- public class SocketClient {
- public static void main(String[] args) throws Exception {
- for (int i = 0; i < 10; i++) {
- new Thread() {
- public void run() {
- Socket socket = new Socket("localhost", 9000);
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
- //发送数据流,底层拆分为一个一个的TCP包发过去
- out.write("你好".getBytes());
- char[] buf = new char[1024 * 1024];
- int len = in.read(buf);
- if (len != -1) {
- String response = new String(buf, 0, len);
- System.out.println("[" + Thread.currentThread().getName() + "]客户端接收到了响应:" + response);
- }
- in.close();
- out.close();
- socket.close();
- };
- }.start();
- }
- }
- }
- //客户端(长连接)(为简洁省略try catch)
- public class SocketClient {
- public static void main(String[] args) throws Exception {
- for (int i = 0; i < 10; i++) {
- new Thread() {
- public void run() {
- Socket socket = new Socket("localhost", 9000);
- InputStreamReader in = new InputStreamReader(socket.getInputStream());
- OutputStream out = socket.getOutputStream();
- char[] buf = new char[1024 * 1024];
- //客户端不停地每隔一秒发送请求,不释放就是长连接了
- while (true) {
- //发送数据流,底层拆分为一个一个的TCP包发过去
- out.write("你好".getBytes());
- int len = in.read(buf);
- if (len != -1) {
- String response = new String(buf, 0, len);
- System.out.println("[" + Thread.currentThread().getName() + "]客户端接收到了响应:" + response);
- }
- Thread.sleep(1000);
- }
- ...
- };
- }.start();
- }
- }
- }
复制代码
6.NIO三大核心组件
(1)Buffer缓冲区
(2)Channel数据通道
(3)Selector多路复用器
(4)BIO和IO多路复用的理解
(5)一些概念增补
NIO的三大核心组件分别是:Buffer、Channel、Selector。
(1)Buffer缓冲区
Buffer缓冲区是用来封装数据的,也就是把数据写入到Buffer、或者从Buffer中读取数据。
(2)Channel数据通道
Channel就是一个数据管道,负责传输数据(封装好数据的Buffer),比如把数据写入到文件或网络、从文件或网络中读取数据。
(3)Selector多路复用器
Selector会不停地轮询注册在其上的Channel。如果某个Channel上发生读或写事件,那么这个Channel就处于停当状态。然后停当的Channel就会被Selector轮询出来,具体就是通过Selector的SelectionKey来获取停当的Channel聚集。获取到停当的Channel后,就可以进行后续的IO操作了。
一个Selector多路复用器可以同时轮询多个Channel。由于JDK使用了epoll()取代传统的select实现,所以它并没有最大毗连句柄1024/2048的限制。这意味着只需要一个线程负责Selector多路复用器的轮询,就可以接入成千上万的客户端。
(4)BIO和IO多路复用的理解
由于TCP毗连的创建需要经过三次握手,所以可理解为客户端向服务端发起的Socket毗连停当需要经过三次握手请求。服务端吸收到客户端的第一次握手请求时,会创建Socket毗连(即创建一个Channel)。服务端吸收客户端的第三次握手请求,这个Socket毗连才算预备好(Channel才算停当)。
在BIO模子下,只有一次系统调用recvfrom。所以服务端从吸收到客户端的第一次握手请求开始,就必须同步壅闭等待直到第三次握手请求的完成,如许才能获取预备好的Socket毗连并读取请求数据。
在多路复用模子下,会有两次系统调用select和recvfrom。所以服务端吸收到客户端的第一次握手请求后,不必创建线程然后通过壅闭来等待第三次握手请求的完成,而是可以直接通过轮询Selector(基于select系统调用),来获取所有已经完成第三次握手请求(已停当)的客户端Socket毗连,之后对这些Socket毗连进行recvfrom系统调用时不需要壅闭也能马上读取到请求数据了。
(5)一些概念增补
一.异步非壅闭IO
NIO类库支持非壅闭的读和写操作。相比BIO同步壅闭的读和写操作,NIO是异步的,所以也称NIO为异步非壅闭IO。
二.多路复用Selector
NIO的实现关键是多路复用IO技术。多路复用的核心是通过Selector来轮询注册在其上的Channel,执行Selector.select()方法进行轮询时是同步壅闭的。当发现有Channel处于停当状态后,Selector.select()方法就会从壅闭状态中返回,然后就可以通过Selector.selectedKeys()方法获取停当的Channel进行IO操作。
三.伪异步IO
在通信线程和业务线程之间做一个缓冲区,这个缓冲区用于隔离IO线程和业务线程间的直接访问,如许业务线程就不会被IO线程壅闭了。
对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不再直接访问IO线程或者进行IO读写,如许也就不会被同步壅闭。
7.NIO服务端的创建流程
步调一:通过ServerSocketChannel的opne()方法打开ServerSocketChannel。
步调二:设置ServerSocketChannel为非壅闭模式,绑定监听地址和端口。
步调三:通过Selector的open()方法创建多路复用器Selector,将已打开的ServerSocketChannel注册到多路复用器Selector上以及监听ACCEPT事件。
步调四:多路复用器Selector通过select()方法轮询预备停当的SelectionKey。
步调五:如果这个SelectionKey是acceptable,说明有客户端发起了毗连请求。此时需要调用ServerSocketChannel的accept()方法来处理该接入请求,也就是完成TCP三次握手并创建物理链路以及得到该客户端毗连SocketChannel。
步调六:然后将新接入的客户端毗连SocketChannel注册到多路复用器Selector上以及监听READ事件。
步调七:如果这个SelectionKey是readable,说明有客户端发送了数据过来。此时需要调用SocketChannel的read()方法异步读取客户端发送的数据到ByteBuffer缓冲区,同时将客户端毗连SocketChannel注册到多路复用器Selector上以及监听WRITE事件。
步调八:接着对ByteBuffer缓冲区的数据进行decode解码处理并完成业务逻辑,然后再将处理效果对象encode编码放入ByteBuffer缓冲区,末了调用SocketChannel的write()方法异步发送给客户端,以及将客户端毗连SocketChannel注册到多路复用器Selector上以及监听READ事件。
- public class NIOServer {
- private static Selector selector;
-
- public static void main(String[] args) {
- init();
- listen();
- }
- private static void init() {
- //serverSocketChannel其实就是ServerSocket,负责跟各个客户端连接连接请求
- //SelectionKey.OP_ACCEPT的意思是仅仅关注ServerSocketChannel接收到的TCP连接请求,也就是监听ACCEPT事件
- //ServerSocketChannel是注册在Selector上面的
- ServerSocketChannel serverSocketChannel = null;
- try {
- selector = Selector.open();
- serverSocketChannel = ServerSocketChannel.open();//步骤一:打开ServerSocketChannel
- serverSocketChannel.configureBlocking(false);//步骤二:NIO就是支持非阻塞的
- serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100);//步骤二
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//步骤三:注册到selector + 关注OP_ACCEPT事件
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private static void listen() {
- while(true) {
- try {
- //select()方法是阻塞的,看注册到Selector上的ServerSocketChannel是否接收到了请求
- selector.select();//步骤四
- Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
- while(keysIterator.hasNext()) {
- SelectionKey key = (SelectionKey) keysIterator.next();
- //可以认为一个SelectionKey代表了一个请求
- keysIterator.remove();
- handleRequest(key);
- }
- } catch(Throwable t){
- t.printStackTrace();
- }
- }
- }
- private static void handleRequest(SelectionKey key) throws IOException, ClosedChannelException {
- //会有个线程池获取到了请求,下面的代码都应该在线程池的工作线程里处理
- SocketChannel channel = null;
- try {
- //如果这个SelectionKey是acceptable,也就是连接请求
- //那么注册对应的SocketChannel到selector上,并关注OP_READ事件
- if (key.isAcceptable()) {//步骤五
- System.out.println("[" + Thread.currentThread().getName() + "]接收到连接请求");
- //从SelectionKey中拿出ServerSocketChannel
- ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
- //调用ServerSocketChannel的accept方法,就可以跟客户端进行TCP三次握手
- channel = serverSocketChannel.accept();
- System.out.println("[" + Thread.currentThread().getName() + "]建立连接时获取到的channel=" + channel);
- //如果三次握手成功了之后,就可以获取到一个建立好TCP连接的SocketChannel
- //这个SocketChannel大概可以理解为,底层有一个Socket是跟客户端进行连接的
- //这个SocketChannel就是联通到那个Socket上去,负责进行网络数据的读写的
- //下面配置成非阻塞的
- channel.configureBlocking(false);
- //将该SocketChannel注册到selector上,且仅仅关注READ请求,也就是关注发送数据过来的请求
- channel.register(selector, SelectionKey.OP_READ);//步骤六:注册到selector + 关注OP_READ事件
- } else if (key.isReadable()) {//步骤七
- //如果这个SelectionKey是readable,也就是发送了数据过来,此时需要读取客户端发送过来的数据
- channel = (SocketChannel) key.channel();
- //读取请求数据的buffer缓冲
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- //通过底层的socket读取数据,写入buffer中,position可能就会变成比如21之类的
- //socket读取到了多少个字节,此时buffer的position就会变成多少
- int count = channel.read(buffer);//步骤七
- System.out.println("[" + Thread.currentThread().getName() + "]接收到请求");
- if (count > 0) {
- //设置position = 0,limit = 21,仅仅读取buffer中,0~21这段刚刚写入进去的数据
- buffer.flip();
- System.out.println("服务端接收请求:" + new String(buffer.array(), 0, count));
- channel.register(selector, SelectionKey.OP_WRITE);//步骤七:注册到selector + 关注OP_WRITE事件
- }
- } else if (key.isWritable()) {//步骤八
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.put("Server收到了".getBytes());
- buffer.flip();
- channel = (SocketChannel) key.channel();
- channel.write(buffer);//步骤八
- channel.register(selector, SelectionKey.OP_READ);//步骤八:注册到selector + 关注OP_READ事件
- }
- } catch(Throwable t) {
- t.printStackTrace();
- if (channel != null) {
- channel.close();
- }
- }
- }
- }
复制代码
8.NIO客户端的创建流程
步调一:通过SocketChannel的open()方法打开SocketChannel。
步调二:设置SocketChannel为非壅闭模式,同时设置TCP参数。
步调三:调用SocketChannel的connect()方法异步毗连服务端,完成TCP三次握手并创建物理链路。
步调四:通过Selector的open()方法创建多路复用器Selector,并将已打开的SocketChannel注册到多路复用器Selector上以及监听CONNECT事件。
步调五:多路复用器Selector通过select()方法轮询预备停当的SelectionKey。
步调六:如果这个SelectionKey是connectable,说明服务端继承了发起的毗连请求,于是将SocketChannel注册到多路复用器Selector上以及监听READ事件。
步调七:如果这个SelectionKey是readable,说明服务端返回了数据。于是调用SocketChannel的read()方法异步读取服务端返回的数据到ByteBuffer缓冲区,同时将客户端毗连SocketChannel注册到多路复用器Selector上以及监听WRITE事件。
步调八:接着对ByteBuffer缓冲区的数据进行decode解码处理并完成业务逻辑,然后再将处理效果对象encode编码放入ByteBuffer缓冲区,末了调用SocketChannel的write()方法异步发送给客户端,以及将客户端毗连SocketChannel注册到多路复用器Selector上以及监听READ事件。
- public class NIOClient {
- public static void main(String[] args) {
- //启动10个线程去和服务端建立通信
- for (int i = 0; i < 10; i++) {
- new Worker().start();
- }
- }
- static class Worker extends Thread {
- @Override
- public void run() {
- SocketChannel channel = null;
- Selector selector = null;
- try {
- //SocketChannel底层就是封装了一个Socket
- //SocketChannel是通过底层的Socket网络连接上服务端的数据通道,负责基于网络读写数据
- channel = SocketChannel.open();//步骤一
- //配置成非阻塞的
- channel.configureBlocking(false);//步骤二
- //底层会发起了一个TCP三次握手尝试建立连接
- channel.connect(new InetSocketAddress("localhost", 9000));//步骤三
- selector = Selector.open();
- //监听connect行为
- channel.register(selector, SelectionKey.OP_CONNECT);//步骤四:注册到selector + 关注OP_CONNECT事件
- while(true) {
- //三次握手成功后,服务端会给客户端返回一个响应请求,通过如下代码把响应请求拿到
- selector.select();//步骤五
- Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
- while(keysIterator.hasNext()) {
- SelectionKey key = (SelectionKey) keysIterator.next();
- keysIterator.remove();
- //如果server返回的是一个connectable的消息
- if (key.isConnectable()) {//步骤六
- channel = (SocketChannel) key.channel();
- if (channel.isConnectionPending()) {
- //一旦建立连接成功了以后,此时就可以给server发送一个数据了
- channel.finishConnect();
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.put("你好".getBytes());
- buffer.flip();
- channel.write(buffer);
- }
- //接下来监听READ事件就是准备读服务端返回的数据
- channel.register(selector, SelectionKey.OP_READ);//步骤六:注册到selector + 关注OP_READ事件
- } else if (key.isReadable()) {//步骤七:说明服务器端返回了一条数据可以读了
- channel = (SocketChannel) key.channel();
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- int len = channel.read(buffer);//把数据写入buffer,position推进到读取的字节数数字
- if (len > 0) {
- System.out.println("[" + Thread.currentThread().getName() + "]收到响应:" + new String(buffer.array(), 0, len));
- Thread.sleep(5000);
- //睡眠5秒后,接下来继续监听WRITE事件,并准备写数据给服务端
- channel.register(selector, SelectionKey.OP_WRITE);//步骤七:注册到selector + 关注OP_WRITE事件
- }
- } else if (key.isWritable()) {//步骤八
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.put("重复你好了".getBytes());
- buffer.flip();
- channel = (SocketChannel) key.channel();
- channel.write(buffer);
- //再次重复发数据给服务端后,接下来要监听READ事件就是准备读服务端返回的数据
- channel.register(selector, SelectionKey.OP_READ);//步骤八:注册到selector + 关注OP_READ事件
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (channel != null){
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
复制代码
9.NIO优点总结
优点一:SocketChannel的毗连操作是异步的
也就是客户端发起的毗连操作SocketChannel.connect()是异步的。可以将SocketChannel注册到多路复用器上并关注OP_CONNECT事件等待后续效果,不需要像BIO的客户端那样被同步壅闭。
优点二:SocketChannel的读写操作都是异步的
也就是客户端发起的读写操作SocketChannel.read()和write()是异步的。如果没有可读写的数据它不会同步等待,而会直接返回。如许IO通信链路就可以处理其他链路了,不需要同步等待这个链路可用。
优点三:优化了线程模子
由于JDK的Selector在Linux等主流操作系统上通过epoll实现,从而没有了毗连句柄数的限制。这意味着一个Selector线程可以同时处理成千上万个客户端毗连,而且性能不会随客户端增长而线性下降。注意:Selector.select()是同步壅闭的。
优点四:优化了IO读写
BIO的读写是面向流的,一次性只能从流中读取一子节或多字节,而且读完后流无法再读取,需要本身缓存数据。NIO的读写是面向Buffer的,可随意读取里面任何字节的数据。不需要本身缓存数据,只需要移动读写指针即可。
10.NIO标题总结
标题一:NIO的类库和API繁杂,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
标题二:处理常见标题的工作量和难度比较大,比如客户断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流等。
标题三:NIO的epoll bug会导致Selector空轮询,最终导致CPU到达100%。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |