multiple 的 true 和 false 代表不同意思
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
留意:手动应答的编写是写在,消费者/读取者的,读取乐成的 DeliverCallback 回调函数当中的,此中的,后面的 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); 第二个参数要改为 false 表示,启动手动应答的方式 。这里我们额外编写一个,睡眠的工具类,用于模仿消费者读取/消费RabbitMQ 消息队列当中的信息的一个网络延时。如下:
手动应答,生产者,不需要做出什么修改上的操纵,就是发送消息,让消费者读取/消费
特别说明:2.3 RabbitMQ 的 持久化
- 这里我们接纳的时 手动应答 的方式,它是在一个RabbitMQ 消息队列的一个默认的轮询读取 的方式下的。
- 手动应答: 实现的效果就是:对应某个消费者,因为某种缘故原由(网络延时),没有实时将,它该在轮询读取的情况下,并没有将全部的消息给读取完,因为没有读取完,又因为接纳的是一个 手动应答的方式,队列知道它没有将,该队列当中的消息读取完,全部 RabbitMQ 消息队列,不会将该消息删除,而是等到该消费者 将该队列的内容读取完才会,删除该队列当中的内容。
- 特别留意:我们这里的两个消费者,假如一开始在,RabbitMQ 没有声明对应的 channel信道,队列信息,那么需要先启动生产者,否则消费者,找不到对应,队列信息,是会报错,无法运行的。
补充:
以下为控制台中持久化与非持久化队列的 UI 表现区:
这个时间即使重启 rabbitmq 队列也依然存在
留意:上面我们仅仅只是将 RabbitMQ 当中的队列举行了持久化,但是此中 RabbitMQ 当中的消息是没有被持久化的。2.3.2 RabbitMQ 消息的持久化:
补充:
意思就是假如这个任务我还没有处置惩罚完或者我还没有应答你,你先别分配给我,我如今只能处置惩罚一个 任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然假如全部的消费者都没有完 成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时间就只能添加 新的 worker 或者改变其他存储任务的策略。运行测试:连续使用上面我们 RabbitMQ 手动应答的代码,仅仅只修改此中的消费者的channel.basicQos(prefetchCount); 参数 ,生产者不需要修改。
**该阈值设置和,不公平分发设置是一样的,编码是一样的,不同的是该设置的值,不是 1了,而是具体的一个该消费者具体要/能处置惩罚的”阈值“了 **
复制代码
- // 设置预取值为: 2
- int prefetchCount = 2;
- channel.basicQos(prefetchCount);
运行测试:连续使用上面我们 RabbitMQ 手动应答的代码,仅仅只修改此中的消费者的int prefetchCount = 5; channel.basicQos(prefetchCount); 参数 ,生产者不需要修改。复制代码
- // 设置预取值为: 5
- int prefetchCount = 5;
- channel.basicQos(prefetchCount);
复制代码
- package com.rainbowsea.rabbitmq.three;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rainbowsea.rabbitmq.utils.RabbitMQUtils;import com.rainbowsea.rabbitmq.utils.SleepUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Worker04 { // ctrl + shift + u 大写转换 // 队列名称 public static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("C2 等待吸收消息处置惩罚时间较长"); // 消费者未乐成消费/读取到队列当中的信息后,的实行的回调函数 CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口实行回调逻辑"); }; // 声明: 消费者乐成消费/读取到队列当中的信息后,的实行的回调函数 DeliverCallback deliverCallback = (consumerTag, message) -> { // new String(message.getBody(),"UTF-8" 假如这里吸收的消息内容是中文的,需要将其转换为 utf-8的内容 System.out.println("读取到的消息" + new String(message.getBody(),"UTF-8")); // 读取时,睡眠 30 s SleepUtils.sleep(30); /* 手动应答 1. 消息的标志 tag 2. 是否批量应答 false : 不批量应答信道中的消息 true 批量,false 不批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; /* 消费者消费/读取消息 1.消费/读取哪个队列当中的消息(留意:一旦读取到了该队列中的某条消息,该消息就被消费者消费掉了,就从队列当中删除了) 2.消费乐成之后是否要自动应答 true 代表的自动应答,false 代表手动应答 3.消费者未乐成消费/读取到队列当中的信息后,的实行的回调函数 4.消费者乐成消费/读取到队列当中的信息后,的实行的回调函数 */ // 设置不公平分发 //int prefetchCount = 1; // 设置预取值为: 5
- int prefetchCount = 5;
- channel.basicQos(prefetchCount); boolean autoAck = false; // 接纳手动应答的方式 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); }}
运行结果:3. 最后:
我们可以看到
固然我们这里C1 设置了预取值:是读取2 个,但是这里它却读取到了4个
,是因为,它的预取值的计算是从,它是实时更新的:比如一开始处置惩罚了2个
,那么就是0个,那它就是可以再处置惩罚 2个消息,主要是这个 C1 处置惩罚得太快了
固然 C2 设置的是处置惩罚 5 个但是处置惩罚的太慢了
,由于 C1 处置惩罚的太快了,将C2 的处置惩罚的消息
抢走了,所以预取值,不是绝对的,而是相对的。
“在这个最后的篇章中,我要表达我对每一位读者的感激之情。你们的关注和回复是我创作的动力源泉,我从你们身上吸取了无尽的灵感与勇气。我会将你们的鼓励留在心底,继续在其他的领域奋斗。感谢你们,我们总会在某个时刻再次相遇。”
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) | Powered by Discuz! X3.4 |