ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ [打印本页]

作者: 数据人与超自然意识    时间: 2023-7-14 22:44
标题: RabbitMQ
.NET6使用RabbitMQ学习


目录

前提

前段时间上班无事,上网冲浪看到了消息队列RabbitMQ,就想着学习一下,网上看了点资料在哔哩哔哩上看的到codeman讲的一个rabbitmq的视频,就跟着仔细学习一下,敲一下代码。视频地址: rabbitmq视频
RabbitMq介绍

什么是消息队列

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

应用场景

削峰填谷

在一个时间段很多用户同时进行请求我们的A系统,我的MQ容器就可以用来存储请求按照每秒多少的请求进行发送,减轻服务器的压力。
​       

异步提速

所有的问题当你解决一个问题就会出现另外的问题,外部依赖多系统的稳定性就越差,MQ但凡挂了,系统就会出问题,后面就会使用mq集群来解决这一问题。
消息模型

点对点模式



在上图的模型中,有以下概念:
代码附上


新增两个项目一个生产者 Z.RabbitMq.Producer,一个消费者Z.RabbitMQ.Consumer01


work消息模型

工作队列或者竞争消费者模式

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
代码附上

新增一个工具类用来获取rabbitmq的连接信息
  1. public class RabbitUtils
  2. {
  3.     public static ConnectionFactory GetConnection()
  4.     {
  5.         var factory = new ConnectionFactory();
  6.         factory.HostName = "127.0.0.1";
  7.         factory.Port = 5672;//5672是RabbitMQ默认的端口号
  8.         factory.UserName = "admin";
  9.         factory.Password = "admin";
  10.         factory.VirtualHost = "my_vhost";
  11.         return factory;
  12.     }
  13. }
复制代码
运行结构如下

能者多劳

通过channel.BasicAck(ea.DeliveryTag, false);来完成能者多劳的效果,在完成上一次请求之后再去取下一条消息,这就会出现服务器快的消费的更多,慢的消费的更少。
发布订阅模式

Publish/subscribe(交换机类型:Fanout,也称为广播 )


和前面两种模式不同:
消费者1收到的天气

项目.RabbitMq.Consumer01 创建WeatherFanout使用exchange(交换机)
  1. public class WeatherFanout
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
  10.                 // 声明队列信息
  11.                 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  12.                 /*
  13.                          * queueBind 用于将队列与交换机绑定
  14.                          * 参数1:队列名
  15.                          * 参数2:交换机名
  16.                          * 参数3:路由Key(暂时用不到)
  17.                          */
  18.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
  19.                 channel.BasicQos(0, 1, false);
  20.                 var consumer = new EventingBasicConsumer(channel);
  21.                 consumer.Received += ((model, ea) =>
  22.                                       {
  23.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                                           Console.WriteLine($"百度收到的气象信息:{message}");
  25.                                           channel.BasicAck(ea.DeliveryTag, false);
  26.                                       });
  27.                 channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  28.                 Console.WriteLine("Press [Enter] to exit");
  29.                 Console.Read();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
消费者2收到的天气

项目.RabbitMq.Consumer02 创建WeatherFanout使用exchange(交换机)
代码与消费者01一样
生产者发送天气

生产者把消息推送到交换机上
  1. public class WeatherFanout
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 string message = "20度";
  10.                 var body = Encoding.UTF8.GetBytes(message);
  11.                 channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);
  12.                 Console.WriteLine("天气信息发送成功!");
  13.             }
  14.         }
  15.     }
  16. }
复制代码
最后得到效果

Routing 路由模型


P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者
  1. public class WeatherDirect
  2. {
  3.      public static void Weather()
  4.      {
  5.          Dictionary<string, string> area = new Dictionary<string, string>();
  6.          area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  7.          area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  8.          area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  9.          area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
  10.          using (var connection = RabbitUtils.GetConnection().CreateConnection())
  11.          {
  12.              using (var channel = connection.CreateModel())
  13.              {
  14.                  foreach (var item in area)
  15.                  {
  16.                      channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
  17.                                           null, Encoding.UTF8.GetBytes(item.Value));
  18.                  }
  19.                  Console.WriteLine("气象信息发送成功!");
  20.              }
  21.          }
  22.      }
  23. }
复制代码
消费者1

接受百度路由的路由消息
  1. public class WeatherDirect
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
  10.                 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  11.                 /*
  12.                     * queueBind 用于将队列与交换机绑定
  13.                     * 参数1:队列名
  14.                     * 参数2:交换机名
  15.                     * 参数3:路由Key(暂时用不到)
  16.                     */
  17.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");
  18.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
  19.                 channel.BasicQos(0, 1, false);
  20.                 var consumer = new EventingBasicConsumer(channel);
  21.                 consumer.Received += ((model, ea) =>
  22.                                       {
  23.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                                           Console.WriteLine($"百度收到的气象信息:{message}");
  25.                                           channel.BasicAck(ea.DeliveryTag, false);
  26.                                       });
  27.                 channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  28.                 Console.WriteLine("Press [Enter] to exit");
  29.                 Console.Read();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
消费者2

接受新浪的路由信息
  1. public class WeatherDirect
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
  10.                 // 声明队列信息
  11.                 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
  12.                 /*
  13.                      * queueBind 用于将队列与交换机绑定
  14.                      * 参数1:队列名
  15.                      * 参数2:交换机名
  16.                      * 参数3:路由Key
  17.                      */
  18.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");
  19.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");
  20.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
  21.                 channel.BasicQos(0, 1, false);
  22.                 var consumer = new EventingBasicConsumer(channel);
  23.                 consumer.Received += ((model, ea) =>
  24.                                       {
  25.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  26.                                           Console.WriteLine($"新浪收到的气象信息:{message}");
  27.                                           channel.BasicAck(ea.DeliveryTag, false);
  28.                                       });
  29.                 channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
  30.                 Console.WriteLine("Press [Enter] to exit");
  31.                 Console.Read();
  32.             }
  33.         }
  34.     }
  35. }
复制代码
最后得到的效果


Topics 通配符模式


routingkey支持通配符匹配格式

生产者
  1. public class WeatherTopic
  2. {
  3.     public static void Weather()
  4.     {
  5.         Dictionary<string, string> area = new Dictionary<string, string>();
  6.         area.Add("china.hunan.changsha.20210525", "中国湖南长沙20210525天气数据");
  7.         area.Add("china.hubei.wuhan.20210525", "中国湖北武汉20210525天气数据");
  8.         area.Add("china.hubei.xiangyang.20210525", "中国湖北襄阳20210525天气数据");
  9.         area.Add("us.cal.lsj.20210525", "美国加州洛杉矶20210525天气数据");
  10.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  11.         {
  12.             using (var channel = connection.CreateModel())
  13.             {
  14.                 foreach (var item in area)
  15.                 {
  16.                     channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
  17.                                          null, Encoding.UTF8.GetBytes(item.Value));
  18.                 }
  19.                 Console.WriteLine("气象信息发送成功!");
  20.             }
  21.         }
  22.     }
  23. }
复制代码
消费者1

获取交换机中通配符为china.#的信息
  1. public class WeatherTopic
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
  10.                 // 声明队列信息
  11.                 channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
  12.                 /*
  13.                      * queueBind 用于将队列与交换机绑定
  14.                      * 参数1:队列名
  15.                      * 参数2:交换机名
  16.                      * 参数3:路由Key(暂时用不到)
  17.                      */
  18.                 channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
  19.                 channel.BasicQos(0, 1, false);
  20.                 var consumer = new EventingBasicConsumer(channel);
  21.                 consumer.Received += ((model, ea) =>
  22.                                       {
  23.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  24.                                           Console.WriteLine($"百度收到的气象信息:{message}");
  25.                                           channel.BasicAck(ea.DeliveryTag, false);
  26.                                       });
  27.                 channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
  28.                 Console.WriteLine("Press [Enter] to exit");
  29.                 Console.Read();
  30.             }
  31.         }
  32.     }
  33. }
复制代码
消费者2

获取交换机中通配符为china.hubei.*.20210525的信息
  1. public class WeatherTopic
  2. {
  3.     public static void Weather()
  4.     {
  5.         using (var connection = RabbitUtils.GetConnection().CreateConnection())
  6.         {
  7.             using (var channel = connection.CreateModel())
  8.             {
  9.                 /*
  10.                      * 生产者发送消息
  11.                      * 队列名称
  12.                      * 交换机名称
  13.                      * 路由key
  14.                      *
  15.                      */
  16.                 channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
  17.                 // 声明队列信息
  18.                 channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
  19.                 /*
  20.                      * queueBind 用于将队列与交换机绑定
  21.                      * 参数1:队列名
  22.                      * 参数2:交换机名
  23.                      * 参数3:路由Key(暂时用不到)
  24.                      */
  25.                 channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");
  26.                 channel.BasicQos(0, 1, false);
  27.                 var consumer = new EventingBasicConsumer(channel);
  28.                 consumer.Received += ((model, ea) =>
  29.                                       {
  30.                                           var message = Encoding.UTF8.GetString(ea.Body.ToArray());
  31.                                           Console.WriteLine($"新浪收到的气象信息:{message}");
  32.                                           channel.BasicAck(ea.DeliveryTag, false);
  33.                                       });
  34.                 channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
  35.                 Console.WriteLine("Press [Enter] to exit");
  36.                 Console.Read();
  37.             }
  38.         }
  39.     }
  40. }
复制代码
最后得到的效果


RPC


基本概念:

流程说明:

分享几题面试题

RabbitMQ中消息可能有的几种状态?

到这里就结束,大家如果需要看视频学习就是点最上面的链接就行了
想要源码的可以加QQ群831181779 @做梦达人

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4