Rabbitmq 搭建使用案例 [附源码]

打印 上一主题 下一主题

主题 512|帖子 512|积分 1536

Rabbitmq 搭建使用案例
@
目录

RabbitMQ搭建

docker
  1. docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management
复制代码
代码

golang

生产者
  1. package main
  2. import (
  3.         "flag"
  4.         "fmt"
  5.         amqp "github.com/rabbitmq/amqp091-go"
  6.         "log"
  7.         "strconv"
  8.         "time"
  9. )
  10. func main() {
  11.         var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
  12.         var exchange = flag.String("exchange", "logs", "Exchange name")
  13.         var key = flag.String("key", "log", "Routing key")
  14.         flag.Parse()
  15.         // 连接到RabbitMQ服务器
  16.         conn, err := amqp.Dial(*url)
  17.         if err != nil {
  18.                 log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  19.         }
  20.         defer conn.Close()
  21.         // 创建一个通道
  22.         ch, err := conn.Channel()
  23.         if err != nil {
  24.                 log.Fatalf("Failed to open a channel: %v", err)
  25.         }
  26.         defer ch.Close()
  27.         // 声明一个交换机
  28.         err = ch.ExchangeDeclare(
  29.                 *exchange, // name: 交换机名称
  30.                 "fanout",  // kind: 交换机类型
  31.                 true,      // durable: 是否持久化
  32.                 false,     // autoDelete: 没有队列绑定时是否自动删除
  33.                 false,     // internal: 是否是内部交换机
  34.                 false,     // noWait: 是否需要等待服务器响应
  35.                 nil,       // args: 其他参数
  36.         )
  37.         if err != nil {
  38.                 log.Fatalf("Failed to declare an exchange: %v", err)
  39.         }
  40.         // 发送消息
  41.         body := "Hello World!" + fmt.Sprintf(time.Now().String())
  42.         for i := 0; i < 20; i++ {
  43.                 body = strconv.Itoa(i) + body
  44.                 err = ch.Publish(
  45.                         *exchange, // 交换机名称
  46.                         *key,      // 路由键
  47.                         false,     // 强制发布
  48.                         false,     // 立即发布
  49.                         amqp.Publishing{
  50.                                 ContentType:  "text/plain",
  51.                                 DeliveryMode: amqp.Persistent,
  52.                                 Body:         []byte(body),
  53.                                 Expiration:   "10000", // 3000 3秒
  54.                         })
  55.         }
  56.         if err != nil {
  57.                 log.Fatalf("Failed to publish a message: %v", err)
  58.         }
  59.         fmt.Printf(" [x] Sent %s", body)
  60. }
复制代码
消耗者
  1. package main
  2. import (
  3.         "flag"
  4.         "fmt"
  5.         "log"
  6.         amqp "github.com/rabbitmq/amqp091-go"
  7. )
  8. func main() {
  9.         var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
  10.         var exchange = flag.String("exchange", "logs", "Exchange name")
  11.         var key = flag.String("key", "log", "Routing key")
  12.         flag.Parse()
  13.         // 连接到RabbitMQ服务器
  14.         conn, err := amqp.Dial(*url)
  15.         if err != nil {
  16.                 log.Fatalf("Failed to connect to RabbitMQ: %v", err)
  17.         }
  18.         defer conn.Close()
  19.         // 创建一个通道
  20.         ch, err := conn.Channel()
  21.         if err != nil {
  22.                 log.Fatalf("Failed to open a channel: %v", err)
  23.         }
  24.         defer ch.Close()
  25.         // 声明一个交换机
  26.         err = ch.ExchangeDeclare(
  27.                 *exchange, // name: 交换机名称
  28.                 "fanout",  // kind: 交换机类型
  29.                 true,      // durable: 是否持久化
  30.                 false,     // autoDelete: 没有队列绑定时是否自动删除
  31.                 false,     // internal: 是否是内部交换机
  32.                 false,     // noWait: 是否需要等待服务器响应
  33.                 nil,       // args: 其他参数
  34.         )
  35.         if err != nil {
  36.                 log.Fatalf("Failed to declare an exchange: %v", err)
  37.         }
  38.         // 声明一个队列
  39.         q, err := ch.QueueDeclare(
  40.                 "queue01", // 随机生成队列名称
  41.                 true,      // 持久化
  42.                 false,     // 删除
  43.                 false,     // 独占
  44.                 false,     // 不等消息
  45.                 nil,       // 其他参数
  46.         )
  47.         if err != nil {
  48.                 log.Fatalf("Failed to declare a queue: %v", err)
  49.         }
  50.         // 绑定队列到交换机
  51.         err = ch.QueueBind(
  52.                 q.Name,    // 队列名称
  53.                 *key,      // 路由键
  54.                 *exchange, // 交换机名称
  55.                 false,     // 现在绑定
  56.                 nil,       // 其他参数
  57.         )
  58.         if err != nil {
  59.                 log.Fatalf("Failed to bind a queue: %v", err)
  60.         }
  61.         // 接收消息
  62.         msgs, err := ch.Consume(
  63.                 q.Name,       // 队列名称
  64.                 "consumer01", // 消费者标签
  65.                 false,        // 自动ack
  66.                 false,        // 不独占
  67.                 false,        // 不等消息
  68.                 false,        // 不从服务器获取消息
  69.                 nil,          // 其他参数
  70.         )
  71.         if err != nil {
  72.                 log.Fatalf("Failed to register a consumer: %v", err)
  73.         }
  74.         fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
  75.         for d := range msgs {
  76.                 // 输出接收到的消息
  77.                 fmt.Printf(" [x] Received %s\n", d.Body)
  78.                 err = ch.Ack(d.DeliveryTag, true)
  79.                 if err != nil {
  80.                         log.Fatalf("Failed to ack message: %v", err)
  81.                 }
  82.         }
  83. }
复制代码
可视化

看板
http://localhost:15672/
账户密码
  1. admin
  2. admin
复制代码

消耗进度

http://localhost:15672/#/queues


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

花瓣小跑

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表