ToB企服应用市场:ToB评测及商务社交产业平台

标题: 细谈 Linux 中的多路复用epoll [打印本页]

作者: 半亩花草    时间: 2024-11-5 09:19
标题: 细谈 Linux 中的多路复用epoll
各人好,我是 V 哥。在 Linux 中,epoll 是一种多路复用机制,用于高效地处置惩罚大量文件形貌符(file descriptor, FD)事件。与传统的select和poll相比,epoll具有更高的性能和可扩展性,特别是在大规模并发场景下,好比高并发服务器。
以下是epoll的焦点数据结构和实现原理:
1. epoll的焦点数据结构

在 Linux 内核中,epoll的实现涉及多个焦点数据结构,紧张包罗以下几个:
(1) epoll实例

epoll在创建时,会天生一个与之关联的实例,这个实例在内核中是一个epoll文件对象(struct file),并且与用户态的epoll文件形貌符(FD)对应。该实例负责维护和管理所有到场的事件。
(2) 事件等待队列(epitem)

epoll中的每个事件都被封装成一个epitem结构。该结构体紧张包罗以下几个关键内容:
(3) 红黑树(RB-Tree)

为了快速查找和管理epitem,epoll使用红黑树将所有的epitem组织起来。每个被监听的文件形貌符及其事件类型会存储在红黑树中,通过这种方式,可以在事件添加、删除、修改时实现高效的查找和管理。
(4) 停当队列(Ready List)

当监听的文件形貌符上发生指定的事件时,epoll会将该文件形貌符的事件到场一个停当队列。这个队列是一个双向链表,存储所有准备利益理的epitem。当用户调用epoll_wait时,内核从该队列中取出满意条件的事件并返回。
2. epoll的三种操作

epoll提供三种紧张的操作接口:epoll_create、epoll_ctl 和 epoll_wait。
(1) epoll_create

epoll_create用于创建一个epoll实例,并返回一个文件形貌符。它会在内核中分配epoll数据结构,并初始化停当队列、红黑树等结构。它紧张完成以下使命:
(2) epoll_ctl

epoll_ctl用于将事件添加到epoll实例中,或从epoll实例中移除,或修改现有事件。具体操作包罗:
通过红黑树结构,epoll_ctl操作的添加、删除、修改事件在平均时间复杂度上为 (O(\log N)),相较于poll的线性复杂度更具性能优势。
(3) epoll_wait

epoll_wait用于等待文件形貌符上的事件,直到有事件触发或超时。其紧张过程包罗:
epoll_wait只需遍历停当队列中的事件,而不是遍历所有的监听事件,这使得性能相较于select和poll有明显提拔。特别是在大量文件形貌符中仅有少数活跃时,epoll_wait的优势更为明显。
3. epoll的触发模式

epoll提供两种触发模式来控制事件的触发方式:
(1) 水平触发(LT, Level Triggered)

在默认的水平触发模式下,只要文件形貌符上有指定的事件(如数据可读),每次调用epoll_wait都会返回此事件,除非事件被处置惩罚(如数据被读走)。这是与poll和select一致的举动。
(2) 边缘触发(ET, Edge Triggered)

在边缘触发模式下,epoll_wait只会在事件第一次发生时关照,之后即使该事件条件一直满意(如数据仍可读),也不会再次触发,除非事件条件有新的变化。该模式能够减少不须要的系统调用次数,但要求应用程序在接收到关照后必须一次性处置惩罚所有数据,否则可能会错过事件。
4. epoll的优缺点

优点:

缺点:

5. Java NIO 如何使用多路复用

下面 V 哥用案例来详细说一说Java 中的多路复用。在 Java NIO 中,Selector 类实现了多路复用机制,底层使用 epoll 或 poll 实现。Java NIO 中的多路复用非常适合处置惩罚大量并发连接,好比在高并发的服务器场景中。以下是使用 Java NIO 和 Selector 创建一个简化的谈天服务器示例,通过多路复用处置惩罚多个客户端连接。
示例:NIO 实现的谈天服务器

这个服务器使用 ServerSocketChannel 来监听客户端连接,通过 Selector 监听和管理事件,并使用 SocketChannel 处置惩罚每个连接。客户端连接后可以发送消息,服务器会将消息广播给所有其他连接的客户端。
  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.*;
  5. import java.util.*;
  6. public class WGNioChatServer {
  7.     private final int port;
  8.     private Selector selector;
  9.     private ServerSocketChannel serverSocketChannel;
  10.     private final Map<SocketChannel, String> clientNames = new HashMap<>(); // 保存客户端名称
  11.     public WGNioChatServer(int port) {
  12.         this.port = port;
  13.     }
  14.     public void start() throws IOException {
  15.         // 初始化服务器通道和选择器
  16.         selector = Selector.open();
  17.         serverSocketChannel = ServerSocketChannel.open();
  18.         serverSocketChannel.configureBlocking(false);
  19.         serverSocketChannel.bind(new InetSocketAddress(port));
  20.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  21.         System.out.println("Chat server started on port " + port);
  22.         while (true) {
  23.             // 轮询准备就绪的事件
  24.             selector.select();
  25.             Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
  26.             
  27.             while (keyIterator.hasNext()) {
  28.                 SelectionKey key = keyIterator.next();
  29.                 keyIterator.remove();
  30.                 if (key.isAcceptable()) {
  31.                     handleAccept();
  32.                 } else if (key.isReadable()) {
  33.                     handleRead(key);
  34.                 }
  35.             }
  36.         }
  37.     }
  38.     // 处理新客户端连接
  39.     private void handleAccept() throws IOException {
  40.         SocketChannel clientChannel = serverSocketChannel.accept();
  41.         clientChannel.configureBlocking(false);
  42.         clientChannel.register(selector, SelectionKey.OP_READ);
  43.         
  44.         String clientAddress = clientChannel.getRemoteAddress().toString();
  45.         clientNames.put(clientChannel, clientAddress);
  46.         System.out.println("Connected: " + clientAddress);
  47.         
  48.         broadcast("User " + clientAddress + " joined the chat", clientChannel);
  49.     }
  50.     // 读取客户端消息并广播给其他客户端
  51.     private void handleRead(SelectionKey key) throws IOException {
  52.         SocketChannel clientChannel = (SocketChannel) key.channel();
  53.         ByteBuffer buffer = ByteBuffer.allocate(256);
  54.         int bytesRead = clientChannel.read(buffer);
  55.         if (bytesRead == -1) {
  56.             // 客户端断开连接
  57.             String clientName = clientNames.get(clientChannel);
  58.             System.out.println("Disconnected: " + clientName);
  59.             clientNames.remove(clientChannel);
  60.             key.cancel();
  61.             clientChannel.close();
  62.             broadcast("User " + clientName + " left the chat", clientChannel);
  63.             return;
  64.         }
  65.         buffer.flip();
  66.         String message = new String(buffer.array(), 0, bytesRead);
  67.         System.out.println(clientNames.get(clientChannel) + ": " + message.trim());
  68.         broadcast(clientNames.get(clientChannel) + ": " + message, clientChannel);
  69.     }
  70.     // 向所有客户端广播消息
  71.     private void broadcast(String message, SocketChannel sender) throws IOException {
  72.         ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
  73.         for (SelectionKey key : selector.keys()) {
  74.             Channel targetChannel = key.channel();
  75.             if (targetChannel instanceof SocketChannel && targetChannel != sender) {
  76.                 SocketChannel clientChannel = (SocketChannel) targetChannel;
  77.                 clientChannel.write(buffer.duplicate());
  78.             }
  79.         }
  80.     }
  81.     public static void main(String[] args) throws IOException {
  82.         int port = 123456;
  83.         new WGNioChatServer(port).start();
  84.     }
  85. }
复制代码
代码说明

业务场景扩展

在现实业务中,可以进一步优化或扩展这个代码,好比:
6. 优化一下

在现实业务场景中,我们可以基于 Java NIO 对该谈天服务器进行如下优化:
下面是优化后的代码实现:
  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.*;
  5. import java.util.*;
  6. import java.util.concurrent.*;
  7. import com.fasterxml.jackson.databind.ObjectMapper;
  8. public class EnhancedNioChatServer {
  9.     private final int port;
  10.     private Selector selector;  // 多路复用器,负责管理多个通道
  11.     private ServerSocketChannel serverSocketChannel;  // 服务器通道,用于监听客户端连接
  12.     private final Map<SocketChannel, String> clientNames = new HashMap<>();  // 存储客户端名称
  13.     private final Map<SocketChannel, Long> lastActiveTime = new ConcurrentHashMap<>();  // 存储客户端最后活动时间
  14.     private final ScheduledExecutorService heartbeatScheduler = Executors.newScheduledThreadPool(1);  // 心跳检测定时任务
  15.     private final ExecutorService workerPool = Executors.newFixedThreadPool(10);  // 处理客户端请求的线程池
  16.     private final ObjectMapper objectMapper = new ObjectMapper();  // 用于 JSON 序列化的对象
  17.     public EnhancedNioChatServer(int port) {
  18.         this.port = port;
  19.     }
  20.     public void start() throws IOException {
  21.         // 初始化服务器通道和选择器
  22.         selector = Selector.open();
  23.         serverSocketChannel = ServerSocketChannel.open();
  24.         serverSocketChannel.configureBlocking(false);  // 配置非阻塞模式
  25.         serverSocketChannel.bind(new InetSocketAddress(port));  // 绑定端口
  26.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  // 注册连接接收事件
  27.         System.out.println("Chat server started on port " + port);
  28.         // 启动心跳检测任务
  29.         startHeartbeatCheck();
  30.         while (true) {
  31.             selector.select();  // 阻塞直到至少有一个事件发生
  32.             Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
  33.             
  34.             while (keyIterator.hasNext()) {
  35.                 SelectionKey key = keyIterator.next();
  36.                 keyIterator.remove();  // 防止重复处理
  37.                 if (key.isAcceptable()) {
  38.                     handleAccept();  // 处理客户端连接
  39.                 } else if (key.isReadable()) {
  40.                     handleRead(key);  // 处理客户端的消息读取
  41.                 }
  42.             }
  43.         }
  44.     }
  45.     // 处理新的客户端连接
  46.     private void handleAccept() throws IOException {
  47.         SocketChannel clientChannel = serverSocketChannel.accept();  // 接受新的客户端连接
  48.         clientChannel.configureBlocking(false);  // 设置非阻塞模式
  49.         clientChannel.register(selector, SelectionKey.OP_READ);  // 注册读事件
  50.         
  51.         String clientAddress = clientChannel.getRemoteAddress().toString();
  52.         clientNames.put(clientChannel, clientAddress);  // 保存客户端地址
  53.         lastActiveTime.put(clientChannel, System.currentTimeMillis());  // 记录最后活动时间
  54.         
  55.         System.out.println("Connected: " + clientAddress);
  56.         broadcast(new Message("System", "User " + clientAddress + " joined the chat"), clientChannel);
  57.     }
  58.     // 处理读取客户端消息
  59.     private void handleRead(SelectionKey key) {
  60.         SocketChannel clientChannel = (SocketChannel) key.channel();
  61.         ByteBuffer buffer = ByteBuffer.allocate(256);  // 缓冲区用于读取客户端数据
  62.         // 使用线程池处理,以免阻塞主线程
  63.         workerPool.submit(() -> {
  64.             try {
  65.                 int bytesRead = clientChannel.read(buffer);  // 读取客户端数据
  66.                 if (bytesRead == -1) {
  67.                     disconnect(clientChannel);  // 客户端关闭连接
  68.                     return;
  69.                 }
  70.                 lastActiveTime.put(clientChannel, System.currentTimeMillis());  // 更新最后活动时间
  71.                 buffer.flip();  // 准备读取缓冲区内容
  72.                 String messageContent = new String(buffer.array(), 0, bytesRead).trim();
  73.                 Message message = new Message(clientNames.get(clientChannel), messageContent);
  74.                 System.out.println(message.getSender() + ": " + message.getContent());
  75.                 broadcast(message, clientChannel);  // 广播消息给其他客户端
  76.             } catch (IOException e) {
  77.                 disconnect(clientChannel);  // 处理异常情况下的客户端断开
  78.             }
  79.         });
  80.     }
  81.     // 处理客户端断开连接
  82.     private void disconnect(SocketChannel clientChannel) {
  83.         try {
  84.             String clientName = clientNames.get(clientChannel);
  85.             System.out.println("Disconnected: " + clientName);
  86.             clientNames.remove(clientChannel);  // 移除客户端信息
  87.             lastActiveTime.remove(clientChannel);  // 移除最后活动时间
  88.             clientChannel.close();  // 关闭连接
  89.             broadcast(new Message("System", "User " + clientName + " left the chat"), clientChannel);
  90.         } catch (IOException e) {
  91.             e.printStackTrace();
  92.         }
  93.     }
  94.     // 广播消息给所有连接的客户端(除了消息发送者)
  95.     private void broadcast(Message message, SocketChannel sender) {
  96.         ByteBuffer buffer;
  97.         try {
  98.             buffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(message));  // 将消息序列化为 JSON
  99.         } catch (IOException e) {
  100.             e.printStackTrace();
  101.             return;
  102.         }
  103.         for (SelectionKey key : selector.keys()) {
  104.             Channel targetChannel = key.channel();
  105.             if (targetChannel instanceof SocketChannel && targetChannel != sender) {  // 排除发送者
  106.                 SocketChannel clientChannel = (SocketChannel) targetChannel;
  107.                 try {
  108.                     clientChannel.write(buffer.duplicate());  // 写入消息
  109.                 } catch (IOException e) {
  110.                     disconnect(clientChannel);  // 处理写入失败的情况
  111.                 }
  112.             }
  113.         }
  114.     }
  115.     // 定期检查客户端是否超时未响应,超时则断开连接
  116.     private void startHeartbeatCheck() {
  117.         heartbeatScheduler.scheduleAtFixedRate(() -> {
  118.             long currentTime = System.currentTimeMillis();
  119.             for (SocketChannel clientChannel : lastActiveTime.keySet()) {
  120.                 long lastActive = lastActiveTime.get(clientChannel);
  121.                 if (currentTime - lastActive > 60000) {  // 如果超时 1 分钟
  122.                     System.out.println("Client timeout: " + clientNames.get(clientChannel));
  123.                     disconnect(clientChannel);  // 断开超时客户端
  124.                 }
  125.             }
  126.         }, 10, 30, TimeUnit.SECONDS);  // 每隔 30 秒执行一次
  127.     }
  128.     public static void main(String[] args) throws IOException {
  129.         int port = 123456;  // 定义端口号
  130.         new EnhancedNioChatServer(port).start();  // 启动服务器
  131.     }
  132.     // 用于封装消息的内部类
  133.     private static class Message {
  134.         private String sender;
  135.         private String content;
  136.         public Message(String sender, String content) {
  137.             this.sender = sender;
  138.             this.content = content;
  139.         }
  140.         public String getSender() {
  141.             return sender;
  142.         }
  143.         public String getContent() {
  144.             return content;
  145.         }
  146.     }
  147. }
复制代码
解释一下

优化说明

代码实行流程

这种优化使得服务器在高并发场景下更加结实、灵活,并支持更精确的消息协议。
小结一下

epoll的高效性紧张得益于两点:
好了,关于 epoll 多路复用你学会了吗,原创不易,感谢支持,关注威哥爱编程,编程路上 V 哥与你一起偕行。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4