立山 发表于 2024-7-18 23:55:34

RabbitMQ架构设计原理

目次
消息中心件
"异步" 和 "同步"
解耦:
“异步解耦”
流量削峰:
传统的http哀求有诸多缺点:
举例说明:
代码演示:
解决措施:
1、多线程处理惩罚业务逻辑(实现异步操作):
2、Mq处理惩罚业务逻辑(实现异步操作):
MQ实现的两个版本:
1、没有网络的情况下实现MQ :利用多线程制造生产者和消耗者
2、基于网络通讯版本mq netty实现
Mq与多线程之间区别:
Mq消息中心件名词

消息中心件

消息中心件基于队列模型实现异步/同步传输数据
作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
"异步" 和 "同步"

通常指的是数据发送(生产)和消耗(处理惩罚)的方式。
https://img-blog.csdnimg.cn/direct/a01fb92f4710449780e03910695dab29.png
https://img-blog.csdnimg.cn/direct/cb0118fe6c3a48d7aec50de61bafdb2b.png
解耦:

解耦意味着将原本紧密关联或相互依靠的组件、功能或体系分离,使它们可以或许独立地运行、修改和扩展,而不必要影响其他部分。
在进程或线程中,解耦可以使得多步调的操作并行执行,而不是串行执行,从而提高效率。例如,在一个下载和解析的场景中,下载和解析函数可以放在进程池或线程池中并行运行,谁先下载完就先解析,从而实现解耦。
“异步解耦”

可以理解为在并发编程或体系设计中,通过异步处理惩罚的方式实现体系组件或功能之间的解耦,使得它们可以或许并行运行、独立扩展,并且不会相互阻塞或依靠。
流量削峰:

流量削峰是指通过一些技术手段来减弱瞬时的哀求高峰,使体系吞吐量在高峰哀求下保持可控

传统的http哀求有诸多缺点:

1.在高并发的情况下,发送大量的哀求到达服务器端导致服务器端处理惩罚哀求堆积。
2.Tomcat服务器处理惩罚每个哀求都有自己独立的线程,如果凌驾最大线程数,会将该哀求缓存到队列中,哀求堆积过多的情况下,可能导致tomcat服务器崩溃
所以一样平常都会在nginx入口实现限流,整合服务掩护框架。
3. http哀求处理惩罚业务逻辑比力耗时,容易造成客户端不停等待,阻塞等待过程中会导致客户端超时重发,引发幂等性问题。
留意事项:接口是为http协议的情况下,最好不要处理惩罚比力耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理惩罚。
举例说明:

https://img-blog.csdnimg.cn/direct/255cf17c1f9f489aabb56649ca7f5609.png
客户端发送哀求到达服务器端,服务器端实现会员注册业务逻辑,
1.insertMember() --插入会员数据  1s
2.sendSms()----发送登岸短信提醒 3s
3.sendCoupons()----发送新人优惠券  3s
总共响应必要7s时间,可能会导致客户端阻塞7s时间,对用户体验不是很好。
代码演示:

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MemberService {

    //@Autowired
    //public MemberServiceAsync memberServiceAsync;

    //@Bean
    @RequestMapping("/addMember")
    publicString addMember(){
      //1.数据库插入数据 log.info(">01<");
      System.out.println(">01<");
      sms();
      System.out.println(">04<");
      return "用户注册成功!!";
    }


    public String sms(){
      System.out.println(">02<");
      try{
            System.out.println(">正在发送短信<");
            Thread.sleep(3000);
      }catch(Exception e){
            e.printStackTrace();
      }
      System.out.println(">03<");
      return "短信发送完成!";
    }
} 解决措施:

多线程与MQ方式实现异步
小项目可用多线程实现异步,大项目必要利用MQ,由于多线程面临高并发会对CPU造成损耗
1、多线程处理惩罚业务逻辑(实现异步操作):

https://img-blog.csdnimg.cn/direct/a1408baad1ac44f9b315ae4d0650983f.png
用户向数据库中插入一条数据之后,在单独开启一个线程异步发送短信和优惠操作。客户端只必要等待1s时间
优点:适合于小项目 实现异步
缺点:有可能会消耗服务器cpu资源资源
代码演示:
https://img-blog.csdnimg.cn/direct/15918076f66144d9aab08f02a85daf7b.png
异步类:
https://img-blog.csdnimg.cn/direct/2c3adce0c76d4f18b414cd3e21cb5e6f.png
主配置类加注解:
https://img-blog.csdnimg.cn/direct/ed68537823724fb1b6f8af619f0174a7.png
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

//异步类实现sms方法
@Component
public class MemberServiceAsync
{
    @Async
    public String sms(){
      System.out.println(">02<");
      System.out.println(">正在发送短信<");
      try {
            Thread.sleep(3000);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
      System.out.println(">03<");
      return "短信发送完成!!";
    }
} 解析:@Async的作用是什么
它用于声明一个方法是异步的。
当你在一个方法上利用了 @Async 注解,并正确配置了相关的异步支持,那么该方法将不会在主线程中同步执行,而是会由 Spring 的使命调度器(通常是一个线程池)异步地执行。
配置异步支持通过在配置类上添加 @EnableAsync 注解或在 XML 配置中添加 来实现。
2、Mq处理惩罚业务逻辑(实现异步操作):

https://img-blog.csdnimg.cn/direct/7c39a74a2ab74ca098b42aa50329c7d5.png
当生产者一操作数据库存储数据,就会给消息中心件发送一个msg的提示消息。消息中心件的消息第一次发送给消耗者时,会判断有没有消耗者的存在,如果有,消耗者会主动去消息队列里拉取消息。之后只要消耗者存在,服务端(消息中心件)就属于将消息推送给消耗端。

MQ实现的两个版本:

1、没有网络的情况下实现MQ :利用多线程制造生产者和消耗者

手撕代码:
package com.qcby.async;

import org.json.JSONObject;

import java.util.concurrent.LinkedBlockingDeque;

/**
* 没有网络的情况下实现MQ
* 利用多线程制造生产者和消费者
*/
public class BoyatopThreadMQ {

    //MQ服务器 初始化消息的队列
    private static LinkedBlockingDeque<JSONObject> msgs=new LinkedBlockingDeque();

    //主函数程序入口
    public static void main(String[] args) {
      //生产者生产线程
      Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                  while(true){
                        Thread.sleep(1000);
                        JSONObject data = new JSONObject();
                        data.put("userId","12345");
                        //存入消息队列
                        msgs.offer(data);
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      },"生产者");
      producerThread.start();
      //消费端消费线程
      Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                  while(true){
                        JSONObject data = msgs.poll();
                        if(data != null){
                            System.out.println(Thread.currentThread().getName() + ",获取到数据:" + data);
                        }
                  }
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      },"消费者");
      consumerThread.start();
    }

}
2、基于网络通讯版本mq netty实现

https://img-blog.csdnimg.cn/direct/db9c2e25829c4ffe8248165cf0403232.png

解释:
消耗者netty客户端与nettyServer端MQ服务器端保持长连接,MQ服务器端保存消耗者连接。
生产者netty客户端发送哀求给nettyServer端MQ服务器端,MQ服务器端在将该消息内容发送给消耗者。

生产者投递消息给MQ服务器端,MQ服务器端必要缓存该消息。
如果mq服务器端宕机之后,消息如何保证不丢失?
答:持久化机制,会在磁盘中存储一份

如果mq接收到生产者投递消息,如果消耗者不在的情况下,该消息是否会丢失?
答:不会丢失,由于有消息确认机制,必须要消耗者消耗该消息成功之后,再通知给mq服务器端删除该消息。
(mq与消耗者之间的信息通报:先拉取,建立长连接,后推送)

对于多个消耗者,如果没有主题(或者在一个主题里面)会不会消耗同一条消息?
答:不会,每一个消耗者消耗完之后,会告诉MQ:消耗完毕,可以删除,消息确认机制会避免重复性消耗 同组消耗者不会出现重复性消耗的情况,由于有消息确认机制

Mq如何实现抗高并发头脑?
答:Mq消耗者根据自身能力情况 ,拉取mq服务器端消息消耗。默认的情况下是取出一条消息。
缺点:1、存在耽误的问题 2、必要考虑mq消耗者提高速率的问题
如何消耗者提高速率?
答:消耗者实现集群、消耗者批量获取消息即可。

Mq与多线程之间区别:

MQ可以实现异步/解耦/流量削峰问题;
多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。

Mq消息中心件名词

Producer 生产者:投递消息到MQ服务器端;
Consumer 消耗者:从MQ服务器端获取消息处理惩罚业务逻辑;
Broker MQ服务器端 :
Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
Queue 存放消息模型队列: 先进先出 后进后出原则 数组/链表
Message  生产者投递消息报文:json 举例: body:{"msg":{"userId":"123456","age":"23"},"type":"producer",”topic”:””}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ架构设计原理