Rabbitmq 搭建使用案例
@
目录
RabbitMQ搭建
docker
- docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management
复制代码 代码
golang
生产者
- package main
- import (
- "flag"
- "fmt"
- amqp "github.com/rabbitmq/amqp091-go"
- "log"
- "strconv"
- "time"
- )
- func main() {
- var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
- var exchange = flag.String("exchange", "logs", "Exchange name")
- var key = flag.String("key", "log", "Routing key")
- flag.Parse()
- // 连接到RabbitMQ服务器
- conn, err := amqp.Dial(*url)
- if err != nil {
- log.Fatalf("Failed to connect to RabbitMQ: %v", err)
- }
- defer conn.Close()
- // 创建一个通道
- ch, err := conn.Channel()
- if err != nil {
- log.Fatalf("Failed to open a channel: %v", err)
- }
- defer ch.Close()
- // 声明一个交换机
- err = ch.ExchangeDeclare(
- *exchange, // name: 交换机名称
- "fanout", // kind: 交换机类型
- true, // durable: 是否持久化
- false, // autoDelete: 没有队列绑定时是否自动删除
- false, // internal: 是否是内部交换机
- false, // noWait: 是否需要等待服务器响应
- nil, // args: 其他参数
- )
- if err != nil {
- log.Fatalf("Failed to declare an exchange: %v", err)
- }
- // 发送消息
- body := "Hello World!" + fmt.Sprintf(time.Now().String())
- for i := 0; i < 20; i++ {
- body = strconv.Itoa(i) + body
- err = ch.Publish(
- *exchange, // 交换机名称
- *key, // 路由键
- false, // 强制发布
- false, // 立即发布
- amqp.Publishing{
- ContentType: "text/plain",
- DeliveryMode: amqp.Persistent,
- Body: []byte(body),
- Expiration: "10000", // 3000 3秒
- })
- }
- if err != nil {
- log.Fatalf("Failed to publish a message: %v", err)
- }
- fmt.Printf(" [x] Sent %s", body)
- }
复制代码 消耗者
- package main
- import (
- "flag"
- "fmt"
- "log"
- amqp "github.com/rabbitmq/amqp091-go"
- )
- func main() {
- var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")
- var exchange = flag.String("exchange", "logs", "Exchange name")
- var key = flag.String("key", "log", "Routing key")
- flag.Parse()
- // 连接到RabbitMQ服务器
- conn, err := amqp.Dial(*url)
- if err != nil {
- log.Fatalf("Failed to connect to RabbitMQ: %v", err)
- }
- defer conn.Close()
- // 创建一个通道
- ch, err := conn.Channel()
- if err != nil {
- log.Fatalf("Failed to open a channel: %v", err)
- }
- defer ch.Close()
- // 声明一个交换机
- err = ch.ExchangeDeclare(
- *exchange, // name: 交换机名称
- "fanout", // kind: 交换机类型
- true, // durable: 是否持久化
- false, // autoDelete: 没有队列绑定时是否自动删除
- false, // internal: 是否是内部交换机
- false, // noWait: 是否需要等待服务器响应
- nil, // args: 其他参数
- )
- if err != nil {
- log.Fatalf("Failed to declare an exchange: %v", err)
- }
- // 声明一个队列
- q, err := ch.QueueDeclare(
- "queue01", // 随机生成队列名称
- true, // 持久化
- false, // 删除
- false, // 独占
- false, // 不等消息
- nil, // 其他参数
- )
- if err != nil {
- log.Fatalf("Failed to declare a queue: %v", err)
- }
- // 绑定队列到交换机
- err = ch.QueueBind(
- q.Name, // 队列名称
- *key, // 路由键
- *exchange, // 交换机名称
- false, // 现在绑定
- nil, // 其他参数
- )
- if err != nil {
- log.Fatalf("Failed to bind a queue: %v", err)
- }
- // 接收消息
- msgs, err := ch.Consume(
- q.Name, // 队列名称
- "consumer01", // 消费者标签
- false, // 自动ack
- false, // 不独占
- false, // 不等消息
- false, // 不从服务器获取消息
- nil, // 其他参数
- )
- if err != nil {
- log.Fatalf("Failed to register a consumer: %v", err)
- }
- fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
- for d := range msgs {
- // 输出接收到的消息
- fmt.Printf(" [x] Received %s\n", d.Body)
- err = ch.Ack(d.DeliveryTag, true)
- if err != nil {
- log.Fatalf("Failed to ack message: %v", err)
- }
- }
- }
复制代码 可视化
看板
http://localhost:15672/
账户密码
消耗进度
http://localhost:15672/#/queues
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |