VEC系列-RabbitMQ 入门笔记

打印 上一主题 下一主题

主题 966|帖子 966|积分 2898

消息队列(MQ)对于开发者来说是一个经常听到的词汇,但在实际开发中,大多数人并不会真正用到它。网上已经有许多关于 MQ 概述和原理的详细讲解,官网文档和技术博客也都先容得很深入,因此,我在这里就不再赘述。我一直以为,学习一项技术不仅要知道它是什么,更紧张的是知道怎么用,以及在哪些场景下应该用。所以这篇文章重要就是站在一个新手的角度进行形貌以及实现MQ的实际运用。
利用MQ的常见景象


  • 体系解耦:好比电商体系,订单体系 → 库存体系 → 物流体系 订单体系发送“新订单”消息到 MQ,库存体系和物流体系各自订阅处置惩罚。即使库存体系或物流体系短暂不可用,消息仍旧可以暂存,体系团体不会受影响。这一方面说真话不是架构师也没必要太过关注,究竟体系的底层普通开发也没这个资格去搭建。只是用于了解,不要因为这段话阻拦学习的脚步。
  • 流量削峰,降低并发:这个比力好理解,也是最能碰到的情况。用户请求先进入 MQ 队列,由后台的消费端按照数据库的最大承载能力渐渐处置惩罚请求。确保数据库不会被刹时压垮,提高体系稳定性。还是电商体系常用些。
  • 异步使命处置惩罚:邮件、短信、推送通知,日志处置惩罚等。
理论上MQ能做的不止这些,抛砖引玉,一起深入学习吧。对MQ进行拆分理解

我以为与其去讲原理,画流程图,说些高端的词汇,倒不如说些土话,文章只能抛砖引玉,入门了,能跑起来了,那慢慢的就了解了。这里我只是用我自己理解来形貌,文风粗鄙,大佬莫怪。
MQ里常说生产者,消费者等。我会通过简单的例子来形貌:

  • 生产者:一个游戏,我是GM,我要发送公告,玩家分为普通玩家和VIP玩家等。在这里,发布公告的人就是消息的生产者。应该很好理解嗷?
  • 互换器:如上述,有普通玩家和VIP玩家等,我的公告在普通玩家眼前必然是拽的很啊,但是VIP玩家眼前还是要舔下的……那么我会发布一条给普通玩家的消息,和一条给VIP玩家的消息。互换器的作用在我看来就是消息的承载体,雷同一条运输船,负责把消息运输给玩家们。产生消息的地方许多,但是互换器不用关心是谁发布了消息,他只承载你的消息。
  • 队列:如上述,有了运输船。那么队列有点像是码头了。普通玩家进普通码头,VIP玩家进黄金码头。各自码头停泊各自的船。总不会在普通码头取出黄金码头的货哦?
补充:互换器是有范例的:Direct(直连互换器)Fanout(扇形互换器)Topic(主题互换器)Headers(头互换器)概念不多说了。比力常用的是Direct,FanoutDirect:通过路由键进行匹配,运输船是一艘,但是分为普通区和VIP区,玩家凭借船票(路由键)进行取货(取消息)Fanout:只要是是绑定了某个互换器的队列都能进行取货。玩家进普通码头就拿普通货,进黄金码头拿黄金货。当然这是举例子,玩家的队列还是要看你如何分配的。


  • 消费者:说了这么多,玩家就是消费者嗷。
MQ代码演示 

最新代码是通过 变乱总线 来跨方法传递信息和触发动作。通过发布和订阅变乱,模块之间能够解耦通信,使得变乱的发布和处置惩罚不再依赖于直接调用方法的方式,而是通过变乱总线进行跨模块、跨方法的异步传递。这种方式提高了体系的灵活性和扩展性,同时保持了模块之间的松耦合。长代码警告,有兴趣可以fork仓库进行实际训练 VerEasy.Core
必要的知识点大抵如此,通过代码+注释的情势来演示更好理解。
我这里是NETCore项目,所以还是接口的情势方便依赖注入。
接口部分代码
  1.     public interface IRabbitMQPersistentConnection
  2.     {
  3.         /// <summary>
  4.         /// 是否已经连接:判断MQ是否是连接状态
  5.         /// </summary>
  6.         bool IsConnected { get; }
  7.         /// <summary>
  8.         /// 尝试连接:断连重连方法
  9.         /// </summary>
  10.         /// <returns></returns>
  11.         Task<bool> TryConnectAsync();
  12.         /// <summary>
  13.         /// 唯一通道:发布通道可以随时关闭,消费通道需要保持打开状态,否则无法进行消费。
  14.         /// </summary>
  15.         IChannel Channel { get; }
  16.         /// <summary>
  17.         /// 唯一连接:同理,一个连接可以有N个通道,无需建立过多连接。
  18.         /// </summary>
  19.         IConnection Connection { get; }
  20.         /// <summary>
  21.         /// 释放
  22.         /// </summary>
  23.         /// <returns></returns>
  24.         Task DisposeAsync();
  25.         /// <summary>
  26.         /// 发布:发布消息
  27.         /// </summary>
  28.         /// <param name="msg"></param>
  29.         /// <param name="exChangeName"></param>
  30.         /// <param name="routeKey"></param>
  31.         /// <param name="type"></param>
  32.         /// <returns></returns>
  33.         Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);
  34.         /// <summary>
  35.         /// 订阅:订阅队列。
  36.         /// </summary>
  37.         /// <returns></returns>
  38.         Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout);
  39.     }
复制代码
接口实现部分代码
  1.     public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
  2.     {
  3.         //构造函数注入,获取MQ的地址账号密码端口,如果不传就用我默认配置的。
  4.         public RabbitMQPersistentConnection(IConnectionFactory? connectionFactory = null, int retryCount = 5)
  5.         {
  6.             _connectionFactory = connectionFactory ?? new ConnectionFactory
  7.             {
  8.                 HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),
  9.                 UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),
  10.                 Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),
  11.                 Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()
  12.             };
  13.             //使用Policy进行重连,这个是重连次数=5
  14.             _retryCount = retryCount;
  15.         }
  16.         //私有变量,获取连接成功时创建的Mq通道。
  17.         private IChannel _channel = default!;
  18.         public IChannel Channel
  19.         {
  20.             get
  21.             {
  22.                 return _channel;
  23.             }
  24.         }
  25.         /// <summary>
  26.         /// RabbitMQ 连接工厂
  27.         /// </summary>
  28.         private readonly IConnectionFactory _connectionFactory;
  29.         /// <summary>
  30.         /// 私有变量 RabbitMQ 连接上下文
  31.         /// </summary>
  32.         private IConnection _connection = default!;
  33.         /// <summary>
  34.         /// 重连次数
  35.         /// </summary>
  36.         private readonly int _retryCount;
  37.         /// <summary>
  38.         /// 标志是否已释放
  39.         /// </summary>
  40.         private bool _disposed;
  41.         /// <summary>
  42.         /// 是否有效连接
  43.         /// </summary>
  44.         public bool IsConnected
  45.         {
  46.             get
  47.             {
  48.                 return _connection != null && _connection.IsOpen && !_disposed;
  49.             }
  50.         }
  51.         public IConnection Connection
  52.         {
  53.             get
  54.             {
  55.                 return _connection;
  56.             }
  57.         }
  58.         /// <summary>
  59.         /// 手动释放
  60.         /// </summary>
  61.         /// <returns></returns>
  62.         public async Task DisposeAsync()
  63.         {
  64.             if (_disposed) return;
  65.             _disposed = true;
  66.             try
  67.             {
  68.                 await _connection.DisposeAsync();
  69.             }
  70.             catch (IOException ex)
  71.             {
  72.                 Console.WriteLine(ex.Message);
  73.             }
  74.         }
  75.         /// <summary>
  76.         /// 重连机制
  77.         /// </summary>
  78.         /// <returns></returns>
  79.         public async Task<bool> TryConnectAsync()
  80.         {
  81.             var policy = Policy.Handle<SocketException>()//捕获连接异常
  82.                 .Or<BrokerUnreachableException>()//无法连接异常
  83.                 .WaitAndRetryAsync(_retryCount, x =>
  84.                 TimeSpan.FromSeconds(Math.Pow(2, x)), (ex, time) =>
  85.                 {
  86.                     //日志
  87.                 });
  88.             try
  89.             {
  90.                 await policy.ExecuteAsync(async () =>
  91.                 {
  92.                     //重建连接【赋值给私有化变量,通过get同步给接口里的Connection和Channel】
  93.                     _connection = await _connectionFactory.CreateConnectionAsync();
  94.                     _channel = await _connection.CreateChannelAsync();
  95.                 });
  96.                 //如果连接成功
  97.                 if (IsConnected)
  98.                 {
  99.                     // 连接成功后,注册连接关闭、异常、阻塞的事件处理程序
  100.                     _connection.ConnectionShutdownAsync += OnConnectionShutdownAsync;
  101.                     _connection.CallbackExceptionAsync += OnCallbackExceptionAsync;
  102.                     _connection.ConnectionBlockedAsync += OnConnectionBlockedAsync;
  103.                     return true;
  104.                 }
  105.                 else
  106.                 {
  107.                     return false;
  108.                 }
  109.             }
  110.             catch (Exception ex)
  111.             {
  112.                 Console.WriteLine($"重连失败,最终抛出异常: {ex.Message}");
  113.                 return false;
  114.             }
  115.         }
  116.         private async Task OnConnectionShutdownAsync(object sender, ShutdownEventArgs e)
  117.         {
  118.             if (_disposed) return;
  119.             Console.WriteLine("RabbitMQ连接关闭,正在尝试重连...");
  120.             await TryConnectAsync();
  121.         }
  122.         private async Task OnCallbackExceptionAsync(object sender, CallbackExceptionEventArgs e)
  123.         {
  124.             if (_disposed) return;
  125.             Console.WriteLine($"RabbitMQ连接出现异常,正在尝试重连... 异常信息: {e.Exception.Message}");
  126.             await TryConnectAsync();
  127.         }
  128.         private async Task OnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs e)
  129.         {
  130.             if (_disposed) return;
  131.             Console.WriteLine("RabbitMQ连接被阻塞,正在尝试重连...");
  132.             await TryConnectAsync();
  133.         }
  134.         //发布消息
  135.         public async Task PublishAsync(string msg, string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout)
  136.         {
  137.             //判断是否连接状态,没有连接就重连
  138.             if (!IsConnected)
  139.             {
  140.                 await TryConnectAsync();
  141.             }
  142.             //创建通道,因为是发布消息,通道不用常打开,所以使用了USING
  143.             using var channel = await _connection.CreateChannelAsync();
  144.             //【ExchangeDeclareAsync】声明交换机,exchange:交换机名称,type:交换机类型
  145.             await channel.ExchangeDeclareAsync(exchange: exChangeName, type: type);
  146.             
  147.             //msg就是消息,需要传递Byte[]
  148.             var body = Encoding.UTF8.GetBytes(msg);
  149.             
  150.             //启动消息持久化,我的项目里使用MQ来进行公告的推送,使用的Fanout类型交换机,故此消息保持持久化。
  151.             var properties = new BasicProperties()
  152.             {
  153.                 Persistent = true,
  154.             };
  155.             //发布消息
  156.             await channel.BasicPublishAsync(
  157.                 exchange: exChangeName,
  158.                 routingKey: routeKey,
  159.                 mandatory: false,
  160.                 basicProperties: properties,
  161.                 body: body);
  162.         }
  163.         //订阅消息
  164.         public async Task SubscribeAsync(string exChangeName = "VECLOG", string routeKey = "", string type = ExchangeType.Fanout)
  165.         {
  166.             if (!IsConnected)
  167.             {
  168.                 await TryConnectAsync();
  169.             }
  170.             
  171.             //【queue】队列
  172.             string queueName = string.IsNullOrWhiteSpace(routeKey) ? exChangeName : routeKey;
  173.             //【durable】持久化队列,MQ服务器不会删除它。
  174.             QueueDeclareOk queueDeclareResult = await Channel.QueueDeclareAsync(
  175.                  queue: queueName,
  176.                   durable: true,
  177.                   exclusive: false,
  178.                   autoDelete: false);
  179.             //根据queue,exchange,routingKey 对 交换机和队列进行绑定,如果是Fanout类型不需要routeKey。
  180.             await Channel.QueueBindAsync(queue: queueName, exchange: exChangeName, routingKey: routeKey);
  181.             //创建消费者
  182.             var consumer = new AsyncEventingBasicConsumer(Channel);
  183.             //消费者消费后执行方法
  184.             consumer.ReceivedAsync += async (model, ea) =>
  185.             {
  186.                 byte[] body = ea.Body.ToArray();
  187.                 var message = Encoding.UTF8.GetString(body);
  188.                 //确认消息已被消费,这样后续该消息就不会被该队列继续消费到了。
  189.                 await Channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
  190.             };
  191.             //启动消费者队列,将消费者和队列绑定
  192.             await Channel.BasicConsumeAsync(queueName, autoAck: false, consumer: consumer);
  193.         }
  194.     }
复制代码
MQ服务注入
  1.             if (Appsettings.AppStr("RabbitMQ:Enable").ObjToBool())
  2.             {
  3.                 services.AddSingleton<IRabbitMQPersistentConnection>(x =>
  4.                 {
  5.                     var connectionFactory = new ConnectionFactory()
  6.                     {
  7.                         HostName = Appsettings.AppStr("RabbitMQ:Connection:HostName"),
  8.                         UserName = Appsettings.AppStr("RabbitMQ:Connection:UserName"),
  9.                         Password = Appsettings.AppStr("RabbitMQ:Connection:PassWord"),
  10.                         Port = Appsettings.AppStr("RabbitMQ:Connection:Port").ObjToInt()
  11.                     };
  12.                     var mq = new RabbitMQPersistentConnection(connectionFactory);
  13.                     return mq;
  14.                 });
  15.             }
复制代码
我在注入各种服务时,添加了一些日志进行输出,效果如下:


 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

缠丝猫

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表