<context:property-placeholder location=“classpath:rabbitmq.properties”/>
<rabbit:connection-factory id=“connectionFactory” host=“${rabbitmq.host}”
port=“${rabbitmq.port}”
username=“${rabbitmq.username}”
password=“${rabbitmq.password}”
virtual-host=“${rabbitmq.virtual-host}”/>
<rabbit:admin connection-factory=“connectionFactory”/>
<rabbit:queue id=“spring_queue” name=“spring_queue” auto-declare=“true”/>
<rabbit:queue id=“spring_fanout_queue_1” name=“spring_fanout_queue_1” auto-declare=“true”/>
<rabbit:queue id=“spring_fanout_queue_2” name=“spring_fanout_queue_2” auto-declare=“true”/>
<rabbit:fanout-exchange id=“spring_fanout_exchange” name=“spring_fanout_exchange” auto-declare=“true”>
rabbit:bindings
<rabbit:binding queue=“spring_fanout_queue_1” />
<rabbit:binding queue=“spring_fanout_queue_2”/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:queue id=“spring_direct_queue” name=“spring_direct_queue” auto-declare=“true”/>
<rabbit:direct-exchange name=“spring_direct_exchange” >
rabbit:bindings
<rabbit:binding queue=“spring_direct_queue” key=“direct”></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue id=“spring_topic_queue_one” name=“spring_topic_queue_one” auto-declare=“true”/>
<rabbit:queue id=“spring_topic_queue_two” name=“spring_topic_queue_two” auto-declare=“true”/>
<rabbit:queue id=“spring_topic_queue_three” name=“spring_topic_queue_three” auto-declare=“true”/>
<rabbit:topic-exchange id=“spring_topic_exchange” name=“spring_topic_exchange” auto-declare=“true”>
rabbit:bindings
<rabbit:binding pattern=“one.*” queue=“spring_topic_queue_one”/>
<rabbit:binding pattern=“two.#” queue=“spring_topic_queue_two”/>
<rabbit:binding pattern=“three.#” queue=“spring_topic_queue_three”/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id=“rabbitTemplate” connection-factory=“connectionFactory”/>
4.rabbitmq.properties:
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
阐明:这里免费提供rabbitmq毗连方式给各人使用学习
5.ProducerTest:
package com.sky.springrabbitmqprodule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = “classpath:spring-rabbitmq-producer.xml”)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*/
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend(“spring_queue”,“hello world spring…”);
}
/**
*/
@Test
public void testFanout(){
rabbitTemplate.convertAndSend(“spring_fanout_exchange”,“”,“spring fanout…”);
}
/**
*/
@Test
public void testDirect(){
rabbitTemplate.convertAndSend(“spring_direct_exchange”,“direct”,“spring Direct…”);
}
/**
*/
@Test
public void testTopics(){
rabbitTemplate.convertAndSend(“spring_topic_exchange”,“one.onekey”,“spring topic one…”);
rabbitTemplate.convertAndSend(“spring_topic_exchange”,“two.twokey.topic”,“spring topic two…”);
rabbitTemplate.convertAndSend(“spring_topic_exchange”,“three.threekey.topic”,“spring topic three…”);
}
}
2.消耗者
1.项目架构图
代码如下(示例):
2.pom.xml依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns=“http://maven.apache.org/POM/4.0.0”
xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance”
xsi:schemaLocation=“http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>
4.0.0
com.sky
spring-rabbitmq-consumer
1.0-SNAPSHOT
org.springframework
spring-context
5.1.7.RELEASE
org.springframework.amqp
spring-rabbit
2.1.8.RELEASE
junit
junit
4.12
org.springframework
spring-test
5.1.7.RELEASE
org.apache.maven.plugins
maven-compiler-plugin
3.8.0
1.8 1.8
3.spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns=“http://www.springframework.org/schema/beans”
xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance”
xmlns:context=“http://www.springframework.org/schema/context”
xmlns:rabbit=“http://www.springframework.org/schema/rabbit”
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<context:property-placeholder location=“classpath:rabbitmq.properties”/>
<rabbit:connection-factory id=“connectionFactory” host=“${rabbitmq.host}”
port=“${rabbitmq.port}”
username=“${rabbitmq.username}”
password=“${rabbitmq.password}”
virtual-host=“${rabbitmq.virtual-host}”/>
<rabbit:listener-container connection-factory=“connectionFactory” auto-declare=“true”>
<rabbit:listener ref=“topicListenerOne” queue-names=“spring_topic_queue_one”/>
<rabbit:listener ref=“topicListenerTwo” queue-names=“spring_topic_queue_two”/>
<rabbit:listener ref=“topicListenerThree” queue-names=“spring_topic_queue_three”/>
</rabbit:listener-container>
4.rabbitmq.properties
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
阐明:设置和生产者的同等
5.ConsumerTest
package com.sky.springrabbitmqconsumer.test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerTest {
public static void main(String[] args) {
//初始化IOC容器
ApplicationContext ctx = new ClassPathXmlApplicationContext(“classpath:spring-rabbitmq-consumer.xml”);
}
}
6.FanoutListener
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
7.FanoutListener2
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
8.SpringDirectQueue
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringDirectQueue implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
9.SpringQueueListener
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
10.TopicListenerOne
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerOne implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
11.TopicListenerTwo
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerTwo implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
12.TopicListenerThree
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerThree implements MessageListener {
@Override
public void onMessage(Message message) {
//打印消息
System.out.println(new String(message.getBody()));
}
}
上面就是这个项目的全部代码了,下面就是Demo演示内容。
二、项目演示
====================================================================
演示简单模式:
消耗者取消表明:
消耗者启动服务:
生产者发送消息:
消耗者检察消息:
演示广播模式:
消耗者取消表明:
消耗者启动服务:
生产者发送消息:
消耗者检察消息:
演示路由模式:
消耗者取消表明:
消耗者启动服务:
生产者发送消息:
消耗者检察消息:
演示通配符模式:
消耗者取消表明:
消耗者启动服务:
生产者发送消息:
消耗者检察消息:
三、消息可靠性投递
=======================================================================
消息可靠性实现须要包管以下几点:
exchange要长期化
queue要长期化
message要长期化
- 生产方确认Confirm
- 消耗方确认Ack
- Broker高可用
1.rabbitmq 整个消息投递的路径
producer—>rabbitmq broker—>exchange—>queue—>consumer
- 消息从producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从exchange–>queue 投递失败则会返回一个 returnCallback 。
2.实现消息可靠性投递的步调
- 生产者设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
- 生产者设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。
- 生产者使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,假如为true,则发送乐成,假如为false,则发送失败,须要处理惩罚。
- 生产者使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,假如设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并实行回调函数returnedMessage。
- 消耗者在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:主动确认,manual:手动确认(none主动确认模式很伤害,当生产者发送多条消息,消耗者吸收到一条信息时,会主动以为当前发送的消息已经签收了,这个时间消耗者举行业务处理惩罚时出现了非常情况,也会以为消息已经正常签收处理惩罚了,而队列内里表现都被消耗掉了。以是真实开辟都会改为手动签收,可以防止消息丢失)
- 消耗者假如在消耗端没有出现非常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
- 消耗者假如出现非常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
3.具体实现可靠消息投递的代码
阐明:基于上述Spring整合RabbitMQ的代码举行改动
生产者
第一处改动:设置确认模式和退回模式
代码:
publisher-confirms=“true”
publisher-returns=“true”
第二处改动:声明队列和交互机的bean代码:
<rabbit:queue id=“test_queue_confirm” name=“test_queue_confirm”></rabbit:queue>
<rabbit:direct-exchange name=“test_exchange_confirm”>
rabbit:bindings
<rabbit:binding queue=“test_queue_confirm” key=“confirm”></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
第三处改动:编写Confirm测试方法
//测试Confirm 模式
@Test
public void testConfirm() {
//界说回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
- @param correlationData 干系设置信息
- @param ack exchange交换机 是否乐成收到了消息。true 乐成,false代表失败
- @param cause 失败缘故起因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(“confirm方法被实行了…”);
//ack 为 true表现 消息已经到达交换机
if (ack) {
//吸收乐成
System.out.println(“吸收乐成消息” + cause);
} else {
//假如没有投递到交换机中去就会吸收失败,好比:将交换机名称写错
System.out.println(“吸收失败消息” + cause);
//做一些处理惩罚,让消息再次发送。
}
}
});
//举行消息发送
rabbitTemplate.convertAndSend(“test_exchange_confirm”,“confirm”,“message Confirm…”);
//举行就寝利用
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
第到处改动:编写Return测试方法
//测试 return模式
@Test
public void testReturn() {
//设置交换机处理惩罚失败消息的模式为true的时间,消息到达不了队列时,会将消息重新返回给生产者
rabbitTemplate.setMandatory(true);
//界说回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
- @param message 消息对象
- @param replyCode 错误码
- @param replyText 错误信息
- @param exchange 交换机
- @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(“return 实行了…”);
System.out.println(“message:”+message);
System.out.println(“replyCode:”+replyCode);
System.out.println(“replyText:”+replyText);
System.out.println(“exchange:”+exchange);
System.out.println(“routingKey:”+routingKey);
//处理惩罚业务
}
});
//举行消息发送
rabbitTemplate.convertAndSend(“test_exchange_confirm”,“confirm”,“message return…”);
//举行就寝利用
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
消耗者
第一处改动:
监听器:AckListener
package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1、获取消息的id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2、获取消息
System.out.println(“message:”+new String(message.getBody()));
//3、举行业务处理惩罚
System.out.println(“=举行业务处理惩罚”);
//模拟出现非常
int i = 5/0;
//4、举行消息签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//拒绝签收
/*
- 第三个参数:requeue:重回队列。假如设置为true,则消息重新回到queue,broker会重新发送该消息给消耗端
*/
System.out.println(“=业务处理惩罚非常,消息重新回到queue,broker会重新发送该消息给消耗端”);
channel.basicNack(deliveryTag, true, true);
}
}
}
第二处改动:
原来是通过声明一个个的bean对象,如今改为了扫描某个包下面的类
<context:component-scan base-package=“com.sky.springrabbitmqconsumer.listener” />
第三处改动:
在rabbit:listener-container标签中设置acknowledge属性改为手动确认,(限流设置:prefetch属性改为每次抓取2条消息,而且监听自界说的ackListener)
4.具体实现可靠消息投递的演示
正常发消息Demo演示
启动生产者Confirm模式:
启动消耗者:
启动生产者Return模式:
消耗者的控制台就会不绝的打印:
非常发消息Demo演示
生产者Confirm模式:
生产者Return模式:
四、消息在消耗端限流
========================================================================
1.限流示例图
2.实现步调
- 在rabbit:listener-container中设置 prefetch属性设置消耗端一次拉取多少消息
- 消耗端简直认模式肯定为手动确认:acknowledge=“manual”
3.具体实现消耗端限流代码
消耗者
第一处修改:监听器:QosListener
package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取到的消息
System.out.println(new String(message.getBody()));
Thread.sleep(3000);
//处理惩罚业务逻辑
//举行消息的签收,第二个参数:true表现之前没签收的都给他签收掉
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
第二处修改:
<rabbit:listener-container connection-factory=“connectionFactory”
auto-declare=“true”
acknowledge=“manual”
prefetch=“2”>
<rabbit:listener ref=“qosListener” queue-names=“test_queue_confirm”></rabbit:listener>
天生者
批量发送消息测试方法
//批量发送消息,让消耗者每次拉去指定的数量
@Test
public void testQos(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend(“test_exchange_confirm”, “confirm”, “message confirm…”);
}
}
4.具体实现消耗端限流Demo演示
启动消耗者
启动生产者
检察消耗者控制台日记
阐明:每隔3秒打印一条消息
非常情况,消耗未举行签收
重启消耗者和生产者发消息,这个时间会看到,本来发送的十条消息,现实只有二条消息打印在消耗者的控制台上面,由于prefetch属性设置了2,以是一次性拉取了二条。
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
五、TTL
===================================================================
1.业务场景
好比我们在下订单的时间,假如凌驾30分钟未付出,就取消这个订单,把当前商品的库存加归去。
2.界说
TTL 全称 Time To Live(存活时间/逾期时间),当消息到达存活时间后,还没有被消耗,会被主动扫除。RabbitMQ可以对消息设置逾期时间,也可以对整个队列(Queue)设置逾期时间。
3.实现步调
- 设置队列逾期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息同一逾期。
- 设置消息逾期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消耗时),会单独判断这一消息是否逾期。
- 假如两者都举行了设置,以时间短的为准。
4.通过RabbitMQ管理控制台页面实现Demo
1.创建消息
2.创建交换机
3.将交换机和消息绑定
4.发送消息
凌驾5秒没有消耗者消耗,就主动失效了。
5.通过代码实现TTL
添加ttl队列
<rabbit:queue name=“test_queue_ttl” id=“test_queue_ttl”>
rabbit:queue-arguments
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name=“test_exchange_ttl” >
rabbit:bindings
<rabbit:binding pattern=“ttl.#” queue=“test_queue_ttl”></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
发送消息测试方法
//ttl测试
@Test
public void testTtl(){
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend(“test_exchange_confirm”, “ttl.test”, “message confirm…”);
}
}
启动测试方法
等候10秒
六、死信队列
====================================================================
1.界说
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
自我先容一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里不停到如今。
深知大多数Java工程师,想要提拔技能,通常是自己探索发展大概是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易遇到天花板技能故步自封!
因此网络整理了一份《2024年Java开辟全套学习资料》,初志也很简单,就是盼望可以或许资助到想自学提拔又不知道该从何学起的朋侪,同时减轻各人的负担。
既有得当小白学习的零底子资料,也有得当3年以上履历的小搭档深入学习提拔的进阶课程,根本涵盖了95%以上Java开辟知识点,真正体系化!
由于文件比力大,这里只是将部分目次截图出来,每个节点内里都包罗大厂面经、学习条记、源码课本、实战项目、解说视频,而且会一连更新!
假如你以为这些内容对你有资助,可以扫码获取!!(备注Java获取)
末了
许多步调员,整天沉醉在业务代码的 CRUD 中,业务中没有大量数据做并发,缺少实战履历,对并发仅仅停顿在相识,做不到醒目,以是总是与大厂擦肩而过。
我把私藏的这套并发体系的条记和头脑脑图分享出来,理论知识与项目实战的联合,我以为只要你肯花时间用心学完这些,肯定可以快速把握并发编程。
不管是查缺补漏照旧深度学习都能有非常不错的成效,须要的话记得帮助点个赞支持一下
整理不易,以为有资助的朋侪可以帮助点赞分享支持一下小编~
《互联网大厂口试真题剖析、进阶开辟核心学习条记、全套解说视频、实战项目源码课本》点击传送门即可获取!
tps://img-blog.csdnimg.cn/8eaf2d0fbdc84aca8e094600bfb68e29.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAamF2YeWwj-S4kQ==,size_20,color_FFFFFF,t_70,g_se,x_16)等候10秒
六、死信队列
====================================================================
1.界说
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
自我先容一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里不停到如今。
深知大多数Java工程师,想要提拔技能,通常是自己探索发展大概是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易遇到天花板技能故步自封!
因此网络整理了一份《2024年Java开辟全套学习资料》,初志也很简单,就是盼望可以或许资助到想自学提拔又不知道该从何学起的朋侪,同时减轻各人的负担。[外链图片转存中…(img-eA3rHdHW-1713374070958)]
[外链图片转存中…(img-21jVJVUz-1713374070958)]
[外链图片转存中…(img-VZgTRJC2-1713374070958)]
既有得当小白学习的零底子资料,也有得当3年以上履历的小搭档深入学习提拔的进阶课程,根本涵盖了95%以上Java开辟知识点,真正体系化!
由于文件比力大,这里只是将部分目次截图出来,每个节点内里都包罗大厂面经、学习条记、源码课本、实战项目、解说视频,而且会一连更新!
假如你以为这些内容对你有资助,可以扫码获取!!(备注Java获取)
末了
许多步调员,整天沉醉在业务代码的 CRUD 中,业务中没有大量数据做并发,缺少实战履历,对并发仅仅停顿在相识,做不到醒目,以是总是与大厂擦肩而过。
我把私藏的这套并发体系的条记和头脑脑图分享出来,理论知识与项目实战的联合,我以为只要你肯花时间用心学完这些,肯定可以快速把握并发编程。
不管是查缺补漏照旧深度学习都能有非常不错的成效,须要的话记得帮助点个赞支持一下
整理不易,以为有资助的朋侪可以帮助点赞分享支持一下小编~
《互联网大厂口试真题剖析、进阶开辟核心学习条记、全套解说视频、实战项目源码课本》点击传送门即可获取!
|