ToB企服应用市场:ToB评测及商务社交产业平台

标题: Golang学习条记_RabbitMQ的原理架构和使用 [打印本页]

作者: 小秦哥    时间: 2024-6-19 20:40
标题: Golang学习条记_RabbitMQ的原理架构和使用
RabbitMQ 简介

RabbitMQ 架构明确

     
  1. rabbitMQ.* == rabbitMQ.topic != rabbitMQ.topic.topic
  2. rabbitMQ.# == rabbit.topic == rabbit.topic.topic
复制代码
RabbitMQ 工作流程

Producer方向

Consumer方向

RabbitMQ的两种摆设方式

Meta Data : 元数据(形貌数据的数据)

普通模式

对于该模式的两个节点,消息只会存在其中一个节点,另一个节点只保存mate data,当consumer 连接节点2访问节点1的数据信息时,消息会在两个节点中传递。
该模式下p和c应尽量连接每个节点,这样起到线性拓展的作用。
但存在一个问题,如果节点上另有未消费的消息,但是节点挂了。如果节点设置了持久化,则需要在节点重启的时候消息才会规复。如果未设置持久化,则消息会丢失。
镜像模式

消息存在多个节点中,消息会在节点与节点之间同步,可实现高可用(当一个节点挂了,另一个节点可以接替其位置,继承工作)但会低落性能,由于大量消息进入和同步,会占用大量带宽,但是为了保证高可靠性需要弃取。
面试题


RabbitMQ的使用(Golang使用amqp包)

代码部分参考 upup小亮的博客
代码只是简朴的操作,主要是认识流程。对于如何创建Queue和绑定Exchange之类的操作有个相识。

Simple(简朴收发模式,只有一个Queue)

Simple运行机制与WorkQueue相似,只是一个Consumer与多个Consumer的区别。多个Consumer之间存在竞争关系,以是工作队列是创建多个Consumer,多个竞争只有一个可以获取消息消费。消费乐成后ack消息删除。
演示代码放到一起了:
WorkQueue 工作队列

生产者
  1. // simple and work queue
  2. func main2() {
  3.         // 连接到 rabbitMQ
  4.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  5.         if err != nil {
  6.                 log.Fatalf("无法创建连接:%s", err)
  7.                 return
  8.         }
  9.         // 默认关闭
  10.         defer conn.Close()
  11.         // 创建通道Channel
  12.         ch, err := conn.Channel()
  13.         if err != nil {
  14.                 log.Fatalf("无法创建channel:%s", err)
  15.                 return
  16.         }
  17.         // 通道关闭
  18.         defer ch.Close()
  19.         // 创建存储队列
  20.         queue, err := ch.QueueDeclare(
  21.                 "hello", // 队列名称
  22.                 false, // 持久化设置,可以为true根据需求选择
  23.                 false, // 自动删除,没有用户连接删除queue一般不选用
  24.                 false, //独占
  25.                 false, //等待服务器确认
  26.                 nil)   //参数
  27.         if err != nil {
  28.                 fmt.Println(err)
  29.                 log.Fatalf("无法声明队列:%s", err)
  30.                 return
  31.         }
  32.         var body string
  33.         // 发送信息
  34.         for i := 0; i < 10; i++ {
  35.                 fmt.Println(i)
  36.                 body = "Hello RabbitMQ" + string(i)
  37.                 err = ch.Publish(
  38.                         "",
  39.                         queue.Name,
  40.                         false, // 必须发送到消息队列
  41.                         false, // 不等待服务器确认
  42.                         amqp.Publishing{
  43.                                 ContentType: "text/plain",
  44.                                 Body:        []byte(body),
  45.                         })
  46.                 if err != nil {
  47.                         log.Fatalf("消息生产失败:%s", err)
  48.                         continue
  49.                 }
  50.         }
  51. }
复制代码
消费者
  1.         // create conn
  2.         // 如果同时运行两个这样的consumer代码,就是工作队列。只有一个consumer就是simple
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil {
  5.                 log.Fatalf("无法创建连接:%s", err)
  6.                 return
  7.         }
  8.         defer conn.Close()
  9.         // create channel
  10.         ch, err := conn.Channel()
  11.         if err != nil {
  12.                 log.Fatalf("无法创建channel:%s", err)
  13.                 return
  14.         }
  15.         defer ch.Close()
  16.         // create queue
  17.         queue, err := ch.QueueDeclare(
  18.                 "hello",
  19.                 false,
  20.                 false,
  21.                 false,
  22.                 false,
  23.                 nil)
  24.         if err != nil {
  25.                 log.Fatalf("无法创建queue:%s", err)
  26.                 return
  27.         }
  28.        
  29.         // 消费信息
  30.         msgs, err := ch.Consume(
  31.                 queue.Name,
  32.                 "",
  33.                 true,
  34.                 false,
  35.                 false,
  36.                 false,
  37.                 nil)
  38.         if err != nil {
  39.                 log.Fatalf("无法消费信息:%s", err)
  40.                 return
  41.         }
  42.         for msg := range msgs {
  43.                 log.Println(string(msg.Body))
  44.         }
  45.         return
复制代码
pub/sub 发布订阅模式

发布订阅模式可以创建两个Queue,绑定到同一个Exchange中
生产者这边只需要跟互换机对接,而互换机类型为fanout
  1. func main() {
  2.         // 连接到 rabbitMQ
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil {
  5.                 log.Fatalf("无法创建连接:%s", err)
  6.         }
  7.         // 默认关闭
  8.         defer conn.Close()
  9.         // 创建通道Channel
  10.         ch, err := conn.Channel()
  11.         if err != nil {
  12.                 log.Fatalf("无法创建channel:%s", err)
  13.         }
  14.         defer ch.Close()
  15.         // create exchange
  16.         ex := ch.ExchangeDeclare(
  17.                 "exchange1", // 交换机名称
  18.                 "fanout",    // 交换机类型
  19.                 true,        // 是否持久化
  20.                 false,       // 是否自动删除
  21.                 false,       // 是否内部使用
  22.                 false,       // 是否等待服务器响应
  23.                 nil,         // 其他属性
  24.         )
  25.         fmt.Println(ex)
  26.         body := "Hello RabbitMQ for Pub/Sub"
  27.         err = ch.Publish(
  28.                 "exchange1",
  29.                 "", // routing key 可以为空,因为fanout不看routing key
  30.                 false,
  31.                 false,
  32.                 amqp.Publishing{
  33.                         ContentType: "text/plain",
  34.                         Body:        []byte(body),
  35.                 })
  36.         if err != nil {
  37.                 log.Fatalf("err %s:", err)
  38.         }
  39.         log.Println(body)
  40. }
复制代码
消费者:创建互换机,类型为fanout,创建队列,绑定互换机(创建多个consumer绑定同一个queue和同一个互换机。这样发送一个消息,所有的consumer都能收到。== 发布订阅模型)
  1.         // Pub/Sub
  2.         // Create conn
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil{
  5.                 log.Fatalf(err)
  6.         }
  7.         defer conn.Close()
  8.         // channel create
  9.         ch, err := conn.Channel()
  10.         if err != nil{
  11.                 log.Fatalf(err)
  12.         }
  13.         defer ch.Close()
  14.         // exchange create
  15.         ex := ch.ExchangeDeclare(
  16.                 "exchange1",
  17.                 "fanout",
  18.                 true,
  19.                 false,
  20.                 false,
  21.                 false,
  22.                 nil)
  23.         fmt.Println(ex)
  24.         // queue create
  25.         queue, err := ch.QueueDeclare(
  26.                 "hello",
  27.                 false,
  28.                 false,
  29.                 false,
  30.                 false,
  31.                 nil)
  32.         if err != nil{
  33.                 log.Fatalf(err)
  34.         }
  35.         err = ch.QueueBind(
  36.                 queue.Name,
  37.                 "",
  38.                 "exchange1",
  39.                 false,
  40.                 nil)
  41.         if err != nil{
  42.                 log.Fatalf(err)
  43.         }
  44.         msgs, err := ch.Consume(
  45.                 queue.Name,
  46.                 "",
  47.                 true,
  48.                 false,
  49.                 false,
  50.                 false,
  51.                 nil)
  52.         if err != nil{
  53.                 log.Fatalf(err)
  54.         }
  55.         go func() {
  56.                 for d := range msgs {
  57.                         log.Printf("Received a message: %s", d.Body)
  58.                 }
  59.         }()
  60.         log.Printf("Waiting for messages. To exit press CTRL+C")
  61.         <-make(chan struct{}) // 阻塞主goroutine
  62. }
复制代码
Routing 模式(对特定的队列投递消息)

生产者
  1. func main() {
  2.         // 连接到 rabbitMQ
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil {
  5.                 log.Fatalf("无法创建连接:%s", err)
  6.         }
  7.         // 默认关闭
  8.         defer conn.Close()
  9.         // 创建通道Channel
  10.         ch, err := conn.Channel()
  11.         if err != nil {
  12.                 log.Fatalf("无法创建channel:%s", err)
  13.         }
  14.         defer ch.Close()
  15.         // create exchange
  16.         ex := ch.ExchangeDeclare(
  17.                 "exchange1", // 交换机名称
  18.                 "direct",    // 交换机类型
  19.                 true,        // 是否持久化
  20.                 false,       // 是否自动删除
  21.                 false,       // 是否内部使用
  22.                 false,       // 是否等待服务器响应
  23.                 nil,         // 其他属性
  24.         )
  25.         fmt.Println(ex)
  26.         body := "Hello RabbitMQ for direct routing"
  27.                 // 发布消息到交换机,并指定路由键
  28.         err = ch.Publish(
  29.                 "logs_direct", // 交换机名称
  30.                 "routing_key", // 路由键
  31.                 false,         // 是否等待服务器响应
  32.                 false,         // 是否立即将消息写入磁盘
  33.                 amqp.Publishing{
  34.                         ContentType: "text/plain",
  35.                         Body:        []byte(body),
  36.                 },
  37.         )
  38.         if err != nil{
  39.                 log.Fatalf("无法创建send msg:%s", err)
  40.         }
  41.         log.Printf("Sent message: %s", message)
复制代码
消费者
  1. func main() {
  2.         // 连接到RabbitMQ服务器
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil{
  5.                 log.Fatalf("无法创建send msg:%s", err)
  6.         }
  7.         defer conn.Close()
  8.         // 创建一个通道
  9.         ch, err := conn.Channel()
  10.         if err != nil{
  11.                 log.Fatalf("无法创建send msg:%s", err)
  12.         }
  13.         defer ch.Close()
  14.         // 声明一个交换机
  15.         err = ch.ExchangeDeclare(
  16.                 "logs_direct", // 交换机名称
  17.                 "direct",      // 交换机类型
  18.                 true,          // 是否持久化
  19.                 false,         // 是否自动删除
  20.                 false,         // 是否内部使用
  21.                 false,         // 是否等待服务器响应
  22.                 nil,           // 其他属性
  23.         )
  24.         if err != nil{
  25.                 log.Fatalf("无法创建send msg:%s", err)
  26.         }
  27.         // 声明一个临时队列
  28.         q, err := ch.QueueDeclare(
  29.                 "",    // 队列名称,留空表示由RabbitMQ自动生成,因为定义了key所以队列名可以是随意的,毕竟是依靠key来进行匹配的
  30.                 false, // 是否持久化
  31.                 false, // 是否自动删除(当没有任何消费者连接时)
  32.                 true,  // 是否排他队列(仅限于当前连接)
  33.                 false, // 是否等待服务器响应
  34.                 nil,   // 其他属性
  35.         )
  36.         // 将队列绑定到交换机上,并指定要接收的路由键
  37.         err = ch.QueueBind(
  38.                 q.Name,        // 队列名称
  39.                 "routing_key",      // 路由键
  40.                 "logs_direct", // 交换机名称
  41.                 false,         // 是否等待服务器响应
  42.                 nil,           // 其他属性
  43.         )
  44.         if err != nil{
  45.                 log.Fatalf("无法创建send msg:%s", err)
  46.         }
  47.         // 订阅消息
  48.         msgs, err := ch.Consume(
  49.                 q.Name, // 队列名称
  50.                 "",     // 消费者标识符,留空表示由RabbitMQ自动生成
  51.                 true,   // 是否自动应答
  52.                 false,  // 是否独占模式(仅限于当前连接)
  53.                 false,  // 是否等待服务器响应
  54.                 false,  // 其他属性
  55.                 nil,    // 其他属性
  56.         )
  57.         failOnError(err, "Failed to register a consumer")
  58.         // 接收消息的goroutine
  59.         go func() {
  60.                 for d := range msgs {
  61.                         log.Printf("Received a message: %s", d.Body)
  62.                 }
  63.         }()
  64.         log.Printf("Waiting for messages. To exit press CTRL+C")
  65.         <-make(chan struct{}) // 阻塞主goroutine
复制代码
topic

  1. func main() {
  2.         // 连接到RabbitMQ服务器
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil{
  5.                 log.Fatalf(err)
  6.         }
  7.         defer conn.Close()
  8.         // 创建一个通道
  9.         ch, err := conn.Channel()
  10.         if err != nil{
  11.                 log.Fatalf(err)
  12.         }
  13.         defer ch.Close()
  14.         // 声明一个交换机
  15.         err = ch.ExchangeDeclare(
  16.                 "logs_topic", // 交换机名称
  17.                 "topic",      // 交换机类型
  18.                 true,         // 是否持久化
  19.                 false,        // 是否自动删除
  20.                 false,        // 是否内部使用
  21.                 false,        // 是否等待服务器响应
  22.                 nil,          // 其他属性
  23.         )
  24.         if err != nil{
  25.                 log.Fatalf(err)
  26.         }
  27.         // 定义要发送的消息的路由键和内容
  28.         routingKey := "example.key.das"
  29.         message := "Hello, RabbitMQ!"
  30.         // 发布消息到交换机,并指定路由键
  31.         err = ch.Publish(
  32.                 "logs_topic", // 交换机名称
  33.                 routingKey,   // 路由键
  34.                 false,        // 是否等待服务器响应
  35.                 false,        // 是否立即发送
  36.                 amqp.Publishing{
  37.                         ContentType: "text/plain",
  38.                         Body:        []byte(message),
  39.                 },
  40.         )
  41.         if err != nil{
  42.                 log.Fatalf(err)
  43.         }
  44.         log.Printf("Sent message: %s", message)
  45. }
复制代码
消费者
  1. func main() {
  2.         // 连接到RabbitMQ服务器
  3.         conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  4.         if err != nil{
  5.                 log.Fatalf(err)
  6.         }
  7.         defer conn.Close()
  8.         // 创建一个通道
  9.         ch, err := conn.Channel()
  10.         if err != nil{
  11.                 log.Fatalf(err)
  12.         }
  13.         defer ch.Close()
  14.         // 声明一个交换机
  15.         err = ch.ExchangeDeclare(
  16.                 "logs_topic", // 交换机名称
  17.                 "topic",      // 交换机类型
  18.                 true,         // 是否持久化
  19.                 false,        // 是否自动删除
  20.                 false,        // 是否内部使用
  21.                 false,        // 是否等待服务器响应
  22.                 nil,          // 其他属性
  23.         )
  24.         if err != nil{
  25.                 log.Fatalf(err)
  26.         }
  27.         // 声明一个临时队列
  28.         q, err := ch.QueueDeclare(
  29.                 "",    // 队列名称,留空表示由RabbitMQ自动生成
  30.                 false, // 是否持久化
  31.                 false, // 是否自动删除(当没有任何消费者连接时)
  32.                 true,  // 是否排他队列(仅限于当前连接)
  33.                 false, // 是否等待服务器响应
  34.                 nil,   // 其他属性
  35.         )
  36.         if err != nil{
  37.                 log.Fatalf(err)
  38.         }
  39.         // 将队列绑定到交换机上,并指定要接收的路由键
  40.         err = ch.QueueBind(
  41.                 q.Name,       // 队列名称
  42.                 "example.#",  // 路由键,可以使用通配符*匹配一个单词
  43.                 "logs_topic", // 交换机名称
  44.                 false,        // 是否等待服务器响应
  45.                 nil,          // 其他属性
  46.         )
  47.         if err != nil{
  48.                 log.Fatalf(err)
  49.         }
  50.         // 创建一个消费者通道
  51.         msgs, err := ch.Consume(
  52.                 q.Name, // 队列名称
  53.                 "",     // 消费者标识符,留空表示由RabbitMQ自动生成
  54.                 true,   // 是否自动应答
  55.                 false,  // 是否排他消费者
  56.                 false,  // 是否阻塞
  57.                 false,  // 是否等待服务器响应
  58.                 nil,    // 其他属性
  59.         )
  60.         if err != nil{
  61.                 log.Fatalf(err)
  62.         }
  63.         // 接收和处理消息
  64.         forever := make(chan bool)
  65.         go func() {
  66.                 for d := range msgs {
  67.                         log.Printf("Received a message: %s", d.Body)
  68.                 }
  69.         }()
  70.         log.Printf("Waiting for messages...")
  71.         // 阻塞
  72.         <-forever
  73. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4