ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Go操作RabbitMQ_go调用rabbitmq
[打印本页]
作者:
诗林
时间:
2025-2-19 12:57
标题:
Go操作RabbitMQ_go调用rabbitmq
RabbitMQ 常用端口以及作用
端口 功能 5672 AMQP(Advanced Message Queuing Protocol)协议的默认端口,用于客户端与RabbitMQ服务器之间的通信。 15672 RabbitMQ的管理界面,默认利用HTTP协议,用于监控和管理RabbitMQ服务器。 4369 Erlang分布式节点通信端口,用于RabbitMQ节点之间的通信。 25672 Erlang分布式节点通信端口,用于集群中的内部通信。 5671 安全的AMQP端口,利用TLS/SSL进行加密通信。
架构图
publisher:生产者,也就是发送消息的一方
consumer:斲丧者,也就是斲丧消息的一方
queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待斲丧者处理
exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
virtual host:假造主机,起到数据隔离的作用。每个假造主机相互独立,有各自的exchange、queue
工作模式
1. 简朴模式(Simple)
特点
:
生产者直接将消息发送到队列中,斲丧者从队列中获取消息并进行处理。
消息一旦被斲丧,就会从队列中移除。
真实应用场景
:
电子邮件关照系统:生产者将待发送的邮件放入队列,斲丧者负责从队列中获取邮件并发送。
优点
:
实现简朴,易于明白和部署。
适用于单一斲丧者场景。
缺点
:
没有消息持久化,一旦 RabbitMQ 服务器重启,队列中的消息将会丢失。
1.1. 目录结构
simple
-mq
–-rabbitmq.go //这个是RabbitMQ的封装
-SimplePublish
–-mainSimplePublish.go //生产者发送消息
-SimpleRecieve
–-mainSimpleRecieve.go // 斲丧者接受消息
1.2. 代码实现
1.2.1. rabbitmq.go
package mq
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
// MQURL amqp://user:password@host:port/vhost
// amqp://是固定参数,这个信息是固定不变的。后面两个是用户名密码ip地址端口号Virtual Host
// 如果vhost是“/”就输入/%2F,/%2F代表斜杠
const MQURL = "amqp://test:123123@127.0.0.1:5672/%2F"
// RabbitMQ rabbitMQ结构体
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
//队列名称
QueueName string
//交换机名称
Exchange string
//bind Key 名称
Key string
//连接信息
Mqurl string
}
// NewRabbitMQ 创建结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
return &RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, Mqurl: MQURL}
}
// Destory 断开channel 和 connection
func (r *RabbitMQ) Destory() {
err := r.channel.Close()
if err != nil {
return
}
err = r.conn.Close()
if err != nil {
return
}
}
// 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
}
}
// NewRabbitMQSimple 创建简单模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
//创建RabbitMQ实例
rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
rabbitmq.failOnErr(err, "failed to connect rabb"+
"itmq!")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "failed to open a channel")
return rabbitmq
}
// PublishSimple simple模式队列生产
func (r *RabbitMQ) PublishSimple(message string) {
//1.申请队列,如果队列不存在会自动创建,存在则跳过创建
_, err := r.channel.QueueDeclare(
// queue:队列名称
r.QueueName,
//durable:是否持久化,当mq重启之后,还在
true,
//exclusive:是否独占即只能有一个消费者监听这个队列
false,
//autoDelete:是否自动删除。当没有Consumer时,自动删除掉
false,
//noWait:是否阻塞处理。true:不阻塞,false:阻塞
false,
//arguments:其他属性
nil,
)
if err != nil {
fmt.Println(err)
}
//调用channel 发送消息到队列中
r.channel.Publish(
r.Exchange,
r.QueueName,
//如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false,
//如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
// ConsumeSimple simple模式下消费者
func (r *RabbitMQ) ConsumeSimple() {
//1.申请队列,如果队列不存在会自动创建,存在则跳过创建
q, err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//是否持久化
true,
//是否自动删除
false,
//是否具有排他性
false,
//是否阻塞处理
false,
//额外的属性
nil,
)
if err != nil {
fmt.Println(err)
}
//接收消息
msgs, err := r.channel.Consume(
q.Name, // queue
//用来区分多个消费者
"", // consumer
//是否自动应答
true, // auto-ack
//是否独有
false, // exclusive
//设置为true,表示 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者
false, // no-local
//列是否阻塞
false, // no-wait
nil, // args
)
if err != nil {
fmt.Println(err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range msgs {
//消息逻辑处理,可以自行设计逻辑
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
复制代码
1.2.2. SimplePublish.go
package main
import (
"fmt"
"qiutian.com/study/02.rabbitmq/Simple/mq"
)
func main() {
rabbitmq := mq.NewRabbitMQSimple("hello.queue1")
rabbitmq.PublishSimple("Hello akita!")
fmt.Println("发送成功!")
}
复制代码
1.2.3. mainSimpleRecieve.go
<
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4