Netty条记3:NIO编程

打印 上一主题 下一主题

主题 1960|帖子 1960|积分 5880

Netty条记1:线程模型
Netty条记2:零拷贝
Netty条记3:NIO编程
Netty条记4:Epoll
Netty条记5:Netty开发实例
Netty条记6:Netty组件
Netty条记7:ChannelPromise关照处理
Netty条记8:ByteBuf利用介绍
Netty条记9:粘包半包
Netty条记10:LengthFieldBasedFrameDecoder很简朴
Netty条记11:编解码器
Netty条记12:模仿Web服务器
Netty条记13:序列化

  
前言

想要快速明白NIO编程,必要先明白上篇的零拷贝技术和线程模型,本篇是对这两个知识的实践,也是netty的过度。
编程示例

我们尝试写一个NIO程序:
必要注意的是:

  • 举行网络传输时,涉及到的数据,必须要颠末缓冲区,不管是发送还是接收,团结用户态和内核态的切换过程就可以明白;
  • NIO中的缓冲可以利用堆内存缓存和直接内存缓冲,这个必要团结零拷贝技术可以明白;
  • 多路复用利用selector模式,必要循环遍历socket;
注:buf在堆上。在举行数据发送时,如果利用堆内存,在JVM之外创建一个DirectBuf,然后把堆上的数据拷贝的这个DirectBuf,再写到SendBuf中,因为JVM中存在GC机制,如果利用引用方式,在拷贝过程中出现GC,会重新分配地点,导致数据出现题目。
服务端:
  1. public class ServerHandle implements Runnable{
  2.     private Selector selector;
  3.     private ServerSocketChannel serverSocketChannel;
  4.     public ServerHandle(int port) {
  5.         try {
  6.             selector = Selector.open();
  7.             serverSocketChannel = ServerSocketChannel.open();
  8.             // channel必须处于非阻塞模式下,不然会报错,所以不能同FileChannel一起使用
  9.             serverSocketChannel.configureBlocking(false);
  10.             serverSocketChannel.socket().bind(new InetSocketAddress(port));
  11.             // 注册对应的事件,这里注册的是accept事件,只要监听到就会调用对应的处理器
  12.             // 注册还可以添加附加对象,也就是第三个参数,获取方式:key.attachment();
  13.             // 如果注册了事件,需要取消,需要调用channel.cancel()
  14.             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  15.             System.out.println("服务端已准备好:" + port);
  16.         } catch (IOException e) {
  17.             throw new RuntimeException(e);
  18.         }
  19.     }
  20.     @Override
  21.     public void run() {
  22.         while (true) {
  23.             try {
  24.                 // 阻塞直到通道就绪,这边设置了超时时间
  25.                 // 返回值:有多少通道就绪
  26.                 selector.select(1000);
  27.                 // 在通道就绪时,获取对应的键,也就是事件
  28.                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
  29.                 Iterator<SelectionKey> iterator = selectionKeys.iterator();
  30.                 while (iterator.hasNext()) {
  31.                     SelectionKey key = iterator.next();
  32.                     iterator.remove();
  33.                     // 根据事件的类型进行对应的处理
  34.                     handlerInput(key);
  35.                 }
  36.             } catch (Exception e) {
  37.                 throw new RuntimeException(e);
  38.             }
  39.         }
  40.     }
  41.     private void handlerInput(SelectionKey key) throws IOException {
  42.         // 在这个循环中,可能存在key.cancel() 或者移除,所以这里需要判断是否有效
  43.         if (!key.isValid()) {
  44.             return;
  45.         }
  46.         try {
  47.             if (key.isAcceptable()) {
  48.                 // 这里处理客户端连接服务端的事件
  49.                 ServerSocketChannel channel = (ServerSocketChannel)key.channel();
  50.                 try {
  51.                     // 接收请求
  52.                     SocketChannel sc = channel.accept();
  53.                     System.out.println("-----建立连接------");
  54.                     // 设置该通道非阻塞
  55.                     sc.configureBlocking(false);
  56.                     // 并注册read事件,监听着
  57.                     sc.register(selector, SelectionKey.OP_READ);
  58.                 } catch (IOException e) {
  59.                     System.out.println("连接客户端失败!");
  60.                     key.cancel();
  61.                     channel.close();
  62.                 }
  63.             }
  64.             if (key.isReadable()) {
  65.                 SocketChannel sc = (SocketChannel)key.channel();
  66.                 ByteBuffer buffer = ByteBuffer.allocate(1024);
  67.                 int read = sc.read(buffer);
  68.                 if (read > 0) {
  69.                     // 反转:将这个缓冲中的数据从现在的位置变成从0开始
  70.                     buffer.flip();
  71.                     byte[] bytes = new byte[buffer.remaining()];
  72.                     buffer.get(bytes);
  73.                     String msg = new String(bytes, StandardCharsets.UTF_8);
  74.                     System.out.println("服务器收到消息:" + msg);
  75.                     doWrite(sc, "hello,收到消息:" + msg);
  76.                 }
  77.             }
  78.         } catch (IOException e) {
  79.             System.out.println("数据处理失败!");
  80.             // 再处理失败,或异常时要退出通道,不然每次循环都会检查通道导致一致报错
  81.             key.channel().close();
  82.             key.cancel();
  83.         }
  84.     }
  85.     private void doWrite(SocketChannel sc, String msg) throws IOException {
  86.         byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
  87.         ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
  88.         buffer.put(bytes);
  89.         buffer.flip();
  90.         sc.write(buffer);
  91.     }
  92. }
复制代码
服务端启动:
  1.     public static void main(String[] args) {
  2.         ServerHandle serverHandle = new ServerHandle(8080);
  3.         new Thread(serverHandle).start();
  4.     }
复制代码
客户端:
  1. public class ClientHandle implements Runnable{
  2.     private String ip;
  3.     private int port;
  4.     private SocketChannel socketChannel;
  5.     private Selector selector;
  6.     public ClientHandle(String ip, int port) {
  7.         try {
  8.             this.ip = ip;
  9.             this.port = port;
  10.             selector = Selector.open();
  11.             socketChannel = SocketChannel.open();
  12.             // 非阻塞状态
  13.             socketChannel.configureBlocking(false);
  14.         } catch (IOException e) {
  15.             throw new RuntimeException(e);
  16.         }
  17.     }
  18.     @Override
  19.     public void run() {
  20.         try {
  21.             // 启动后,执行连接操作
  22.             if (socketChannel.connect(new InetSocketAddress(ip, port))) {
  23.                 // 连接服务端成功后,注册读取事件
  24.                 socketChannel.register(selector, SelectionKey.OP_READ);
  25.             } else {
  26.                 // 如果连接失败,则再注册连接事件,之后再进行处理
  27.                 socketChannel.register(selector, SelectionKey.OP_CONNECT);
  28.             }
  29.         } catch (IOException e) {
  30.             throw new RuntimeException(e);
  31.         }
  32.         while (true) {
  33.             try {
  34.                 // 阻塞1000秒直到有通道就绪
  35.                 selector.select(1000);
  36.                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
  37.                 Iterator<SelectionKey> iterator = selectionKeys.iterator();
  38.                 while (iterator.hasNext()) {
  39.                     SelectionKey key = iterator.next();
  40.                     iterator.remove();
  41.                     handleInput(key);
  42.                 }
  43.             } catch (IOException e) {
  44.                 throw new RuntimeException(e);
  45.             }
  46.         }
  47.     }
  48.     public void sendMsg(String msg) throws IOException {
  49.         byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
  50.         ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
  51.         buffer.put(bytes);
  52.         buffer.flip();
  53.         socketChannel.write(buffer);
  54.     }
  55.     private void handleInput(SelectionKey key) throws IOException {
  56.         if (!key.isValid()) {
  57.             return;
  58.         }
  59.         SocketChannel sc = (SocketChannel) key.channel();
  60.         if (key.isConnectable()) {
  61.             if (sc.finishConnect()) {
  62.                 socketChannel.register(selector, SelectionKey.OP_READ);
  63.             } else {
  64.                 System.exit(1);
  65.             }
  66.         }
  67.         if (key.isReadable()) {
  68.             ByteBuffer buffer = ByteBuffer.allocate(1024);
  69.             int read = sc.read(buffer);
  70.             if (read > 0) {
  71.                 buffer.flip();
  72.                 byte[] bytes = new byte[buffer.remaining()];
  73.                 buffer.get(bytes);
  74.                 String msg = new String(bytes, StandardCharsets.UTF_8);
  75.                 System.out.println("客户端收到消息:" + msg);
  76.             } else if (read < 0) {
  77.                 key.cancel();;
  78.                 sc.close();
  79.             }
  80.         }
  81.     }
  82. }
复制代码
客户端启动:
  1. public static void main(String[] args) throws IOException {
  2.         ClientHandle handle = new ClientHandle("localhost", 8080);
  3.         new Thread(handle).start();
  4.         Scanner scanner = new Scanner(System.in);
  5.      // 死循环保持监听
  6.         while (true) {
  7.             // 每次控制台输入,就发送给服务端
  8.             handle.sendMsg(scanner.nextLine());
  9.         }
  10.     }
复制代码

总结

该示例对应于reactor单线程模型,服务端是一个单线程,通过selector单线程循环接收客户端的哀求,并辨认客户端哀求事件范例,举行分发处理,相对于BIO很显着的区别,就是它不会比及上一个哀求处理完成。在消息接收与发送的过程中,我们必要对缓冲数据举行处理,也就是对应于零拷贝知识点中提到的缓冲区概念,实例中用到了ByteBuffer对象,它是NIO中一个比较重要的对象,下一篇会举行说明。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

笑看天下无敌手

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表