使用LVS+NGinx+Netty实现数据接入

打印 上一主题 下一主题

主题 1973|帖子 1973|积分 5919

数据接入

链接参考文档 LVS+Keepalived项目
车辆数据上收,TBox通过TCP协议连接到TSP平台 创建连接后举行数据上传。也可借由该连接实现远程控制等操纵。
通过搭建 LV—NGinx—Netty实现高并发数据接入


  • LVS:四层负载平衡(位于内核层):根据请求报文中的目标地址和端口举行调理
  • NGinx:七层负载平衡(位于应用层):根据请求报文的内容举行调理,这种调理属于「署理」的方式
组件角色主机名称假造ip/端口LVS+keepalivedactive-0007LVS+keepalivedbackup-0006Nginx负载-00058050Nginx负载-00048050Netty真实服务器-00038050Netty真实服务器-00028050Netty真实服务器-00038092Netty真实服务器-00028092Netty真实服务器-00018092Netty真实服务器-00018092 使用华为云服务器 安装LVS 必要有VPC服务(免费),在控制台页面做假造ip绑定

一、安装LVS 服务

使用的是 DR 模式


  • NET模式:LVS将数据请求包转发给真实服务器的时间,会修改成真实服务器的IP地址;在复兴时真实服务器会把复兴包发往LVS调理服务器 再发往客户端。
  • TUN隧道模式:将原始数据包封装并添加新的包头(内容包括新的源地址及端口、目标地址及端口),从而实现将一个目标为调理器的VIP地址的数据包封装,通过隧道转发给后端的真实服务器(RealServer)感觉很复杂。
  • DR模式:要求LVS调理服务器要和后端服务器在同一局域网下,为后端服务器添加lo回环地址为VIP(假造IP地址)这样复兴给客户端 客户端会以为是连接的VIP举行复兴的
DR模式不支持端口映射
  1. #查看网卡 eth0
  2. ifconfig
  3. #执行 虚拟ip:172.25.94.187    广播地址(不变):172.25.94.191 子网掩码(不变):255.255.255.192 up:立即启用vip(虚拟ip)
  4. ifconfig eth0:1 172.25.94.187 broadcast 172.25.94.191 netmask 255.255.255.192 up
  5. #查看当前网卡信息
  6. ip a
  7. #安装keepalived
  8. sudo yum install keepalived
  9. #启动keepalived
  10. systemctl start keepalived
  11. #加入开机启动keepalived
  12. systemctl enable keepalived
  13. #重新启动keepalived
  14. systemctl restart keepalived  
  15. #查看keepalived状态
  16. systemctl status keepalived
复制代码

LVS 模块内嵌lvs模块,只必要ipvsadm和keepalived安装
  1. #查看Linux 内核版本
  2. uname -r
  3. #查看内核是否集成lvs模块
  4. find /lib/modules/$(uname -r)/ -iname "**.ko*" | cut -d/ -f5-
  5. #安装LVS管理工具:ipvsadm
  6. yum install -y gcc gcc-c++ makepcre pcre-devel kernel-devel openssl-devel libnl-devel popt*  
  7. yum -y install ipvsadm  
  8. #启动ipvs
  9. sudo ipvsadm
  10. #查看是否支持lvs
  11. sudo lsmod |grep ip_vs
  12. #查看ipvsadm 版本
  13. ipvsadm -v
  14. #服务器添加路由规则
  15. route add -host 172.25.94.187 dev ens33:0
  16. route add -host 172.25.110.124 dev eth:0
  17. #启用系统的包转发功能 #1:启用ip转发,0:禁止ip转发
  18. echo "1" >/proc/sys/net/ipv4/ip_forward
  19. #清除原有转发规则
  20. systemctl restart keepalived
  21. systemctl status ipvsadm
  22. ipvsadm --clear
  23. #添加虚拟ip规则 rr:负载均衡算法 轮询
  24. ipvsadm -A -t 172.25.94.187:8043 -s rr
  25. ipvsadm -a -t 172.25.94.187:8043 -r 172.25.94.151:8043 -g
  26. ipvsadm -a -t 172.25.94.187:8043 -r 172.25.94.152:8043 -g
  27. ipvsadm -l
  28. #配置tcp/tcpfin/udp超时时间
  29.   ipvsadm --set 900 120 300
  30.   #添加虚机IP规则也可以通过修改文件实现
  31.   vim /etc/keepalived/keepalived.conf
复制代码
  1. global_defs {
  2.    router_id chery_21
  3. }
  4. vrrp_instance VI_1 {
  5.     state MASTER
  6.     interface eth0
  7.     virtual_router_id 51
  8.     priority 100
  9.     advert_int 1
  10.     authentication
  11. {
  12.         auth_type PASS
  13.         auth_pass 1111
  14.     }
  15.     virtual_ipaddress {
  16.         172.25.110.124
  17.     }
  18. }
  19. virtual_server 172.25.110.124 8050 {
  20.     delay_loop 6
  21.     lb_algo rr
  22.     lb_kind DR
  23.     persistence_timeout 50
  24.     protocol TCP
  25.     real_server 172.25.110.19 8050 {
  26.         weight 1
  27.         TCP_CHECK
  28. {
  29.         connect_timeout 30
  30.         delay_before_retry 3
  31.     }
  32. }
  33.     real_server 172.25.110.18 8050 {
  34.         weight 2
  35.         TCP_CHECK
  36. {
  37.         connect_timeout 30
  38.         }
  39. }
  40. }
复制代码
全局定义(global_defs)


  • router_id chery_21:定义了当前Keepalived实例的路由ID,这是唯一的标识符,用于在VRRP组中区分不同的Keepalived实例。
VRRP实例(vrrp_instance VI_1)


  • state MASTER:设置当前实例的初始状态为MASTER。在VRRP组中,MASTER负责处理对假造IP的流量。
  • interface eth0:指定VRRP通信使用的网络接口。
  • virtual_router_id 51:假造路由的ID,用于在VRRP组中标识不同的假造路由器。
  • priority 100:设置当前实例的优先级,优先级高的实例将成为MASTER。
  • advert_int 1:VRRP告示的间隔时间,单位为秒。MASTER每隔这个时间会向其他节点发送VRRP告示。
    1. authentication
    复制代码
    :VRRP认证设置,确保只有授权的设备可以到场VRRP组。

    • auth_type PASS:使用密码认证。
    • auth_pass 1111:认证密码。

  • virtual_ipaddress:定义了假造IP地址,即VIP,客户端将访问此IP地址来访问服务。
假造服务器(virtual_server)


  • 172.25.110.124 8050:定义了假造服务器的IP地址和端口号,这里与VRRP的VIP相同,表明这个假造服务器是通过VIP来访问的。
  • delay_loop 6:健康检查的时间间隔,单位为秒。
  • lb_algo rr:负载平衡算法,这里使用的是轮询(rr)。
  • lb_kind DR:负载平衡类型,这里使用的是直接路由(DR),必要确保后端服务器(real_server)设置正确以支持DR模式。
  • persistence_timeout 50:会话保持时间,单位为秒。在指定时间内,来自同一客户端的请求将被转发到同一台后端服务器。
  • protocol TCP:使用TCP协议举行健康检查和负载平衡。
后端服务器(real_server)


  • 定义了多个后端服务器,每个服务器都设置了IP地址、端口号、权重和健康检查设置。
  • weight:权重,用于负载平衡时决定服务器的优先级。
    1. TCP_CHECK
    复制代码
    :TCP健康检查设置。

    • connect_timeout 30:连接超时时间,单位为秒。
    • delay_before_retry 3:在重试之前等待的时间,单位为秒。

LVS负载平衡(LVS简介、三种工作模式、十种调理算法)
  1. #列出当前LVS表中的所有配置,包括虚拟服务器和真实服务器的信息。
  2. ipvsadm -Ln
  3. #显示统计信息,包括已转发的连接数、入包个数、出包个数等。
  4. ipvsadm -L --stats
  5. #显示转发速率信息,包括每秒连接数、每秒入包个数、每秒出包个数等
  6. ipvsadm -L --rate
  7. #keepalived 日志
  8. vim /var/log/message
复制代码




二、安装nginx 服务

在nginx服务器和后端服务器 设置lo回环地址 否则复兴将不成功
服务器上一般还必要修改lo网卡 设置成假造IP。华为云服务器使用的是Centos 8版本 没有 lo设置文件,通过 ifconfig lo:0 172.25.94.187 netmask 255.255.255.255 broadcast 172.25.94.187 up 华为云服务器不支持修改网卡,所以修改了 eth0网卡设置 ip addr add 172.25.94.187/24 dev eth0

  1. yum -y nginx
  2. #检查是否有 stream
  3. nginx -V 2>&1 | grep --color -o with-stream
  4. #如果没有stream需要对nginx源码安装进行二次编译
  5. tar -zxvf nginx-*.tar.gz  
  6. cd nginx-*  
  7. ./configure --prefix=/usr/local/nginx --with-http_ssl_module --with-stream  
  8. make  
  9. sudo make install
  10. #重新加载Nginx配置文件
  11. nginx -s reload
  12. #强制停止Nginx服务
  13. nginx -s stop
  14. #重启nginx
  15. nginx -s reopen
  16. #修改配置文件
  17. vim /etc/nginx/nginx.conf
  18. netstat -anput | grep nginx
  19. nginx -c nginx.conf
复制代码

三、Netty服务

**1、在Linux上摆设启用了 epoll **
epoll:是Linux内核为处理大批量文件形貌符而作的改进的poll,是Linux下多路复用IO接口select/poll的加强版本
应用于Linux体系下的应用步调,特别是必要处理大量并发连接的高性能网络服务器。
BIO:同步阻塞IO,也就是传统阻塞型的IO,服务器实现模式是一个连接对应一个线程。客户端有连接请求时服务器端就必要启动一个线程举行处理,如果这个链接不做任何事情会造成不必要的线程开销。
NIO:同步非阻塞IO,服务器实现模式是一个线程处理多个请求,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有IO请求就举行处理。
AIO:异步非阻塞,AIO引入了异步通道的概念,采用了Proactor模式,简化了步调编写,有效的请求才启动线程,他的特点是先由操纵体系完成后才关照服务端步调启动线程行止理,一般适用于连接数较多且链接时间较长的应用。
  1. public class NettyServer {
  2.     private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
  3.     @Resource
  4.     private NettyServerInitializer nettyServerInitializer;
  5.     ServerBootstrap serverBootstrap = new ServerBootstrap();
  6.     EventLoopGroup boss = null;
  7.     EventLoopGroup worker = null;
  8.     ChannelFuture future = null;
  9.     ChannelFuture future2 = null;
  10.     //厂商编码
  11.     Integer factoryCode = null;
  12.     @Value("${netty.server.use-epoll}")
  13.     boolean epoll = false;
  14.     @Value("${netty.server.port1}")
  15.     private int port = 8030;
  16.     @Value("${netty.server.port2}")
  17.     private int port2 = 8050;
  18.     @PreDestroy
  19.     public void stop() {
  20.         if (future != null) {
  21.             future.channel().close().addListener(ChannelFutureListener.CLOSE);
  22.             future.awaitUninterruptibly();
  23.             boss.shutdownGracefully();
  24.             worker.shutdownGracefully();
  25.             future = null;
  26.             logger.info(" 服务关闭 ");
  27.         }
  28.     }
  29.     public void start() {
  30.         logger.info(" nettyServer 正在启动");
  31.         if (epoll) {
  32.             logger.info(" nettyServer 使用epoll模式");
  33.             boss = new EpollEventLoopGroup(4);
  34.             //指定线程32
  35.             worker = new EpollEventLoopGroup(32);
  36.         } else {
  37.             logger.info(" nettyServer 使用nio模式");
  38.             boss = new NioEventLoopGroup(4);
  39.             worker = new NioEventLoopGroup(32);
  40.         }
  41.         logger.info("netty服务器在[" + this.port + "]端口启动监听");
  42.         logger.info("netty服务器在[" + this.port2 + "]端口启动监听");
  43.         serverBootstrap.group(boss, worker)
  44.         // tcp缓冲区:将不能处理的客户端连接请求放到队列里等待
  45.                 .option(ChannelOption.SO_BACKLOG, 10240)
  46.                 //多个进程或者线程绑定到同一端口,提高服务器程序的性能
  47.                 .option(EpollChannelOption.SO_REUSEPORT, true)
  48.                 //打印info级别的日志
  49. //                .handler(new LoggingHandler(LogLevel.INFO))
  50.                 // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
  51.                 .childOption(ChannelOption.TCP_NODELAY, true)
  52.                 // 可以确保连接在因网络问题中断时能够被及时检测并处理。
  53.                 .childOption(ChannelOption.SO_KEEPALIVE, false)
  54.                 // 配置ByteBuf内存分配器
  55.                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  56.                 // 配置 编码器、解码器、业务处理
  57.                 .childHandler(nettyServerInitializer);
  58.         if (epoll) {
  59.             serverBootstrap.channel(EpollServerSocketChannel.class);
  60.         } else {
  61.             serverBootstrap.channel(NioServerSocketChannel.class);
  62.         }
  63.         try {
  64.             future = serverBootstrap.bind(port).sync();
  65.             future2 = serverBootstrap.bind(port2).sync();
  66.             future.channel().closeFuture().addListener(new ChannelFutureListener() {
  67.                 @Override
  68.                 public void operationComplete(ChannelFuture future) throws Exception {       //通过回调只关闭自己监听的channel
  69.                     future.channel().close();
  70.                 }
  71.             });
  72.             future2.channel().closeFuture().addListener(new ChannelFutureListener() {
  73.                 @Override
  74.                 public void operationComplete(ChannelFuture future) throws Exception {
  75.                     future.channel().close();
  76.                 }
  77.             });
  78.             // 等待服务端监听端口关闭
  79.             // future.channel().closeFuture().sync();
  80.         } catch (Exception e) {
  81.             logger.info("nettyServer 启动时发生异常---------------{}", e);
  82.             logger.info(e.getMessage());
  83.         } finally {
  84.             //这里一定要注释,因为上面没有阻塞了,不注释的话,这里会直接关闭的
  85.             //boss.shutdownGracefully();
  86.             //worker.shutdownGracefully();
  87.         }
  88.     }
复制代码
2、超时设置
  1.     protected void initChannel(SocketChannel socketChannel) throws Exception {
  2.         ChannelPipeline pipeline = socketChannel.pipeline();
  3.         // readerIdleTimeSeconds 读超时;writerIdleTimeSeconds 写超时;allIdaleTimes 读写全超时 300 秒;断开连接
  4.         pipeline.addLast(new IdleStateHandler(0, 0, 300, TimeUnit.SECONDS));
  5.         pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 22, 2, 1, 0));
  6.        //根据端口动态的选择解码器
  7.         Integer localPort = socketChannel.localAddress().getPort();
  8.         if (localPort == 8050 || localPort == 8055) {
  9.             pipeline.addLast("authHandler", authHandler);
  10.             pipeline.addLast("messageHandler", messageHandler);
  11.         } else if (localPort == 8030 || localPort == 8035) {
  12.                 pipeline.addLast("authHandler", authHandler2);
  13.                 pipeline.addLast("messageHandler", messageHandler2);
  14.         }
  15.     }
复制代码
在处理器中的应用
  1.     /**
  2.      * 用户事件触发,发现读超时会调用 根据心跳检测状态去关闭连接
  3.      */
  4.     @Override
  5.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  6.         if (evt instanceof IdleStateEvent) {
  7.             IdleStateEvent event = (IdleStateEvent) evt;
  8.             String clientId = ChannelStore.getClientId(ctx);
  9.             Attribute<Integer> timesAttr = ctx.channel().attr(AttributeKey.valueOf("times"));
  10.             Integer timeInt = timesAttr.get();
  11.             if (timeInt == null) {
  12.                 timeInt = 0;
  13.             }
  14.             String eventDesc = null;
  15.             switch (event.state()) {
  16.                 case READER_IDLE:
  17.                     eventDesc = "读空闲";
  18.                     break;
  19.                 case WRITER_IDLE:
  20.                     eventDesc = "写空闲";
  21.                     break;
  22.                 case ALL_IDLE:
  23.                     eventDesc = "读写空闲";
  24.                     break;
  25.             }
  26.             //获取ip地址信息
  27.             InetAddress ip = InetAddress.getLocalHost();
  28.             String hostAddress = ip.getHostAddress();
  29.             log.info(clientId + " 地址:" + hostAddress + "发生超时事件--" + eventDesc);
  30.             timeInt++;
  31.             timesAttr.set(timeInt);
  32.             if (timeInt > 1) {
  33.                 //删除ip地址信息
  34.                 String redisIpAddress = redisTemplateNew.get(clientId + "_IP");
  35.                 boolean hostBoolean = hostAddress.equals(redisIpAddress);
  36.                 log.info(hostBoolean + "check :" + clientId + " redisTemplateNewDelete:" + hostAddress + "redisIP:" + redisIpAddress);
  37.                 if (redisIpAddress != null && hostBoolean) {
  38.                     redisTemplateNew.delete(clientId + "_IP");
  39.                 }
  40.                 log.info(clientId + " 地址:" + hostAddress + ":" + ctx.channel().remoteAddress() + "空闲次数为" + timeInt + "次 关闭连接 " + clientId);
  41.                 ctx.channel().close();
  42.             }
  43.         }
  44.     }
复制代码
3、下行API
  1. public class SendApi {
  2.     @Resource
  3.     private RedisTemplateNew redisTemplateNew;
  4.     @Resource
  5.     private KafkaTemplate<String, String> kafkaTemplate;
  6.     @GetMapping(value = "/userinfo")
  7.     public UserDto gerUserInfo() {
  8.         UserDto user = new UserDto();
  9.         user.setUserId("888888");
  10.         user.setUserName("holmium");
  11.         user.setSex("1");
  12.         return user;
  13.     }
  14. }
复制代码
}
}
}
[code][/code]
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

科技颠覆者

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表