ToB企服应用市场:ToB评测及商务社交产业平台

标题: Java 网络编程 —— 异步通道和异步运算结果 [打印本页]

作者: 汕尾海湾    时间: 2023-5-21 13:26
标题: Java 网络编程 —— 异步通道和异步运算结果
从 JDK7 开始,引入了表示异步通道的 AsynchronousSockerChannel 类和 AsynchronousServerSocketChannel 类,这两个类的作用与 SocketChannel 类和 ServerSockelChannel 相似,区别在于异步通道的一些方法总是采用非阻塞模式,并且它们的非阻塞方法会立即返回一个 Future 对象,用来存放方法的异步运算结果
AsynchronousSocketChannel 类有以下非阻塞方法:
  1. // 连接远程主机
  2. Future<Void> connect(SocketAddress remote);
  3. // 从通道中读入数据,存放到ByteBuffer中
  4. // Future对象包含了实际从通道中读到的字节数
  5. Future<Inleger> read(ByteBuffer dst);
  6. // 把ByteBuffer的数据写入通道
  7. // Future对象包含了实际写入通道的字节数
  8. Future<Integer> write(ByteBuffer src);
复制代码
AsynchronousServerSocketChannel 类有以下非阻塞方法:
  1. // 接受客户连接请求
  2. // Future对象包含连接建立成功后创建的AsynchronousSockelChannel对象
  3. Future<AsynchronousSocketChannel> accept();
复制代码
使用异步通道,可以使程序并行执行多个异步操作,例如:
  1. SocketAddress socketAddress = ...;
  2. AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
  3. //请求建立连接
  4. Future<Void> connected = client.connect(socketAddress);
  5. ByteBuffer byteBuffer = ByteBuffer.allocate(128);
  6. //执行其他操作
  7. //...
  8. //等待连接完成
  9. connected.get();
  10. //读取数据
  11. Future<Integer> future = client.read(byteBuffer);
  12. //执行其他操作
  13. //...
  14. //等待从通道读取数据完成
  15. future.get();
  16. byteBuffer.flip();
  17. WritableByteChannel out = Channels.newChannel(System.out);
  18. out.write(byteBuffer);
复制代码
下例的代码演示了异步通道的用法,它不断接收用户输入的域名并尝试建立连接,最后打印建立连接所花费的时间。如果程序无法连接到指定的主机,就打印相关错误信息。如果用户输入 bye,就结束程序
  1. //表示连接一个主机的结果
  2. class PingResult {
  3.    
  4.     InetSocketAddress address;
  5.     long connectStart; //开始连接时的时间
  6.     long connectFinish = 0; //连接成功时的时间
  7.     String failure;
  8.     Future<Void> connectResult; //连接操作的异步运算结果
  9.     AsynchronousSocketChannel socketChannel;
  10.     String host;
  11.     final String ERROR = "连接失败";
  12.         
  13.     PingResult(String host) {
  14.         try {
  15.             this.host = host;
  16.             address = new InetSocketAddress(InetAddress.getByName(host), 80);
  17.         } catch (IOException x) {
  18.             failure = ERROR;
  19.         }
  20.     }
  21.    
  22.     //打印连接一个主机的执行结果
  23.     public void print() {
  24.         String result;
  25.         if (connectFinish != 0) {
  26.             result = Long.toString(connectFinish - connectStart) + "ms";
  27.         } else if (failure != null) {
  28.                         result = failure;
  29.         } else {
  30.             result = "Timed out";
  31.         }
  32.         System,out,println("ping "+ host + "的结果" + ":" + result);
  33.     }
  34.    
  35.     public class PingClient {
  36.         //存放所有PingResult结果的队列
  37.         private LinkedList<PingResult> pingResults = new Linkedlist<PingResult>();
  38.         boolean shutdown = false;
  39.         ExecutorService executorService;
  40.         
  41.         public PingClient() throws IOException {
  42.             executorService = Executors.newFixedThreadPool(4);
  43.             executorService.execute(new Printer());
  44.             receivePingAddress();
  45.         }
  46.     }
  47.    
  48.     public static void main(String args[]) throws IOException {
  49.         new PingClient();
  50.     }
  51.    
  52.     /*接收用户输入的主机地址,由线程池执行PingHandler任务 */
  53.     public void receivePingAddress() {
  54.         try {
  55.             BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
  56.             String msg = null;
  57.             //接收用户输入的主机地址
  58.             while((msg = localReader.readLine()) != null) {
  59.                 if(msg.equals("bye")) {
  60.                     shutdown = true;
  61.                     executorService.shutdown();
  62.                     break;
  63.                 }
  64.                 executorService.execute(new PingHandler(msg));
  65.             }
  66.         } catch(IOException e) {}
  67.     }
  68.    
  69.     /* 尝试连接特定主机,生成一个PingResult对象,把它加入PingResults结果队列中 */
  70.     public class PingHandler implements Runnable {
  71.         String msg;
  72.         public PingHandler(String msg) {
  73.             this.msg = msg;
  74.         }
  75.         public void run() {
  76.             if(!msg.equals("bye")) {
  77.                 PingResult pingResult = new PingResult(msg);
  78.                 AsynchronousSocketChannel socketChannel = null;
  79.                 try {
  80.                     socketChannel = AsynchronousSocketChannel.open();
  81.                     pingResult.connectStart = System.currentTimeMillis();
  82.                     synchronized (pingResults) {
  83.                         //向pingResults队列加入一个PingResult对象
  84.                         pingResults.add(pingResult);
  85.                         pingResults,notify();
  86.                     }
  87.                     Future<Void> connectResult = socketChannel.connect(pingResult.address);
  88.                     pingResult.connectResult = connectResult;
  89.                 } catch (Exception x) {
  90.                     if (socketChannel != null) {
  91.                         try { socketChannel.close();} catch (IOException e) {)
  92.                     }
  93.                     pingResult.failure = pingResult.ERROR;
  94.                 }
  95.             }
  96.         }
  97.     }
  98.    
  99.     /* 打印PingResults结果队列中已经执行完毕的任务的结果 */
  100.     public class Printer implements Runnable {
  101.         public void run() {
  102.             PingResult pingResult = null;
  103.             while(!shutdown) {
  104.                 synchronized (pingResults) {
  105.                     while (!shutdown && pingResults.size() == 0 ) {
  106.                         try {
  107.                             pingResults.wait(100);
  108.                         } catch(InterruptedException e) {
  109.                             e.printStackTrace();
  110.                         }
  111.                     }
  112.                     if(shutdown && pingResults.size() == 0 ) break;
  113.                     pingResult = pingResults.getFirst();
  114.                     
  115.                     try {
  116.                         if(pingResult.connectResult != null) {
  117.                             pingResult.connectResult.get(500, TimeUnit,MILLISECONDS);
  118.                         } catch(Exception e) {
  119.                             pingResult.failure = pingResult.ERROR;
  120.                         }
  121.                     }
  122.                     
  123.                     if(pingResult.connectResult != null && pingResult.connectResult.isDone()) {
  124.                         pingResult.connectFinish = System.currentTimeMillis();
  125.                     }
  126.                     
  127.                     if(pingResult,connectResult != null && pingResult.connectResult.isDone() || || pingResult,failure != null) {
  128.                         pingResult.print();
  129.                         pingResults.removeFirst();
  130.                         try {
  131.                             pingResult.socketChannel.close();
  132.                         } catch (IOException e) {}
  133.                     }
  134.                 }
  135.             }
  136.         }
  137.     }
  138. }
复制代码
PingClient 类定义了两个表示特定任务的内部类:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4