十年大数据架构师,带各人深入Apache Mina通信框架架构与应用 ...

海哥  金牌会员 | 2024-6-14 21:19:05 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 535|帖子 535|积分 1605

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于 TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供 JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以资助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步 IO 默认利用的是 JAVA NIO 作为底层支持)操作的编程模型。从官网文档“MINA based Application Architecture”中可以看到Mina作为一个通信层框架,在实际应用所处的位置,如图所示:


Mina位于用户应用步伐和底层Java网络API(和in-VM通信)之间,我们开发基于Mina的网络应用步伐,就无需关心复杂的通信细节。
应用团体架构
再看一下,Mina提供的基本组件,如图所示:


也就是说,无论是客户端照旧服务端,利用Mina框架实现通信的逻辑分层在概念上统一的,即包罗如下三层:


  • I/O Service – Performs actual I/O
  • I/O Filter Chain – Filters/Transforms bytes into desired Data Structures and vice-versa
  • I/O Handler – Here resides the actual business logic
想要开发基于MIna的应用步伐,你只必要做如下事情:


  • Create an I/O service – Choose from already available Services (*Acceptor) or create your own
  • Create a Filter Chain – Choose from already existing Filters or create a custom Filter for transforming request/response
  • Create an I/O Handler – Write business logic, on handling different messages
下面看一下利用Mina的应用步伐,在服务器端和客户端的架构细节:
服务器端架构服务器端监听指定端口上到来的哀求,对这些哀求经过处理后,回复相应。它也会创建并处理一个链接过来的客户会话对象(Session)。服务器端架构如图所示:


对服务器端的阐明,引用官网文档,如下所示:


  • IOAcceptor listens on the network for incoming connections/packets
  • For a new connection, a new session is created and all subsequent request from IP Address/Port combination are handled in that Session
  • All packets received for a Session, traverses the Filter Chain as specified in the diagram. Filters can be used to modify the content of packets (like converting to Objects, adding/removing information etc). For converting to/from raw bytes to High Level Objects, PacketEncoder/Decoder are particularly useful
  • Finally the packet or converted object lands in IOHandler. IOHandlers can be used to fulfill business needs.
客户端架构客户端主要做了如下工作:


  • 毗连到服务器端
  • 向服务器发送消息
  • 等候服务器端相应,并处理相应
客户端架构,如图所示:


对客户端架构的阐明,引用官网文档内容,如下所示:


  • Client first creates an IOConnector (MINA Construct for connecting to Socket), initiates a bind with Server
  • Upon Connection creation, a Session is created and is associated with Connection
  • Application/Client writes to the Session, resulting in data being sent to Server, after traversing the Filter Chain
  • All the responses/messages received from Server are traverses the Filter Chain and lands at IOHandler, for processing
应用实例开发
下面根据上面给出的架构设计描述,看一下Mina(版本2.0.7)自带的例子,如何实现一个简朴的C/S通信的步伐,非常轻易。服务端首先,服务器端必要利用的组件有IoAdaptor、IoHandler、IoFilter,其中IoFilter可选.我们基于Mina自带的例子举行了简朴地修改,实现服务端IoHandler的代码如下所示:
  1. package org.shirdrn.mina.server;
  2.  
  3. import org.apache.mina.core.service.IoHandlerAdapter;
  4. import org.apache.mina.core.session.IdleStatus;
  5. import org.apache.mina.core.session.IoSession;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8.  
  9. public class TinyServerProtocolHandler extends IoHandlerAdapter {
  10.     private final static Logger LOGGER = LoggerFactory.getLogger(TinyServerProtocolHandler.class);
  11.     
  12.     @Override
  13.     public void sessionCreated(IoSession session) {
  14.         session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
  15.     }
  16.  
  17.     @Override
  18.     public void sessionClosed(IoSession session) throws Exception {
  19.         LOGGER.info("CLOSED");
  20.     }
  21.  
  22.     @Override
  23.     public void sessionOpened(IoSession session) throws Exception {
  24.         LOGGER.info("OPENED");
  25.     }
  26.  
  27.     @Override
  28.     public void sessionIdle(IoSession session, IdleStatus status) {
  29.         LOGGER.info("*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE) + " ***");
  30.     }
  31.  
  32.     @Override
  33.     public void exceptionCaught(IoSession session, Throwable cause) {
  34.         session.close(true);
  35.     }
  36.  
  37.     @Override
  38.     public void messageReceived(IoSession session, Object message)
  39.             throws Exception {
  40.         LOGGER.info( "Received : " + message );
  41.        if(!session.isConnected()) {
  42.             session.close(true);
  43.        }
  44.     }
  45. }
复制代码
这个版本中,IoHandlerAdapter实现了IoHandler接口,里面封装了一组用于事件处理的空方法,其中包罗服务端和客户端的事件。在实际应用中,客户端可以选择客户端具有的事件,服务器端选择服务器端具有的事件,然后分别对这两类事件举行处理(有重叠的事件,如毗连事件、关闭事件、异常事件等)。客户端的IoHandler的具体实现也是类似的,不过多累述。下面看启动服务器的主方法类,代码如下所示:
  1. package org.shirdrn.mina.server;
  2.  
  3. import java.net.InetSocketAddress;
  4.  
  5. import org.apache.mina.filter.codec.ProtocolCodecFilter;
  6. import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
  7. import org.apache.mina.transport.socket.SocketAcceptor;
  8. import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11.  
  12. public class TinyMinaServer {
  13.  
  14.     private final static Logger LOG = LoggerFactory.getLogger(TinyMinaServer.class);
  15.     /** Choose your favorite port number. */
  16.     private static final int PORT = 8080;
  17.  
  18.     public static void main(String[] args) throws Exception {
  19.         SocketAcceptor acceptor = new NioSocketAcceptor();
  20.         acceptor.setReuseAddress(true);
  21.         acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
  22.  
  23.         // Bind
  24.         acceptor.setHandler(new TinyServerProtocolHandler());
  25.         acceptor.bind(new InetSocketAddress(PORT));
  26.         LOG.info("Listening on port " + PORT);
  27.  
  28.         LOG.info("Server started!");
  29.  
  30.         for (;;) {
  31.             LOG.info("R: " + acceptor.getStatistics().getReadBytesThroughput() + ", W: " + acceptor.getStatistics().getWrittenBytesThroughput());
  32.             Thread.sleep(3000);
  33.         }
  34.     }
  35.  
  36. }
复制代码
客户端实现客户端IoHandler的代码如下所示:
  1. package org.shirdrn.mina.client;
  2.  
  3. import org.apache.mina.core.service.IoHandlerAdapter;
  4. import org.apache.mina.core.session.IdleStatus;
  5. import org.apache.mina.core.session.IoSession;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8.  
  9. public class TinyClientProtocolHandler extends IoHandlerAdapter {
  10.  
  11.     private final static Logger LOGGER = LoggerFactory
  12.             .getLogger(TinyClientProtocolHandler.class);
  13.  
  14.     @Override
  15.     public void sessionCreated(IoSession session) {
  16.         LOGGER.info("CLIENT::CREATED");
  17.     }
  18.  
  19.     @Override
  20.     public void sessionClosed(IoSession session) throws Exception {
  21.         LOGGER.info("CLIENT::CLOSED");
  22.     }
  23.  
  24.     @Override
  25.     public void sessionOpened(IoSession session) throws Exception {
  26.         LOGGER.info("CLIENT::OPENED");
  27.     }
  28.  
  29.     @Override
  30.     public void sessionIdle(IoSession session, IdleStatus status) {
  31.         LOGGER.info("CLIENT::*** IDLE #"
  32.                 + session.getIdleCount(IdleStatus.BOTH_IDLE) + " ***");
  33.     }
  34.  
  35.     @Override
  36.     public void exceptionCaught(IoSession session, Throwable cause) {
  37.         LOGGER.info("CLIENT::EXCEPTIONCAUGHT");
  38.         cause.printStackTrace();
  39.     }
  40.  
  41.     public void messageSent(IoSession session, Object message) throws Exception {
  42.         LOGGER.info("CLIENT::MESSAGESENT: " + message);
  43.     }
  44. }
复制代码
下面看启动客户端的主方法类,代码如下所示:
  1. package org.shirdrn.mina.client;
  2.  
  3. import java.net.InetSocketAddress;
  4.  
  5. import org.apache.mina.core.future.ConnectFuture;
  6. import org.apache.mina.filter.codec.ProtocolCodecFilter;
  7. import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
  8. import org.apache.mina.transport.socket.SocketConnector;
  9. import org.apache.mina.transport.socket.nio.NioSocketConnector;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12.  
  13. public class TinyMinaClient {
  14.  
  15.     private final static Logger LOG = LoggerFactory.getLogger(TinyMinaClient.class);
  16.     /** Choose your favorite port number. */
  17.     private static final int PORT = 8080;
  18.  
  19.     public static void main(String[] args) throws Exception {
  20.         SocketConnector connector = new NioSocketConnector();
  21.  
  22.         // Connect
  23.         connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
  24.         connector.setHandler(new TinyClientProtocolHandler());
  25.  
  26.         for (int i = 0; i < 10; i++) {
  27.             ConnectFuture future = connector.connect(new InetSocketAddress(PORT));
  28.             LOG.info("Connect to port " + PORT);
  29.             future.awaitUninterruptibly();
  30.             future.getSession().write(String.valueOf(i));
  31.             Thread.sleep(1500);
  32.         }
  33.  
  34.     }
  35. }
复制代码
我们只是发送了十个数字,每发一次间隔1500ms。测试上述服务器端与客户端交互,首先启动服务器端,监听8080端口。接着启动客户端,毗连到服务器端8080端口,然后发送消息,服务器端吸收到消息后,直接将到客户端的毗连关闭掉。(时延军(包罗链接:http://shiyanjun.cn))

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

海哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表