Java 网络编程 —— 实现非阻塞式的服务器

打印 上一主题 下一主题

主题 917|帖子 917|积分 2751

创建阻塞的服务器

当 ServerSocketChannel 与 SockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程
  1. public class EchoServer {
  2.    
  3.         private int port = 8000;
  4.     private ServerSocketChannel serverSocketChannel = null;
  5.     private ExecutorService executorService; //线程池
  6.     private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
  7.    
  8.     public EchoServer() throws IOException {
  9.         //创建一个线程池
  10.         executorService = Executors.newFixedThreadPool(
  11.             Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
  12.         //创建一个ServerSocketChannel对象
  13.         serverSocketChannel = ServerSocketChannel.open();
  14.         //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
  15.         serverSocketChannel.socket().setReuseAddress(true);
  16.         //把服务器进程与一个本地端口绑定
  17.         serverSocketChannel.socket().bind(new InetSocketAddress(port));
  18.         System.out.println("服务器启动");
  19.     }
  20.    
  21.     public void service() {
  22.         while (true) {
  23.             SocketChannel socketChannel = null;
  24.             try {
  25.                 socketChannel = serverSocketChannel.accept();
  26.                 //处理客户连接
  27.                 executorService.execute(new Handler(socketChannel));
  28.             } catch(IOException e) {
  29.                 e.printStackTrace();
  30.             }
  31.         }
  32.     }
  33.    
  34.     public static void main(String args[])throws IOException {
  35.         new EchoServer().service();
  36.     }
  37.    
  38.     //处理客户连按
  39.     class Handler implements Runnable {
  40.         private SocketChannel socketChannel;
  41.                
  42.         public Handler(SocketChannel socketChannel) {
  43.             this.socketChannel = socketChannel;
  44.         }
  45.         
  46.         public void run() {
  47.             handle(socketChannel);
  48.         }
  49.         
  50.         public void handle(SocketChannel socketChannel) {
  51.             try {
  52.                 //获得与socketChannel关联的Socket对象
  53.                 Socket socket = socketChannel.socket();
  54.                 System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());
  55.                
  56.                 BufferedReader br = getReader(socket);
  57.                 PrintWriter pw = getWriter(socket);
  58.                
  59.                 String msg = null;
  60.                 while ((msg = br.readLine()) != null) {
  61.                     System.out.println(msg);
  62.                     pw.println(echo(msg));
  63.                     if (msg.equals("bye")) {
  64.                         break;
  65.                     }
  66.                 }
  67.             } catch (IOException e) {
  68.                 e.printStackTrace();
  69.             } finally {
  70.                 try {
  71.                     if(socketChannel != null) {
  72.                         socketChannel.close();
  73.                     } catch (IOException e) {
  74.                         e.printStackTrace();
  75.                     }
  76.                 }
  77.             }
  78.         }
  79.     }
  80.    
  81.     private PrintWriter getWriter(Socket socket) throws IOException {
  82.         OutputStream socketOut = socket.getOutputStream();
  83.         return new PrintWriter(socketOut,true);
  84.     }
  85.    
  86.     private BufferedReader getReader(Socket socket) throws IOException {
  87.         InputStream socketIn = socket.getInputStream();
  88.         return new BufferedReader(new InputStreamReader(socketIn));
  89.     }
  90.    
  91.     public String echo(String msg) {
  92.         return "echo:" + msg;
  93.     }
  94. }
复制代码
创建非阻塞的服务器

在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:

  • 接收客户的连接
  • 接收客户发送的数据
  • 向客户发回响应数据
EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件
  1. // 创建一个Selector对象
  2. selector = Selector.open();
  3. //创建一个ServerSocketChannel对象
  4. serverSocketChannel = ServerSocketChannel.open();
  5. //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
  6. //可以顺利绑定到相同的端口
  7. serverSocketChannel.socket().setReuseAddress(true);
  8. //使ServerSocketChannel工作于非阻塞模式
  9. serverSocketChannel.configureBlocking(false):
  10. //把服务器进程与一个本地端口绑定
  11. serverSocketChannelsocket().bind(new InetSocketAddress(port));
复制代码
EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
  1. public void service() throws IOException {
  2.     serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
  3.     //第1层while循环
  4.     while(selector.select() > 0) {
  5.         //获得Selector的selected-keys集合
  6.         Set readyKeys = selector.selectedKeys();
  7.         Iterator it = readyKeys.iterator();
  8.         //第2层while循环
  9.         while (it.hasNext()) {
  10.             SelectionKey key = null;
  11.             //处理SelectionKey
  12.             try {
  13.                 //取出一个SelectionKey
  14.                 key = (SelectionKey) it.next();
  15.                 //把 SelectionKey从Selector 的selected-key 集合中删除
  16.                 it.remove();
  17.                 1f (key.isAcceptable()) { 处理接收连接就绪事件; }
  18.                 if (key.isReadable()) { 处理读就绪水件; }
  19.                 if (key.isWritable()) { 处理写就绪事件; }
  20.             } catch(IOException e) {
  21.                 e.printStackTrace();
  22.                 try {
  23.                     if(key != null) {
  24.                         //使这个SelectionKey失效
  25.                         key.cancel();
  26.                         //关闭与这个SelectionKey关联的SocketChannel
  27.                         key.channel().close();
  28.                     }
  29.                 } catch(Exception ex) {
  30.                     e.printStackTrace();
  31.                 }
  32.             }
  33.         }
  34.     }
  35. }
复制代码

  • 首先由 ServerSocketChannel 向 Selector 注册接收连接就绪事件,如果 Selector 监控到该事件发生,就会把相应的 SelectionKey 对象加入 selected-keys 集合
  • 第一层 while 循环,不断询问 Selector 已经发生的事件,select() 方法返回当前相关事件已经发生的 SelectionKey 的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector 的 selectedKeys() 方法返回 selected-keys 集合,它存放了相关事件已经发生的 SelectionKey 对象
  • 第二层 while 循环,从 selected-keys 集合中依次取出每个 SelectionKey 对象并从集合中删除,,然后调用 isAcceptable()、isReadable() 和 isWritable() 方法判断到底是哪种事件发生了,从而做出相应的处理
1. 处理接收连接就绪事件
  1. if (key.isAcceptable()) {
  2.     //获得与SelectionKey关联的ServerSocketChannel
  3.     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  4.     //获得与客户连接的SocketChannel
  5.     SocketChannel socketChannel = (SocketChannel) ssc.accept();
  6.     //把Socketchannel设置为非阻塞模式
  7.     socketChannel.configureBlocking(false);
  8.     //创建一个用于存放用户发送来的数据的级冲区
  9.     ByteBuffer buffer = ByteBuffer.allocate(1024);
  10.     //Socketchannel向Selector注册读就绪事件和写就绪事件
  11.     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
  12. }
复制代码
2. 处理读就绪事件
  1. public void receive(SelectionKey key) throws IOException {
  2.     //获得与SelectionKey关联的附件
  3.     ByteBuffer buffer = (ByteBuffer) key.attachment();
  4.     //获得与SelectionKey关联的Socketchannel
  5.     SocketChannel socketChannel = (SocketChannel)key.channel();
  6.     //创建一个ByteBuffer用于存放读到的数据
  7.     ByteBuffer readBuff = ByteBuffer.allocate(32);
  8.     socketChannel.read(readBuff);
  9.     readBuff.flip();
  10.     //把buffer的极限设为容量
  11.     buffer.limit(buffer.capacity());
  12.     //把readBuff中的内容拷贝到buffer
  13.     buffer.put(readBuff);
  14. }
复制代码
3. 处理写就绪事件
  1. public void send(SelectionKey key) throws IOException {
  2.     //获得与SelectionKey关联的ByteBuffer
  3.     ByteBuffer buffer = (ByteBuffer) key.attachment();
  4.     //获得与SelectionKey关联的SocketChannel
  5.     SocketChannel socketChannel = (SocketChannel) key.channel();
  6.     buffer.flip();
  7.     //按照GBK编码把buffer中的字节转换为字符串
  8.     String data = decode(buffer);
  9.     //如果还没有读到一行数据就返回
  10.     if(data.indexOf("\r\n") == -1)
  11.         return;
  12.     //截取一行数据
  13.     String outputData = data.substring(0, data.indexOf("\n") + 1);
  14.     //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
  15.     ByteBuffer outputBuffer = encode("echo:" + outputData);
  16.     //输出outputBuffer的所有字节
  17.     while(outputBuffer,hasRemaining())
  18.         socketChannel.write(outputBuffer);
  19.     //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
  20.     ByteBuffer temp = encode(outputData);
  21.     //把buffer的位置设为temp的极限
  22.     buffer.position(temp.limit()):
  23.     //删除buffer已经处理的数据
  24.     buffer.compact();
  25.     //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
  26.     if(outputData.equals("bye\r\n")) {
  27.         key.cancel();
  28.         socketChannel.close();
  29.     }
  30. }
复制代码
完整代码如下:
  1. public class EchoServer {
  2.    
  3.         private int port = 8000;
  4.     private ServerSocketChannel serverSocketChannel = null;
  5.     private Selector selector;
  6.     private Charset charset = Charset.forName("GBK");
  7.         public EchoServer() throws IOException {
  8.         // 创建一个Selector对象
  9.         selector = Selector.open();
  10.         //创建一个ServerSocketChannel对象
  11.         serverSocketChannel = ServerSocketChannel.open();
  12.         //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
  13.         //可以顺利绑定到相同的端口
  14.         serverSocketChannel.socket().setReuseAddress(true);
  15.         //使ServerSocketChannel工作于非阻塞模式
  16.         serverSocketChannel.configureBlocking(false):
  17.         //把服务器进程与一个本地端口绑定
  18.         serverSocketChannelsocket().bind(new InetSocketAddress(port));
  19.     }
  20.    
  21.     public void service() throws IOException {
  22.         serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
  23.         //第1层while循环
  24.         while(selector.select() > 0) {
  25.             //获得Selector的selected-keys集合
  26.             Set readyKeys = selector.selectedKeys();
  27.             Iterator it = readyKeys.iterator();
  28.             //第2层while循环
  29.             while (it.hasNext()) {
  30.                 SelectionKey key = null;
  31.                 //处理SelectionKey
  32.                 try {
  33.                     //取出一个SelectionKey
  34.                     key = (SelectionKey) it.next();
  35.                     //把 SelectionKey从Selector 的selected-key 集合中删除
  36.                     it.remove();
  37.                     1f (key.isAcceptable()) {
  38.                          //获得与SelectionKey关联的ServerSocketChannel
  39.                         ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  40.                         //获得与客户连接的SocketChannel
  41.                         SocketChannel socketChannel = (SocketChannel) ssc.accept();
  42.                         //把Socketchannel设置为非阻塞模式
  43.                         socketChannel.configureBlocking(false);
  44.                         //创建一个用于存放用户发送来的数据的级冲区
  45.                         ByteBuffer buffer = ByteBuffer.allocate(1024);
  46.                         //Socketchannel向Selector注册读就绪事件和写就绪事件
  47.                         socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
  48.                     }
  49.                     if (key.isReadable()) { receive(key); }
  50.                     if (key.isWritable()) { send(key); }
  51.                 } catch(IOException e) {
  52.                     e.printStackTrace();
  53.                     try {
  54.                         if(key != null) {
  55.                             //使这个SelectionKey失效
  56.                             key.cancel();
  57.                             //关闭与这个SelectionKey关联的SocketChannel
  58.                             key.channel().close();
  59.                         }
  60.                     } catch(Exception ex) {
  61.                         e.printStackTrace();
  62.                     }
  63.                 }
  64.             }
  65.         }
  66.     }
  67.    
  68.     public void receive(SelectionKey key) throws IOException {
  69.         //获得与SelectionKey关联的附件
  70.         ByteBuffer buffer = (ByteBuffer) key.attachment();
  71.         //获得与SelectionKey关联的Socketchannel
  72.         SocketChannel socketChannel = (SocketChannel)key.channel();
  73.         //创建一个ByteBuffer用于存放读到的数据
  74.         ByteBuffer readBuff = ByteBuffer.allocate(32);
  75.         socketChannel.read(readBuff);
  76.         readBuff.flip();
  77.         //把buffer的极限设为容量
  78.         buffer.limit(buffer.capacity());
  79.         //把readBuff中的内容拷贝到buffer
  80.         buffer.put(readBuff);
  81.     }
  82.    
  83.     public void send(SelectionKey key) throws IOException {
  84.         //获得与SelectionKey关联的ByteBuffer
  85.         ByteBuffer buffer = (ByteBuffer) key.attachment();
  86.         //获得与SelectionKey关联的SocketChannel
  87.         SocketChannel socketChannel = (SocketChannel) key.channel();
  88.         buffer.flip();
  89.         //按照GBK编码把buffer中的字节转换为字符串
  90.         String data = decode(buffer);
  91.         //如果还没有读到一行数据就返回
  92.         if(data.indexOf("\r\n") == -1)
  93.             return;
  94.         //截取一行数据
  95.         String outputData = data.substring(0, data.indexOf("\n") + 1);
  96.         //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
  97.         ByteBuffer outputBuffer = encode("echo:" + outputData);
  98.         //输出outputBuffer的所有字节
  99.         while(outputBuffer,hasRemaining())
  100.             socketChannel.write(outputBuffer);
  101.         //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
  102.         ByteBuffer temp = encode(outputData);
  103.         //把buffer的位置设为temp的极限
  104.         buffer.position(temp.limit()):
  105.         //删除buffer已经处理的数据
  106.         buffer.compact();
  107.         //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
  108.         if(outputData.equals("bye\r\n")) {
  109.             key.cancel();
  110.             socketChannel.close();
  111.         }
  112.     }
  113.    
  114.     //解码
  115.     public String decode(ByteBuffer buffer) {
  116.         CharBuffer charBuffer = charset.decode(buffer);
  117.         return charBuffer.toStrinq();
  118.     }
  119.    
  120.     //编码
  121.     public ByteBuffer encode(String str) {
  122.         return charset.encode(str);
  123.     }
  124.    
  125.     public static void main(String args[])throws Exception {
  126.         EchoServer server = new EchoServer();
  127.         server.service();
  128.     }
  129. }
