Go操作RabbitMQ_go调用rabbitmq

诗林  金牌会员 | 2025-2-19 12:57:01 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 578|帖子 578|积分 1734

RabbitMQ 常用端口以及作用

                端口   功能       5672   AMQP(Advanced Message Queuing Protocol)协议的默认端口,用于客户端与RabbitMQ服务器之间的通信。       15672   RabbitMQ的管理界面,默认利用HTTP协议,用于监控和管理RabbitMQ服务器。       4369   Erlang分布式节点通信端口,用于RabbitMQ节点之间的通信。       25672   Erlang分布式节点通信端口,用于集群中的内部通信。       5671   安全的AMQP端口,利用TLS/SSL进行加密通信。   架构图



  • publisher:生产者,也就是发送消息的一方
  • consumer:斲丧者,也就是斲丧消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待斲丧者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:假造主机,起到数据隔离的作用。每个假造主机相互独立,有各自的exchange、queue

工作模式

1. 简朴模式(Simple)




  • 特点


    •    
    • 生产者直接将消息发送到队列中,斲丧者从队列中获取消息并进行处理。
             
      • 消息一旦被斲丧,就会从队列中移除。   
         
      
  • 真实应用场景


    •    
    • 电子邮件关照系统:生产者将待发送的邮件放入队列,斲丧者负责从队列中获取邮件并发送。  
      
  • 优点


    •    
    • 实现简朴,易于明白和部署。
             
      • 适用于单一斲丧者场景。   
         
      
  • 缺点


    •    
    • 没有消息持久化,一旦 RabbitMQ 服务器重启,队列中的消息将会丢失。  

1.1. 目录结构

simple
-mq
–-rabbitmq.go //这个是RabbitMQ的封装
-SimplePublish
–-mainSimplePublish.go //生产者发送消息
-SimpleRecieve
–-mainSimpleRecieve.go // 斲丧者接受消息

1.2. 代码实现

1.2.1. rabbitmq.go

  1. package mq
  2. import (
  3.     "fmt"
  4.     "log"
  5.     "github.com/streadway/amqp"
  6. )
  7. // MQURL amqp://user:password@host:port/vhost
  8. // amqp://是固定参数,这个信息是固定不变的。后面两个是用户名密码ip地址端口号Virtual Host
  9. // 如果vhost是“/”就输入/%2F,/%2F代表斜杠
  10. const MQURL = "amqp://test:123123@127.0.0.1:5672/%2F"
  11. // RabbitMQ rabbitMQ结构体
  12. type RabbitMQ struct {
  13.     conn    *amqp.Connection
  14.     channel *amqp.Channel
  15.     //队列名称
  16.     QueueName string
  17.     //交换机名称
  18.     Exchange string
  19.     //bind Key 名称
  20.     Key string
  21.     //连接信息
  22.     Mqurl string
  23. }
  24. // NewRabbitMQ 创建结构体实例
  25. func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
  26.     return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
  27. }
  28. // Destory 断开channel 和 connection
  29. func (r *RabbitMQ) Destory() {
  30.     err := r.channel.Close()
  31.     if err != nil {
  32.         return
  33.     }
  34.     err = r.conn.Close()
  35.     if err != nil {
  36.         return
  37.     }
  38. }
  39. // 错误处理函数
  40. func (r *RabbitMQ) failOnErr(err error, message string) {
  41.     if err != nil {
  42.         log.Fatalf("%s:%s", message, err)
  43.     }
  44. }
  45. // NewRabbitMQSimple 创建简单模式下RabbitMQ实例
  46. func NewRabbitMQSimple(queueName string) *RabbitMQ {
  47.     //创建RabbitMQ实例
  48.     rabbitmq := NewRabbitMQ(queueName, "", "")
  49.     var err error
  50.     //获取connection
  51.     rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
  52.     rabbitmq.failOnErr(err, "failed to connect rabb"+
  53.                        "itmq!")
  54.     //获取channel
  55.     rabbitmq.channel, err = rabbitmq.conn.Channel()
  56.     rabbitmq.failOnErr(err, "failed to open a channel")
  57.     return rabbitmq
  58. }
  59. // PublishSimple simple模式队列生产
  60. func (r *RabbitMQ) PublishSimple(message string) {
  61.     //1.申请队列,如果队列不存在会自动创建,存在则跳过创建
  62.     _, err := r.channel.QueueDeclare(
  63.         // queue:队列名称
  64.         r.QueueName,
  65.         //durable:是否持久化,当mq重启之后,还在
  66.         true,
  67.         //exclusive:是否独占即只能有一个消费者监听这个队列
  68.         false,
  69.         //autoDelete:是否自动删除。当没有Consumer时,自动删除掉
  70.         false,
  71.         //noWait:是否阻塞处理。true:不阻塞,false:阻塞
  72.         false,
  73.         //arguments:其他属性
  74.         nil,
  75.     )
  76.     if err != nil {
  77.         fmt.Println(err)
  78.     }
  79.     //调用channel 发送消息到队列中
  80.     r.channel.Publish(
  81.         r.Exchange,
  82.         r.QueueName,
  83.         //如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
  84.         false,
  85.         //如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
  86.         false,
  87.         amqp.Publishing{
  88.             ContentType: "text/plain",
  89.             Body:        []byte(message),
  90.         })
  91. }
  92. // ConsumeSimple simple模式下消费者
  93. func (r *RabbitMQ) ConsumeSimple() {
  94.     //1.申请队列,如果队列不存在会自动创建,存在则跳过创建
  95.     q, err := r.channel.QueueDeclare(
  96.         //队列名称
  97.         r.QueueName,
  98.         //是否持久化
  99.         true,
  100.         //是否自动删除
  101.         false,
  102.         //是否具有排他性
  103.         false,
  104.         //是否阻塞处理
  105.         false,
  106.         //额外的属性
  107.         nil,
  108.     )
  109.     if err != nil {
  110.         fmt.Println(err)
  111.     }
  112.     //接收消息
  113.     msgs, err := r.channel.Consume(
  114.         q.Name, // queue
  115.         //用来区分多个消费者
  116.         "", // consumer
  117.         //是否自动应答
  118.         true, // auto-ack
  119.         //是否独有
  120.         false, // exclusive
  121.         //设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
  122.         false, // no-local
  123.         //列是否阻塞
  124.         false, // no-wait
  125.         nil,   // args
  126.     )
  127.     if err != nil {
  128.         fmt.Println(err)
  129.     }
  130.     forever := make(chan bool)
  131.     //启用协程处理消息
  132.     go func() {
  133.         for d := range msgs {
  134.             //消息逻辑处理,可以自行设计逻辑
  135.             log.Printf("Received a message: %s", d.Body)
  136.         }
  137.     }()
  138.     log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  139.     <-forever
  140. }
复制代码
1.2.2. SimplePublish.go

  1. package main
  2. import (
  3.     "fmt"
  4.     "qiutian.com/study/02.rabbitmq/Simple/mq"
  5. )
  6. func main() {
  7.     rabbitmq := mq.NewRabbitMQSimple("hello.queue1")
  8.     rabbitmq.PublishSimple("Hello akita!")
  9.     fmt.Println("发送成功!")
  10. }
复制代码
1.2.3. mainSimpleRecieve.go

  1. <
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

诗林

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表