IT评测·应用市场-qidao123.com技术社区
标题:
Netty条记3:NIO编程
[打印本页]
作者:
笑看天下无敌手
时间:
2025-3-5 16:13
标题:
Netty条记3:NIO编程
Netty条记1:线程模型
Netty条记2:零拷贝
Netty条记3:NIO编程
Netty条记4:Epoll
Netty条记5:Netty开发实例
Netty条记6:Netty组件
Netty条记7:ChannelPromise关照处理
Netty条记8:ByteBuf利用介绍
Netty条记9:粘包半包
Netty条记10:LengthFieldBasedFrameDecoder很简朴
Netty条记11:编解码器
Netty条记12:模仿Web服务器
Netty条记13:序列化
前言
想要快速明白NIO编程,必要先明白上篇的零拷贝技术和线程模型,本篇是对这两个知识的实践,也是netty的过度。
编程示例
我们尝试写一个NIO程序:
必要注意的是:
举行网络传输时,涉及到的数据,必须要颠末缓冲区,不管是发送还是接收,团结用户态和内核态的切换过程就可以明白;
NIO中的缓冲可以利用堆内存缓存和直接内存缓冲,这个必要团结零拷贝技术可以明白;
多路复用利用selector模式,必要循环遍历socket;
注:buf在堆上。在举行数据发送时,如果利用堆内存,在JVM之外创建一个DirectBuf,然后把堆上的数据拷贝的这个DirectBuf,再写到SendBuf中,因为JVM中存在GC机制,如果利用引用方式,在拷贝过程中出现GC,会重新分配地点,导致数据出现题目。
服务端:
public class ServerHandle implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public ServerHandle(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
// channel必须处于非阻塞模式下,不然会报错,所以不能同FileChannel一起使用
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 注册对应的事件,这里注册的是accept事件,只要监听到就会调用对应的处理器
// 注册还可以添加附加对象,也就是第三个参数,获取方式:key.attachment();
// 如果注册了事件,需要取消,需要调用channel.cancel()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端已准备好:" + port);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void run() {
while (true) {
try {
// 阻塞直到通道就绪,这边设置了超时时间
// 返回值:有多少通道就绪
selector.select(1000);
// 在通道就绪时,获取对应的键,也就是事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 根据事件的类型进行对应的处理
handlerInput(key);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void handlerInput(SelectionKey key) throws IOException {
// 在这个循环中,可能存在key.cancel() 或者移除,所以这里需要判断是否有效
if (!key.isValid()) {
return;
}
try {
if (key.isAcceptable()) {
// 这里处理客户端连接服务端的事件
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
try {
// 接收请求
SocketChannel sc = channel.accept();
System.out.println("-----建立连接------");
// 设置该通道非阻塞
sc.configureBlocking(false);
// 并注册read事件,监听着
sc.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
System.out.println("连接客户端失败!");
key.cancel();
channel.close();
}
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = sc.read(buffer);
if (read > 0) {
// 反转:将这个缓冲中的数据从现在的位置变成从0开始
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String msg = new String(bytes, StandardCharsets.UTF_8);
System.out.println("服务器收到消息:" + msg);
doWrite(sc, "hello,收到消息:" + msg);
}
}
} catch (IOException e) {
System.out.println("数据处理失败!");
// 再处理失败,或异常时要退出通道,不然每次循环都会检查通道导致一致报错
key.channel().close();
key.cancel();
}
}
private void doWrite(SocketChannel sc, String msg) throws IOException {
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sc.write(buffer);
}
}
复制代码
服务端启动:
public static void main(String[] args) {
ServerHandle serverHandle = new ServerHandle(8080);
new Thread(serverHandle).start();
}
复制代码
客户端:
public class ClientHandle implements Runnable{
private String ip;
private int port;
private SocketChannel socketChannel;
private Selector selector;
public ClientHandle(String ip, int port) {
try {
this.ip = ip;
this.port = port;
selector = Selector.open();
socketChannel = SocketChannel.open();
// 非阻塞状态
socketChannel.configureBlocking(false);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void run() {
try {
// 启动后,执行连接操作
if (socketChannel.connect(new InetSocketAddress(ip, port))) {
// 连接服务端成功后,注册读取事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
// 如果连接失败,则再注册连接事件,之后再进行处理
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
while (true) {
try {
// 阻塞1000秒直到有通道就绪
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
handleInput(key);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public void sendMsg(String msg) throws IOException {
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
socketChannel.write(buffer);
}
private void handleInput(SelectionKey key) throws IOException {
if (!key.isValid()) {
return;
}
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
System.exit(1);
}
}
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = sc.read(buffer);
if (read > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String msg = new String(bytes, StandardCharsets.UTF_8);
System.out.println("客户端收到消息:" + msg);
} else if (read < 0) {
key.cancel();;
sc.close();
}
}
}
}
复制代码
客户端启动:
public static void main(String[] args) throws IOException {
ClientHandle handle = new ClientHandle("localhost", 8080);
new Thread(handle).start();
Scanner scanner = new Scanner(System.in);
// 死循环保持监听
while (true) {
// 每次控制台输入,就发送给服务端
handle.sendMsg(scanner.nextLine());
}
}
复制代码
总结
该示例对应于reactor单线程模型,服务端是一个单线程,通过selector单线程循环接收客户端的哀求,并辨认客户端哀求事件范例,举行分发处理,相对于BIO很显着的区别,就是它不会比及上一个哀求处理完成。在消息接收与发送的过程中,我们必要对缓冲数据举行处理,也就是对应于零拷贝知识点中提到的缓冲区概念,实例中用到了ByteBuffer对象,它是NIO中一个比较重要的对象,下一篇会举行说明。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4