复制代码
阻塞模式与非阻塞模式混合使用

使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作
假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作
  1. public class EchoServer {
  2.    
  3.         private int port = 8000;
  4.     private ServerSocketChannel serverSocketChannel = null;
  5.     private Selector selector = null;
  6.     private Charset charset = Charset.forName("GBK");
  7.         public EchoServer() throws IOException {
  8.         selector = Selector.open();
  9.         serverSocketChannel = ServerSocketChannel.open();
  10.         serverSocketChannel.socket().setReuseAddress(true);
  11.         serverSocketChannelsocket().bind(new InetSocketAddress(port));
  12.     }
  13.    
  14.     public void accept() {
  15.         while(true) {
  16.             try {
  17.                 SocketChannel socketChannel = serverSocketChannel.accept();
  18.                 socketChannel.configureBlocking(false);
  19.                
  20.                 ByteBuffer buffer = ByteBuffer.allocate(1024);
  21.                 synchronized(gate) {
  22.                     selector.wakeup();
  23.                     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
  24.                 }
  25.             } catch(IOException e) {
  26.                 e.printStackTrace();
  27.             }
  28.         }
  29.     }
  30.    
  31.     private Object gate=new Object();
  32.    
  33.     public void service() throws IOException {
  34.         while(true) {
  35.             synchronized(gate){}
  36.             int n = selector.select();
  37.             if(n == 0) continue;
  38.             Set readyKeys = selector.selectedKeys();
  39.             Iterator it = readyKeys.iterator();
  40.             while (it.hasNext()) {
  41.                 SelectionKey key = null;
  42.                 try {
  43.                                     it.remove();
  44.                     if (key.isReadable()) {
  45.                         receive(key);
  46.                     }
  47.                     if (key.isWritable()) {
  48.                         send(key);
  49.                     }
  50.                 } catch(IOException e) {
  51.                     e.printStackTrace();
  52.                     try {
  53.                         if(key != null) {
  54.                             key.cancel();
  55.                             key.channel().close();
  56.                         }
  57.                     } catch(Exception ex) { e.printStackTrace(); }
  58.                 }
  59.             }
  60.         }
  61.     }
  62.    
  63.     public void receive(SelectionKey key) throws IOException {
  64.         ...
  65.     }
  66.    
  67.     public void send(SelectionKey key) throws IOException {
  68.         ...
  69.     }
  70.    
  71.     public String decode(ByteBuffer buffer) {
  72.         ...
  73.     }
  74.    
  75.     public ByteBuffer encode(String str) {
  76.         ...
  77.     }
  78.    
  79.     public static void main(String args[])throws Exception {
  80.         final EchoServer server = new EchoServer();
  81.         Thread accept = new Thread() {
  82.             public void run() {
  83.                 server.accept();
  84.             }
  85.         };
  86.         accept.start();
  87.                 server.service();
  88.     }
  89. }
复制代码
注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁
导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

石小疯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表