创建阻塞的服务器
当 ServerSocketChannel 与 SockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程- public class EchoServer {
-
- private int port = 8000;
- private ServerSocketChannel serverSocketChannel = null;
- private ExecutorService executorService; //线程池
- private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
-
- public EchoServer() throws IOException {
- //创建一个线程池
- executorService = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
- //创建一个ServerSocketChannel对象
- serverSocketChannel = ServerSocketChannel.open();
- //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
- serverSocketChannel.socket().setReuseAddress(true);
- //把服务器进程与一个本地端口绑定
- serverSocketChannel.socket().bind(new InetSocketAddress(port));
- System.out.println("服务器启动");
- }
-
- public void service() {
- while (true) {
- SocketChannel socketChannel = null;
- try {
- socketChannel = serverSocketChannel.accept();
- //处理客户连接
- executorService.execute(new Handler(socketChannel));
- } catch(IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String args[])throws IOException {
- new EchoServer().service();
- }
-
- //处理客户连按
- class Handler implements Runnable {
- private SocketChannel socketChannel;
-
- public Handler(SocketChannel socketChannel) {
- this.socketChannel = socketChannel;
- }
-
- public void run() {
- handle(socketChannel);
- }
-
- public void handle(SocketChannel socketChannel) {
- try {
- //获得与socketChannel关联的Socket对象
- Socket socket = socketChannel.socket();
- System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());
-
- BufferedReader br = getReader(socket);
- PrintWriter pw = getWriter(socket);
-
- String msg = null;
- while ((msg = br.readLine()) != null) {
- System.out.println(msg);
- pw.println(echo(msg));
- if (msg.equals("bye")) {
- break;
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- if(socketChannel != null) {
- socketChannel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
-
- private PrintWriter getWriter(Socket socket) throws IOException {
- OutputStream socketOut = socket.getOutputStream();
- return new PrintWriter(socketOut,true);
- }
-
- private BufferedReader getReader(Socket socket) throws IOException {
- InputStream socketIn = socket.getInputStream();
- return new BufferedReader(new InputStreamReader(socketIn));
- }
-
- public String echo(String msg) {
- return "echo:" + msg;
- }
- }
复制代码 创建非阻塞的服务器
在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:
- 接收客户的连接
- 接收客户发送的数据
- 向客户发回响应数据
EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件- // 创建一个Selector对象
- selector = Selector.open();
- //创建一个ServerSocketChannel对象
- serverSocketChannel = ServerSocketChannel.open();
- //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
- //可以顺利绑定到相同的端口
- serverSocketChannel.socket().setReuseAddress(true);
- //使ServerSocketChannel工作于非阻塞模式
- serverSocketChannel.configureBlocking(false):
- //把服务器进程与一个本地端口绑定
- serverSocketChannelsocket().bind(new InetSocketAddress(port));
复制代码 EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:- public void service() throws IOException {
- serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
- //第1层while循环
- while(selector.select() > 0) {
- //获得Selector的selected-keys集合
- Set readyKeys = selector.selectedKeys();
- Iterator it = readyKeys.iterator();
- //第2层while循环
- while (it.hasNext()) {
- SelectionKey key = null;
- //处理SelectionKey
- try {
- //取出一个SelectionKey
- key = (SelectionKey) it.next();
- //把 SelectionKey从Selector 的selected-key 集合中删除
- it.remove();
- 1f (key.isAcceptable()) { 处理接收连接就绪事件; }
- if (key.isReadable()) { 处理读就绪水件; }
- if (key.isWritable()) { 处理写就绪事件; }
- } catch(IOException e) {
- e.printStackTrace();
- try {
- if(key != null) {
- //使这个SelectionKey失效
- key.cancel();
- //关闭与这个SelectionKey关联的SocketChannel
- key.channel().close();
- }
- } catch(Exception ex) {
- e.printStackTrace();
- }
- }
- }
- }
- }
复制代码
- 首先由 ServerSocketChannel 向 Selector 注册接收连接就绪事件,如果 Selector 监控到该事件发生,就会把相应的 SelectionKey 对象加入 selected-keys 集合
- 第一层 while 循环,不断询问 Selector 已经发生的事件,select() 方法返回当前相关事件已经发生的 SelectionKey 的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector 的 selectedKeys() 方法返回 selected-keys 集合,它存放了相关事件已经发生的 SelectionKey 对象
- 第二层 while 循环,从 selected-keys 集合中依次取出每个 SelectionKey 对象并从集合中删除,,然后调用 isAcceptable()、isReadable() 和 isWritable() 方法判断到底是哪种事件发生了,从而做出相应的处理
1. 处理接收连接就绪事件
- if (key.isAcceptable()) {
- //获得与SelectionKey关联的ServerSocketChannel
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- //获得与客户连接的SocketChannel
- SocketChannel socketChannel = (SocketChannel) ssc.accept();
- //把Socketchannel设置为非阻塞模式
- socketChannel.configureBlocking(false);
- //创建一个用于存放用户发送来的数据的级冲区
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- //Socketchannel向Selector注册读就绪事件和写就绪事件
- socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
- }
复制代码 2. 处理读就绪事件
- public void receive(SelectionKey key) throws IOException {
- //获得与SelectionKey关联的附件
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- //获得与SelectionKey关联的Socketchannel
- SocketChannel socketChannel = (SocketChannel)key.channel();
- //创建一个ByteBuffer用于存放读到的数据
- ByteBuffer readBuff = ByteBuffer.allocate(32);
- socketChannel.read(readBuff);
- readBuff.flip();
- //把buffer的极限设为容量
- buffer.limit(buffer.capacity());
- //把readBuff中的内容拷贝到buffer
- buffer.put(readBuff);
- }
复制代码 3. 处理写就绪事件
- public void send(SelectionKey key) throws IOException {
- //获得与SelectionKey关联的ByteBuffer
- ByteBuffer buffer = (ByteBuffer) key.attachment();
- //获得与SelectionKey关联的SocketChannel
- SocketChannel socketChannel = (SocketChannel) key.channel();
- buffer.flip();
- //按照GBK编码把buffer中的字节转换为字符串
- String data = decode(buffer);
- //如果还没有读到一行数据就返回
- if(data.indexOf("\r\n") == -1)
- return;
- //截取一行数据
- String outputData = data.substring(0, data.indexOf("\n") + 1);
- //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
- ByteBuffer outputBuffer = encode("echo:" + outputData);
- //输出outputBuffer的所有字节
- while(outputBuffer,hasRemaining())
- socketChannel.write(outputBuffer);
- //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
- ByteBuffer temp = encode(outputData);
- //把buffer的位置设为temp的极限
- buffer.position(temp.limit()):
- //删除buffer已经处理的数据
- buffer.compact();
- //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
- if(outputData.equals("bye\r\n")) {
- key.cancel();
- socketChannel.close();
- }
- }
复制代码 完整代码如下:阻塞模式与非阻塞模式混合使用
使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作
假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作- public class EchoServer {
-
- private int port = 8000;
- private ServerSocketChannel serverSocketChannel = null;
- private Selector selector = null;
- private Charset charset = Charset.forName("GBK");
- public EchoServer() throws IOException {
- selector = Selector.open();
- serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.socket().setReuseAddress(true);
- serverSocketChannelsocket().bind(new InetSocketAddress(port));
- }
-
- public void accept() {
- while(true) {
- try {
- SocketChannel socketChannel = serverSocketChannel.accept();
- socketChannel.configureBlocking(false);
-
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- synchronized(gate) {
- selector.wakeup();
- socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
- }
- } catch(IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- private Object gate=new Object();
-
- public void service() throws IOException {
- while(true) {
- synchronized(gate){}
- int n = selector.select();
- if(n == 0) continue;
- Set readyKeys = selector.selectedKeys();
- Iterator it = readyKeys.iterator();
- while (it.hasNext()) {
- SelectionKey key = null;
- try {
- it.remove();
- if (key.isReadable()) {
- receive(key);
- }
- if (key.isWritable()) {
- send(key);
- }
- } catch(IOException e) {
- e.printStackTrace();
- try {
- if(key != null) {
- key.cancel();
- key.channel().close();
- }
- } catch(Exception ex) { e.printStackTrace(); }
- }
- }
- }
- }
-
- public void receive(SelectionKey key) throws IOException {
- ...
- }
-
- public void send(SelectionKey key) throws IOException {
- ...
- }
-
- public String decode(ByteBuffer buffer) {
- ...
- }
-
- public ByteBuffer encode(String str) {
- ...
- }
-
- public static void main(String args[])throws Exception {
- final EchoServer server = new EchoServer();
- Thread accept = new Thread() {
- public void run() {
- server.accept();
- }
- };
- accept.start();
- server.service();
- }
- }
复制代码 注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁
导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |