RabbitMQ 是一个可靠且成熟的消息传递和流署理,它很容易摆设在云环境、内部摆设和本地呆板上。它目前被全天下数百万人使用。
1.基本概念
生产者(Producer)- 生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够发送消息到RabbitMQ服务器。
复制代码 消费者(Consumer)- 消费者是一个接收消息的程序。接收消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够从RabbitMQ服务器接收消息。
复制代码 队列(Queue)- 队列是RabbitMQ的内部对象,用于存储消息。多个生产者可以向一个队列发送消息,多个消费者可以尝试从一个队列接收消息。队列支持多种消息分发策略。
复制代码 互换机(Exchange)- 交换机是消息的分发中心。它接收来自生产者的消息,然后将这些消息分发给队列。交换机有多种类型,包括直连交换机、主题交换机、扇形交换机、头交换机。
复制代码 绑定(Binding)- 绑定是交换机和队列之间的关联关系。绑定可以使用路由键进行绑定,也可以使用通配符进行绑定。
复制代码 路由键(Routing Key)- 路由键是生产者发送消息时附带的一个属性。路由键的作用是决定消息被分发到哪个队列。
复制代码 通配符(Wildcard)- 通配符是一种模式匹配的方式。RabbitMQ支持两种通配符:`*`和`#`。
复制代码 绑定键(Binding Key)- 绑定键是交换机和队列之间的关联关系。绑定键可以使用路由键进行绑定,也可以使用通配符进行绑定。
复制代码 长期化(Durable)- 持久化是指RabbitMQ服务器重启后,消息是否还存在。持久化可以应用到交换机、队列、绑定、消息等。
复制代码 确认机制(Acknowledge)- 确认机制是指消费者接收到消息后,向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。
- 自动确认
- 消费者接收到消息后,RabbitMQ服务器会自动删除这条消息。
- 手动确认
- 消费者接收到消息后,需要向RabbitMQ服务器发送一个确认消息。RabbitMQ服务器收到确认消息后,会删除这条消息。
复制代码 拒绝机制(Reject)- 拒绝机制是指消费者接收到消息后,向RabbitMQ服务器发送一个拒绝消息。RabbitMQ服务器收到拒绝消息后,会将这条消息重新发送给其他消费者。
复制代码 死信队列(Dead Letter Queue)- 死信队列是指消息被拒绝、过期或者达到最大重试次数后,会被发送到死信队列。
复制代码 消息过期(Message TTL)- 消息过期是指消息在指定时间内没有被消费者消费,会被删除。
复制代码 消息优先级(Message Priority)- 消息优先级是指消息在队列中的优先级。消息优先级高的消息会被优先消费。
复制代码 消息分发- 消息分发是指消息在队列中的分发策略。消息分发策略包括轮询分发、公平分发、负载均衡分发。
复制代码 2.环境搭建
Docker 安装 RabbitMQ- docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
复制代码
- -d:后台运行
- --restart:重启战略
- --name:容器名称
- -p:端口映射
- --hostname:主机名
- -e:环境变量
- RABBITMQ_DEFAULT_USER:默认用户名
- RABBITMQ_DEFAULT_PASS:默认暗码
- TZ:时区
- rabbitmq:management:镜像名称
Docker Compose 安装 RabbitMQ- version: "3.1"
- services:
- rabbitmq:
- restart: always
- image: rabbitmq:management
- container_name: rabbitmq
- hostname: my-rabbit
- ports:
- - 5672:5672
- - 15672:15672 # RabbitMQ管理界面端口
- environment:
- TZ: Asia/Shanghai
- RABBITMQ_DEFAULT_USER: admin
- RABBITMQ_DEFAULT_PASS: admin
复制代码
- restart:重启战略
- image:镜像名称
- container_name:容器名称
- hostname:主机名
- ports:端口映射
- environment:环境变量
- TZ:时区
- RABBITMQ_DEFAULT_USER:默认用户名
- RABBITMQ_DEFAULT_PASS:默认暗码
- rabbitmq:management:镜像名称
3.使用
客户端SDK代码在GitHub:https://github.com/Tangtang1997/IKunLibrary
新建 TestRequest 类,实现 IRabbitMqRequest 接口,定义消息体- public class TestRequest : IRabbitMqRequest
- {
- /// <summary>
- /// 重试次数
- /// </summary>
- public int RetryCount { get; set; }
- #region 自定义字段
- /// <summary>
- /// id
- /// </summary>
- public string Id { get; set; } = default!;
- /// <summary>
- /// 名称
- /// </summary>
- public string Name { get; set; } = default!;
- /// <summary>
- /// 年龄
- /// </summary>
- public int Age { get; set; }
- #endregion
- }
复制代码 新建TestRequestHandler类,实现IRabbitMqRequestHandler接口,处理消息- public class TestRequestHanlder : IRequestProcessorHandler<TestRequest>
- {
- private readonly ILogger<TestRequestHanlder> _logger;
- public TestRequestHanlder(ILogger<TestRequestHanlder> logger)
- {
- _logger = logger;
- }
- public Task StartAsync(CancellationToken cancellationToken)
- {
- return Task.CompletedTask;
- }
- public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
- {
- return Task.CompletedTask;
- }
- public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
- {
- _logger.LogInformation($"开始处理消息: {request.Id}");
- //模拟处理消息耗时操作
- await Task.Delay(1000, cancellationToken);
- _logger.LogInformation($"消息处理完成: {request.Id}");
- }
- }
复制代码 使用 IHostedService 来托管服务- public class SampleHostedService : IHostedService
- {
- private readonly IConsumerProcessorManager<TestRequest> _consumerProcessorManager;
- private readonly IHostApplicationLifetime _applicationLifetime;
- private readonly ILogger<SampleHostedService> _logger;
- public SampleHostedService(
- IConsumerProcessorManager<TestRequest> consumerProcessorManager,
- IHostApplicationLifetime applicationLifetime,
- ILogger<SampleHostedService> logger)
- {
- _consumerProcessorManager = consumerProcessorManager;
- _applicationLifetime = applicationLifetime;
- _logger = logger;
- }
- public async Task StartAsync(CancellationToken cancellationToken)
- {
- _applicationLifetime.ApplicationStarted.Register(() =>
- {
- _logger.LogInformation("SampleHostedService is starting.");
- _consumerProcessorManager.StartAsync(cancellationToken);
- });
- _applicationLifetime.ApplicationStopping.Register(() =>
- {
- _logger.LogInformation("SampleHostedService is stopping.");
- _consumerProcessorManager.StopAsync(3000, cancellationToken);
- });
- await Task.CompletedTask;
- }
- public async Task StopAsync(CancellationToken cancellationToken)
- {
- await Task.CompletedTask;
- }
- }
复制代码 注册并启用服务- IHost host = Host.CreateDefaultBuilder(args)
- .ConfigureServices(services =>
- {
- services.AddHostedService<SampleHostedService>();
- var configuration = services.BuildServiceProvider().GetRequiredService<IConfiguration>();
- var hostName = configuration["RabbitMq:Host"] ?? throw new Exception("HostName is not configured");
- var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new Exception("Port is not configured"));
- var userName = configuration["RabbitMq:Username"] ?? throw new Exception("Username is not configured");
- var password = configuration["RabbitMq:Password"] ?? throw new Exception("Password is not configured");
- var queueName = configuration["RabbitMq:QueueName"] ?? throw new Exception("QueueName is not configured");
- services.AddRabbitMq<TestRequest, TestRequestHanlder>(options =>
- {
- options.UseSsl = false;
- options.HostName = hostName;
- options.Port = port;
- options.UserName = userName;
- options.Password = password;
- options.Durable = true;
- options.NetworkRecoveryInterval = 10000;
- options.ExchangeType = ExchangeType.Direct;
- options.QueueName = queueName;
- options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
- options.RoutingKey = $"{queueName}_ROUTING_KEY";
- options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
- options.DeadLetterQueueName = $"{queueName}_DEAD";
- options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
- });
- })
- .Build();
- await host.RunAsync();
复制代码 4.参考资料
https://www.cnblogs.com/Tangtang1997/p/18067763
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |