数据接入
链接参考文档 LVS+Keepalived项目
车辆数据上收,TBox通过TCP协议连接到TSP平台 创建连接后举行数据上传。也可借由该连接实现远程控制等操纵。
通过搭建 LV—NGinx—Netty实现高并发数据接入
- LVS:四层负载平衡(位于内核层):根据请求报文中的目标地址和端口举行调理
- NGinx:七层负载平衡(位于应用层):根据请求报文的内容举行调理,这种调理属于「署理」的方式
组件角色主机名称假造ip/端口LVS+keepalivedactive-0007LVS+keepalivedbackup-0006Nginx负载-00058050Nginx负载-00048050Netty真实服务器-00038050Netty真实服务器-00028050Netty真实服务器-00038092Netty真实服务器-00028092Netty真实服务器-00018092Netty真实服务器-00018092 使用华为云服务器 安装LVS 必要有VPC服务(免费),在控制台页面做假造ip绑定
一、安装LVS 服务
使用的是 DR 模式
- NET模式:LVS将数据请求包转发给真实服务器的时间,会修改成真实服务器的IP地址;在复兴时真实服务器会把复兴包发往LVS调理服务器 再发往客户端。
- TUN隧道模式:将原始数据包封装并添加新的包头(内容包括新的源地址及端口、目标地址及端口),从而实现将一个目标为调理器的VIP地址的数据包封装,通过隧道转发给后端的真实服务器(RealServer)感觉很复杂。
- DR模式:要求LVS调理服务器要和后端服务器在同一局域网下,为后端服务器添加lo回环地址为VIP(假造IP地址)这样复兴给客户端 客户端会以为是连接的VIP举行复兴的
DR模式不支持端口映射
- #查看网卡 eth0
- ifconfig
- #执行 虚拟ip:172.25.94.187 广播地址(不变):172.25.94.191 子网掩码(不变):255.255.255.192 up:立即启用vip(虚拟ip)
- ifconfig eth0:1 172.25.94.187 broadcast 172.25.94.191 netmask 255.255.255.192 up
- #查看当前网卡信息
- ip a
- #安装keepalived
- sudo yum install keepalived
- #启动keepalived
- systemctl start keepalived
- #加入开机启动keepalived
- systemctl enable keepalived
- #重新启动keepalived
- systemctl restart keepalived
- #查看keepalived状态
- systemctl status keepalived
复制代码
LVS 模块内嵌lvs模块,只必要ipvsadm和keepalived安装
- #查看Linux 内核版本
- uname -r
- #查看内核是否集成lvs模块
- find /lib/modules/$(uname -r)/ -iname "**.ko*" | cut -d/ -f5-
- #安装LVS管理工具:ipvsadm
- yum install -y gcc gcc-c++ makepcre pcre-devel kernel-devel openssl-devel libnl-devel popt*
- yum -y install ipvsadm
- #启动ipvs
- sudo ipvsadm
- #查看是否支持lvs
- sudo lsmod |grep ip_vs
- #查看ipvsadm 版本
- ipvsadm -v
- #服务器添加路由规则
- route add -host 172.25.94.187 dev ens33:0
- route add -host 172.25.110.124 dev eth:0
- #启用系统的包转发功能 #1:启用ip转发,0:禁止ip转发
- echo "1" >/proc/sys/net/ipv4/ip_forward
- #清除原有转发规则
- systemctl restart keepalived
- systemctl status ipvsadm
- ipvsadm --clear
- #添加虚拟ip规则 rr:负载均衡算法 轮询
- ipvsadm -A -t 172.25.94.187:8043 -s rr
- ipvsadm -a -t 172.25.94.187:8043 -r 172.25.94.151:8043 -g
- ipvsadm -a -t 172.25.94.187:8043 -r 172.25.94.152:8043 -g
- ipvsadm -l
- #配置tcp/tcpfin/udp超时时间
- ipvsadm --set 900 120 300
- #添加虚机IP规则也可以通过修改文件实现
- vim /etc/keepalived/keepalived.conf
复制代码- global_defs {
- router_id chery_21
- }
- vrrp_instance VI_1 {
- state MASTER
- interface eth0
- virtual_router_id 51
- priority 100
- advert_int 1
- authentication
- {
- auth_type PASS
- auth_pass 1111
- }
- virtual_ipaddress {
- 172.25.110.124
- }
- }
- virtual_server 172.25.110.124 8050 {
- delay_loop 6
- lb_algo rr
- lb_kind DR
- persistence_timeout 50
- protocol TCP
- real_server 172.25.110.19 8050 {
- weight 1
- TCP_CHECK
- {
- connect_timeout 30
- delay_before_retry 3
- }
- }
- real_server 172.25.110.18 8050 {
- weight 2
- TCP_CHECK
- {
- connect_timeout 30
- }
- }
- }
复制代码 全局定义(global_defs)
- router_id chery_21:定义了当前Keepalived实例的路由ID,这是唯一的标识符,用于在VRRP组中区分不同的Keepalived实例。
VRRP实例(vrrp_instance VI_1)
- state MASTER:设置当前实例的初始状态为MASTER。在VRRP组中,MASTER负责处理对假造IP的流量。
- interface eth0:指定VRRP通信使用的网络接口。
- virtual_router_id 51:假造路由的ID,用于在VRRP组中标识不同的假造路由器。
- priority 100:设置当前实例的优先级,优先级高的实例将成为MASTER。
- advert_int 1:VRRP告示的间隔时间,单位为秒。MASTER每隔这个时间会向其他节点发送VRRP告示。
- :VRRP认证设置,确保只有授权的设备可以到场VRRP组。
- auth_type PASS:使用密码认证。
- auth_pass 1111:认证密码。
- virtual_ipaddress:定义了假造IP地址,即VIP,客户端将访问此IP地址来访问服务。
假造服务器(virtual_server)
- 172.25.110.124 8050:定义了假造服务器的IP地址和端口号,这里与VRRP的VIP相同,表明这个假造服务器是通过VIP来访问的。
- delay_loop 6:健康检查的时间间隔,单位为秒。
- lb_algo rr:负载平衡算法,这里使用的是轮询(rr)。
- lb_kind DR:负载平衡类型,这里使用的是直接路由(DR),必要确保后端服务器(real_server)设置正确以支持DR模式。
- persistence_timeout 50:会话保持时间,单位为秒。在指定时间内,来自同一客户端的请求将被转发到同一台后端服务器。
- protocol TCP:使用TCP协议举行健康检查和负载平衡。
后端服务器(real_server)
- 定义了多个后端服务器,每个服务器都设置了IP地址、端口号、权重和健康检查设置。
- weight:权重,用于负载平衡时决定服务器的优先级。
- :TCP健康检查设置。
- connect_timeout 30:连接超时时间,单位为秒。
- delay_before_retry 3:在重试之前等待的时间,单位为秒。
LVS负载平衡(LVS简介、三种工作模式、十种调理算法)
- #列出当前LVS表中的所有配置,包括虚拟服务器和真实服务器的信息。
- ipvsadm -Ln
- #显示统计信息,包括已转发的连接数、入包个数、出包个数等。
- ipvsadm -L --stats
- #显示转发速率信息,包括每秒连接数、每秒入包个数、每秒出包个数等
- ipvsadm -L --rate
- #keepalived 日志
- vim /var/log/message
复制代码
二、安装nginx 服务
在nginx服务器和后端服务器 设置lo回环地址 否则复兴将不成功
服务器上一般还必要修改lo网卡 设置成假造IP。华为云服务器使用的是Centos 8版本 没有 lo设置文件,通过 ifconfig lo:0 172.25.94.187 netmask 255.255.255.255 broadcast 172.25.94.187 up 华为云服务器不支持修改网卡,所以修改了 eth0网卡设置 ip addr add 172.25.94.187/24 dev eth0
- yum -y nginx
- #检查是否有 stream
- nginx -V 2>&1 | grep --color -o with-stream
- #如果没有stream需要对nginx源码安装进行二次编译
- tar -zxvf nginx-*.tar.gz
- cd nginx-*
- ./configure --prefix=/usr/local/nginx --with-http_ssl_module --with-stream
- make
- sudo make install
- #重新加载Nginx配置文件
- nginx -s reload
- #强制停止Nginx服务
- nginx -s stop
- #重启nginx
- nginx -s reopen
- #修改配置文件
- vim /etc/nginx/nginx.conf
- netstat -anput | grep nginx
- nginx -c nginx.conf
复制代码
三、Netty服务
**1、在Linux上摆设启用了 epoll **
epoll:是Linux内核为处理大批量文件形貌符而作的改进的poll,是Linux下多路复用IO接口select/poll的加强版本
应用于Linux体系下的应用步调,特别是必要处理大量并发连接的高性能网络服务器。
BIO:同步阻塞IO,也就是传统阻塞型的IO,服务器实现模式是一个连接对应一个线程。客户端有连接请求时服务器端就必要启动一个线程举行处理,如果这个链接不做任何事情会造成不必要的线程开销。
NIO:同步非阻塞IO,服务器实现模式是一个线程处理多个请求,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有IO请求就举行处理。
AIO:异步非阻塞,AIO引入了异步通道的概念,采用了Proactor模式,简化了步调编写,有效的请求才启动线程,他的特点是先由操纵体系完成后才关照服务端步调启动线程行止理,一般适用于连接数较多且链接时间较长的应用。
- public class NettyServer {
- private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
- @Resource
- private NettyServerInitializer nettyServerInitializer;
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- EventLoopGroup boss = null;
- EventLoopGroup worker = null;
- ChannelFuture future = null;
- ChannelFuture future2 = null;
- //厂商编码
- Integer factoryCode = null;
- @Value("${netty.server.use-epoll}")
- boolean epoll = false;
- @Value("${netty.server.port1}")
- private int port = 8030;
- @Value("${netty.server.port2}")
- private int port2 = 8050;
- @PreDestroy
- public void stop() {
- if (future != null) {
- future.channel().close().addListener(ChannelFutureListener.CLOSE);
- future.awaitUninterruptibly();
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- future = null;
- logger.info(" 服务关闭 ");
- }
- }
- public void start() {
- logger.info(" nettyServer 正在启动");
- if (epoll) {
- logger.info(" nettyServer 使用epoll模式");
- boss = new EpollEventLoopGroup(4);
- //指定线程32
- worker = new EpollEventLoopGroup(32);
- } else {
- logger.info(" nettyServer 使用nio模式");
- boss = new NioEventLoopGroup(4);
- worker = new NioEventLoopGroup(32);
- }
- logger.info("netty服务器在[" + this.port + "]端口启动监听");
- logger.info("netty服务器在[" + this.port2 + "]端口启动监听");
- serverBootstrap.group(boss, worker)
- // tcp缓冲区:将不能处理的客户端连接请求放到队列里等待
- .option(ChannelOption.SO_BACKLOG, 10240)
- //多个进程或者线程绑定到同一端口,提高服务器程序的性能
- .option(EpollChannelOption.SO_REUSEPORT, true)
- //打印info级别的日志
- // .handler(new LoggingHandler(LogLevel.INFO))
- // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
- .childOption(ChannelOption.TCP_NODELAY, true)
- // 可以确保连接在因网络问题中断时能够被及时检测并处理。
- .childOption(ChannelOption.SO_KEEPALIVE, false)
- // 配置ByteBuf内存分配器
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- // 配置 编码器、解码器、业务处理
- .childHandler(nettyServerInitializer);
- if (epoll) {
- serverBootstrap.channel(EpollServerSocketChannel.class);
- } else {
- serverBootstrap.channel(NioServerSocketChannel.class);
- }
- try {
- future = serverBootstrap.bind(port).sync();
- future2 = serverBootstrap.bind(port2).sync();
- future.channel().closeFuture().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception { //通过回调只关闭自己监听的channel
- future.channel().close();
- }
- });
- future2.channel().closeFuture().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- future.channel().close();
- }
- });
- // 等待服务端监听端口关闭
- // future.channel().closeFuture().sync();
- } catch (Exception e) {
- logger.info("nettyServer 启动时发生异常---------------{}", e);
- logger.info(e.getMessage());
- } finally {
- //这里一定要注释,因为上面没有阻塞了,不注释的话,这里会直接关闭的
- //boss.shutdownGracefully();
- //worker.shutdownGracefully();
- }
- }
复制代码 2、超时设置
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- // readerIdleTimeSeconds 读超时;writerIdleTimeSeconds 写超时;allIdaleTimes 读写全超时 300 秒;断开连接
- pipeline.addLast(new IdleStateHandler(0, 0, 300, TimeUnit.SECONDS));
- pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 22, 2, 1, 0));
- //根据端口动态的选择解码器
- Integer localPort = socketChannel.localAddress().getPort();
- if (localPort == 8050 || localPort == 8055) {
- pipeline.addLast("authHandler", authHandler);
- pipeline.addLast("messageHandler", messageHandler);
- } else if (localPort == 8030 || localPort == 8035) {
- pipeline.addLast("authHandler", authHandler2);
- pipeline.addLast("messageHandler", messageHandler2);
- }
- }
复制代码 在处理器中的应用
- /**
- * 用户事件触发,发现读超时会调用 根据心跳检测状态去关闭连接
- */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- String clientId = ChannelStore.getClientId(ctx);
- Attribute<Integer> timesAttr = ctx.channel().attr(AttributeKey.valueOf("times"));
- Integer timeInt = timesAttr.get();
- if (timeInt == null) {
- timeInt = 0;
- }
- String eventDesc = null;
- switch (event.state()) {
- case READER_IDLE:
- eventDesc = "读空闲";
- break;
- case WRITER_IDLE:
- eventDesc = "写空闲";
- break;
- case ALL_IDLE:
- eventDesc = "读写空闲";
- break;
- }
- //获取ip地址信息
- InetAddress ip = InetAddress.getLocalHost();
- String hostAddress = ip.getHostAddress();
- log.info(clientId + " 地址:" + hostAddress + "发生超时事件--" + eventDesc);
- timeInt++;
- timesAttr.set(timeInt);
- if (timeInt > 1) {
- //删除ip地址信息
- String redisIpAddress = redisTemplateNew.get(clientId + "_IP");
- boolean hostBoolean = hostAddress.equals(redisIpAddress);
- log.info(hostBoolean + "check :" + clientId + " redisTemplateNewDelete:" + hostAddress + "redisIP:" + redisIpAddress);
- if (redisIpAddress != null && hostBoolean) {
- redisTemplateNew.delete(clientId + "_IP");
- }
- log.info(clientId + " 地址:" + hostAddress + ":" + ctx.channel().remoteAddress() + "空闲次数为" + timeInt + "次 关闭连接 " + clientId);
- ctx.channel().close();
- }
- }
- }
复制代码 3、下行API
- public class SendApi {
- @Resource
- private RedisTemplateNew redisTemplateNew;
- @Resource
- private KafkaTemplate<String, String> kafkaTemplate;
- @GetMapping(value = "/userinfo")
- public UserDto gerUserInfo() {
- UserDto user = new UserDto();
- user.setUserId("888888");
- user.setUserName("holmium");
- user.setSex("1");
- return user;
- }
- }
复制代码 }
}
}
[code][/code]
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |