花瓣小跑 发表于 2024-5-24 15:26:29

Rabbitmq 搭建使用案例 [附源码]

Rabbitmq 搭建使用案例
@
目录

[*]RabbitMQ搭建

[*]docker

[*]代码

[*]golang

[*]生产者
[*]消耗者


[*]可视化

[*]消耗进度


RabbitMQ搭建

docker

docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management代码

golang

生产者

package main

import (
        "flag"
        "fmt"
        amqp "github.com/rabbitmq/amqp091-go"
        "log"
        "strconv"
        "time"
)

func main() {
        var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
        var exchange = flag.String("exchange", "logs", "Exchange name")
        var key = flag.String("key", "log", "Routing key")

        flag.Parse()

        // 连接到RabbitMQ服务器
        conn, err := amqp.Dial(*url)
        if err != nil {
                log.Fatalf("Failed to connect to RabbitMQ: %v", err)
        }
        defer conn.Close()

        // 创建一个通道
        ch, err := conn.Channel()
        if err != nil {
                log.Fatalf("Failed to open a channel: %v", err)
        }
        defer ch.Close()

        // 声明一个交换机
        err = ch.ExchangeDeclare(
                *exchange, // name: 交换机名称
                "fanout",// kind: 交换机类型
                true,      // durable: 是否持久化
                false,   // autoDelete: 没有队列绑定时是否自动删除
                false,   // internal: 是否是内部交换机
                false,   // noWait: 是否需要等待服务器响应
                nil,       // args: 其他参数
        )
        if err != nil {
                log.Fatalf("Failed to declare an exchange: %v", err)
        }

        // 发送消息
        body := "Hello World!" + fmt.Sprintf(time.Now().String())

        for i := 0; i < 20; i++ {
                body = strconv.Itoa(i) + body
                err = ch.Publish(
                        *exchange, // 交换机名称
                        *key,      // 路由键
                        false,   // 强制发布
                        false,   // 立即发布
                        amqp.Publishing{
                                ContentType:"text/plain",
                                DeliveryMode: amqp.Persistent,
                                Body:         []byte(body),
                                Expiration:   "10000", // 3000 3秒
                        })
        }

        if err != nil {
                log.Fatalf("Failed to publish a message: %v", err)
        }

        fmt.Printf(" Sent %s", body)
}消耗者

package main

import (
        "flag"
        "fmt"
        "log"

        amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
        var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
        var exchange = flag.String("exchange", "logs", "Exchange name")
        var key = flag.String("key", "log", "Routing key")

        flag.Parse()

        // 连接到RabbitMQ服务器
        conn, err := amqp.Dial(*url)
        if err != nil {
                log.Fatalf("Failed to connect to RabbitMQ: %v", err)
        }
        defer conn.Close()

        // 创建一个通道
        ch, err := conn.Channel()
        if err != nil {
                log.Fatalf("Failed to open a channel: %v", err)
        }
        defer ch.Close()

        // 声明一个交换机
        err = ch.ExchangeDeclare(
                *exchange, // name: 交换机名称
                "fanout",// kind: 交换机类型
                true,      // durable: 是否持久化
                false,   // autoDelete: 没有队列绑定时是否自动删除
                false,   // internal: 是否是内部交换机
                false,   // noWait: 是否需要等待服务器响应
                nil,       // args: 其他参数
        )
        if err != nil {
                log.Fatalf("Failed to declare an exchange: %v", err)
        }

        // 声明一个队列
        q, err := ch.QueueDeclare(
                "queue01", // 随机生成队列名称
                true,      // 持久化
                false,   // 删除
                false,   // 独占
                false,   // 不等消息
                nil,       // 其他参数
        )
        if err != nil {
                log.Fatalf("Failed to declare a queue: %v", err)
        }

        // 绑定队列到交换机
        err = ch.QueueBind(
                q.Name,    // 队列名称
                *key,      // 路由键
                *exchange, // 交换机名称
                false,   // 现在绑定
                nil,       // 其他参数
        )
        if err != nil {
                log.Fatalf("Failed to bind a queue: %v", err)
        }

        // 接收消息
        msgs, err := ch.Consume(
                q.Name,       // 队列名称
                "consumer01", // 消费者标签
                false,      // 自动ack
                false,      // 不独占
                false,      // 不等消息
                false,      // 不从服务器获取消息
                nil,          // 其他参数
        )
        if err != nil {
                log.Fatalf("Failed to register a consumer: %v", err)
        }

        fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
        for d := range msgs {
                // 输出接收到的消息
                fmt.Printf(" Received %s\n", d.Body)
                err = ch.Ack(d.DeliveryTag, true)
                if err != nil {
                        log.Fatalf("Failed to ack message: %v", err)
                }
        }
}可视化

看板
http://localhost:15672/
账户密码
admin
adminhttps://img-blog.csdnimg.cn/direct/6b28d9a6fb694f18b3d39a36aaf26672.png
消耗进度

http://localhost:15672/#/queues
https://img-blog.csdnimg.cn/direct/b9f98d491d284ba392c6f3ba1d3bacda.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Rabbitmq 搭建使用案例 [附源码]