马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
消息队列(MQ)对于开发者来说是一个经常听到的词汇,但在现实开发中,大多数人并不会真正用到它。网上已经有很多关于 MQ 概述和原理的具体讲解,官网文档和技能博客也都介绍得很深入,因此,我在这里就不再赘述。
我一直认为,学习一项技能不仅要知道它是什么,更告急的是知道怎么用,以及在哪些场景下应该用。所以这篇文章重要就是站在一个新手的角度进行形貌以及实现MQ的现实运用。
使用MQ的常见情景
- 系统解耦:好比电商系统,订单系统 → 库存系统 → 物流系统 订单系统发送“新订单”消息到 MQ,库存系统和物流系统各自订阅处理。纵然库存系统或物流系统短暂不可用,消息仍然可以暂存,系统团体不会受影响。这一方面说实话不是架构师也没须要太过关注,究竟系统的底层普通开发也没这个资格去搭建。只是用于了解,不要因为这段话拦阻学习的脚步。
- 流量削峰,降低并发:这个比较好理解,也是最能遇到的环境。用户哀求先进入 MQ 队列,由背景的消耗端按照数据库的最大承载能力逐步处理哀求。确保数据库不会被瞬间压垮,进步系统稳定性。还是电商系统常用些。
- 异步任务处理:邮件、短信、推送关照,日志处理等。
理论上MQ能做的不止这些,抛砖引玉,一起深入学习吧。
对MQ进行拆分理解
MQ里常说生产者,消耗者等。我会通过简单的例子来形貌:
- 生产者:一个游戏,我是GM,我要发送公告,玩家分为普通玩家和VIP玩家等。在这里,发布公告的人就是消息的生产者。应该很好理解嗷?
- 交换器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家面前必然是拽的很啊,但是VIP玩家面前还是要舔下的……那么我会发布一条给普通玩家的消息,和一条给VIP玩家的消息。交换器的作用在我看来就是消息的承载体,类似一条运输船,负责把消息运输给玩家们。产生消息的地方很多,但是交换器不用关心是谁发布了消息,他只承载你的消息。
- 队列:如上述,有了运输船。那么队列有点像是码头了。普通玩家进普通码头,VIP玩家进黄金码头。各自码头停泊各自的船。总不会在普通码头取出黄金码头的货哦?
补充:交换器是有类型的:Direct(直连交换器)Fanout(扇形交换器)Topic(主题交换器)Headers(头交换器)
概念不多说了。比较常用的是Direct,Fanout
Direct:通过路由键进行匹配,运输船是一艘,但是分为普通区和VIP区,玩家凭借船票(路由键)进行取货(取消息)
Fanout:只要是是绑定了某个交换器的队列都能进行取货。玩家进普通码头就拿普通货,进黄金码头拿黄金货。固然这是举例子,玩家的队列还是要看你如何分配的。
MQ代码演示
最新代码是通过 事件总线 来跨方法通报信息和触发动作。通过发布和订阅事件,模块之间可以或许解耦通信,使得事件的发布和处理不再依赖于直接调用方法的方式,而是通过事件总线进行跨模块、跨方法的异步通报。这种方式进步了系统的灵活性和扩展性,同时保持了模块之间的松耦合。
长代码警告,有兴趣可以fork仓库进行现实操练 VerEasy.Core。
须要的知识点大致云云,通过代码+解释的形式来演示更好理解。
我这里是NETCore项目,所以还是接口的形式方便依赖注入。
接口部分代码
- public interface IRabbitMQPersistentConnection
- {
- /// <summary>
- /// 是否已经连接:判断MQ是否是连接状态
- /// </summary>
- bool IsConnected { get; }
- /// <summary>
- /// 尝试连接:断连重连方法
- /// </summary>
- /// <returns></returns>
- Task<bool> TryConnectAsync();
- /// <summary>
- /// 唯一通道:发布通道可以随时关闭,消费通道需要保持打开状态,否则无法进行消费。
- /// </summary>
- IChannel Channel { get; }
- /// <summary>
- /// 唯一连接:同理,一个连接可以有N个通道,无需建立过多连接。
- /// </summary>
- IConnection Connection { get; }
- /// <summary>
- /// 释放
- /// </summary>
- /// <returns></returns>
- Task DisposeAsync();
- /// <summary>
- /// 发布:发布消息
- /// </summary>
- /// <param name="msg"></param>
- /// <param name="exChangeName"></param>
- /// <param name="routeKey"></param>
- /// <param name="type"></param>
- /// <returns></returns>
- Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);
- /// <summary>
- /// 订阅:订阅队列。
- /// </summary>
- /// <returns></returns>
- Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);
- }
复制代码 接口实现部分代码
- public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
- {
- //构造函数注入,获取MQ的地址账号密码端口,如果不传就用我默认配置的。
- public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5)
- {
- _connectionFactory = connectionFactory ?? new ConnectionFactory
- {
- HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),
- UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),
- Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),
- Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()
- };
- //使用Policy进行重连,这个是重连次数=5
- _retryCount = retryCount;
- }
- //私有变量,获取连接成功时创建的Mq通道。
- private IChannel _channel = default!;
- public IChannel Channel
- {
- get
- {
- return _channel;
- }
- }
- /// <summary>
- /// RabbitMQ 连接工厂
- /// </summary>
- private readonly IConnectionFactory _connectionFactory;
- /// <summary>
- /// 私有变量 RabbitMQ 连接上下文
- /// </summary>
- private IConnection _connection = default!;
- /// <summary>
- /// 重连次数
- /// </summary>
- private readonly int _retryCount;
- /// <summary>
- /// 标志是否已释放
- /// </summary>
- private bool _disposed;
- /// <summary>
- /// 是否有效连接
- /// </summary>
- public bool IsConnected
- {
- get
- {
- return _connection != null && _connection.IsOpen && !_disposed;
- }
- }
- public IConnection Connection
- {
- get
- {
- return _connection;
- }
- }
- /// <summary>
- /// 手动释放
- /// </summary>
- /// <returns></returns>
- public async Task DisposeAsync()
- {
- if (_disposed) return;
- _disposed = true;
- try
- {
- await _connection.DisposeAsync();
- }
- catch (IOException ex)
- {
- Console.WriteLine(ex.Message);
- }
- }
- /// <summary>
- /// 重连机制
- /// </summary>
- /// <returns></returns>
- public async Task<bool> TryConnectAsync()
- {
- var policy = Policy.Handle<SocketException>()//捕获连接异常
- .Or<BrokerUnreachableException>()//无法连接异常
- .WaitAndRetryAsync(_retryCount, x =>
- TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>
- {
- //日志
- });
- try
- {
- await policy.ExecuteAsync(async () =>
- {
- //重建连接【赋值给私有化变量,通过get同步给接口里的Connection和Channel】
- _connection = await _connectionFactory.CreateConnectionAsync();
- _channel = await _connection.CreateChannelAsync();
- });
- //如果连接成功
- if (IsConnected)
- {
- // 连接成功后,注册连接关闭、异常、阻塞的事件处理程序
- _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
- _connection.CallbackExceptionAsync += OnCallbackExceptionAsync;
- _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;
- return true;
- }
- else
- {
- return false;
- }
- }
- catch (Exception ex)
- {
- Console.WriteLine($"重连失败,最终抛出异常: {ex.Message}");
- return false;
- }
- }
- private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
- {
- if (_disposed) return;
- Console.WriteLine("RabbitMQ连接关闭,正在尝试重连...");
- await TryConnectAsync();
- }
- private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e)
- {
- if (_disposed) return;
- Console.WriteLine($"RabbitMQ连接出现异常,正在尝试重连... 异常信息: {e.Exception.Message}");
- await TryConnectAsync();
- }
- private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e)
- {
- if (_disposed) return;
- Console.WriteLine("RabbitMQ连接被阻塞,正在尝试重连...");
- await TryConnectAsync();
- }
- //发布消息
- public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout)
- {
- //判断是否连接状态,没有连接就重连
- if (!IsConnected)
- {
- await TryConnectAsync();
- }
- //创建通道,因为是发布消息,通道不用常打开,所以使用了USING
- using var channel = await _connection.CreateChannelAsync();
- //【ExchangeDeclareAsync】声明交换机,exchange:交换机名称,type:交换机类型
- await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);
-
- //msg就是消息,需要传递Byte[]
- var body = Encoding.UTF8.GetBytes(msg);
-
- //启动消息持久化,我的项目里使用MQ来进行公告的推送,使用的Fanout类型交换机,故此消息保持持久化。
- var properties = new BasicProperties()
- {
- Persistent = true,
- };
- //发布消息
- await channel.BasicPublishAsync(
- exchange: exChangeName,
- routingKey: routeKey,
- mandatory: false,
- basicProperties: properties,
- body: body);
- }
- //订阅消息
- public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout)
- {
- if (!IsConnected)
- {
- await TryConnectAsync();
- }
-
- //【queue】队列
- string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;
- //【durable】持久化队列,MQ服务器不会删除它。
- QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(
- queue: queueName,
- durable: true,
- exclusive: false,
- autoDelete: false);
- //根据queue,exchange,routingKey 对 交换机和队列进行绑定,如果是Fanout类型不需要routeKey。
- await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);
- //创建消费者
- var consumer = new AsyncEventingBasicConsumer(Channel);
- //消费者消费后执行方法
- consumer.ReceivedAsync += async (model, ea) =>
- {
- byte[] body = ea.Body.ToArray();
- var message = Encoding.UTF8.GetString(body);
- //确认消息已被消费,这样后续该消息就不会被该队列继续消费到了。
- await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
- };
- //启动消费者队列,将消费者和队列绑定
- await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);
- }
- }
复制代码
MQ服务注入
- if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool())
- {
- services.AddSingleton<IRabbitMQPersistentConnection>(x =>
- {
- var connectionFactory = new ConnectionFactory()
- {
- HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),
- UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),
- Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),
- Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()
- };
- var mq = new RabbitMQPersistentConnection(connectionFactory);
- return mq;
- });
- }
复制代码
我在注入各种服务时,添加了一些日志进行输出,效果如下:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |