rabbitmq-java根本详解
一、rabbitmq是什么?1、MQ界说
MQ(Message Queue)消息队列 重要办理:异步处理处罚、应用解耦、流量削峰等题目,是分布式体系的告急组件,从而实现高性能,高可用,可伸缩和终极同等性的架构,rabbitmq 是 消息队列中的一种。
1.1 异步
通过消息队列,生产者无需等候消耗者完成处理处罚即可继续实行其他使命,从而进步体系相应速率和吞吐量。比方,在用户下单后,订单体系可以将订单信息发送到消息队列,然后立即返回给用户确认信息,而物流体系或库存体系则在背景异步地从队列中获取并处理处罚订单。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvMmQxYzAwOTEwZmNlYTkxYjBiY2RjZDEyY2JjN2YwNzcucG5n
1.2 解耦
差别应用步伐之间通过消息队列通讯,不再直接依靠对方的接口调用,当某一方举行升级或重构时,不会影响其他体系的运行。比方,一个付出体系可以向消息队列发布付出乐成的关照,而积分体系、堆栈体系平分别订阅这些消息来更新各自的业务状态,相互独立工作。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvMzFhMTA1ZWQxNzBjZjNjMjc3OTllMjRlNTY3NzA4OWIucG5n
1.3 削峰
当短时间内有大量的哀求涌入体系时,消息队列可以作为缓冲区存储这些哀求,以恒定的速率分发给卑鄙服务,克制了由于瞬间高峰导致的服务瓦解。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZjczMTU5ZDYwNzg4N2IwNzBmM2U5ZDc4ZTUzYWVkYjEucG5n
2、技能配景
2.1 AMQP高级消息队列协议
Advanced Message Queuing Protocol 是一个开放尺度的消息中心件协议,它界说了消息署理和应用步伐之间的交互方式。RabbitMQ便是基于AMQP协议实现的消息队列产物,提供了一种尺度化的方式来包管跨语言宁静台的可靠消息传输。
2.2 JMS
[*] Java Message Server,Java消息服务应用步伐接口,一种规范,和JDBC继承的脚色雷同
[*] 是一个Java平台中关于面向消息中心件的API,用于在两个应用步伐之间,或分布式体系中发送消 息,举行异步通讯
2.3 接洽
[*] JMS是界说了同一接口,同一消息操纵;AMQP通过协议同一数据交互格式
[*] JMS必须是java语言;AMQP只是协议,与语言无关
2.4 Erlang语言
RabbitMQ服务器端是使用Erlang语言编写的,Erlang以其高并发、容错性和分布式盘算本事闻名,非常恰当用于构建像RabbitMQ如许必要高度稳固和可扩展的消息中心件。
3、为什么使用rabbitmq
[*]可靠性:RabbitMQ提供了多种机制包管消息投递的可靠性,包罗长期化消息、消息确认机制等。
[*]机动性:通过Exchange、Queue和Routing Key等组件,RabbitMQ支持机动的消息路由计谋,包罗发布订阅、路由模式、主题模式等多种模式。
[*]扩展性:通过集群和镜像队列等功能,RabbitMQ可以轻松实现程度扩展,满足高可用及高性能的需求。
[*]广泛支持:RabbitMQ拥有丰富的客户端库,险些支持全部主流开发语言,便于开发者快速集成。
[*]使用简朴方便:安装摆设简朴,上手门槛低,有强大的WEB管理页面。
4、rabbitmq的各组件功能
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvNGMzZmQ1ODZmZmIyYjM4ZjY4ODkyNzEyYWJkNThhZjAucG5n
[*]Broker:消息队列服务器实体
[*]Virtual Host:捏造主机,是一种逻辑隔离单位,可以在单个RabbitMQ Broker实例上创建多个vhost,每个vhost内部有自己的交换机、队列和权限管理,实现差别项目或租户间资源的隔离。
[*]Publisher(生产者):负责天生和发布消息到RabbitMQ服务器,可以选择目的交换机并将消息附带特定的路由键。
[*]Consumer(消耗者):从RabbitMQ中吸取并消耗消息的步伐,可以从绑定到特定交换机和路由键的队列中取出消息举行处理处罚。
[*]Exchange(交换机):根据预界说的范例和路由规则,吸取生产者发布的消息,并将其转发到相应的队列。常见的交换机范例有直连(Direct)、主题(Topic)、头部(Headers)和扇出(Fanout)等。
[*]Queue(队列):存储消息的容器,现实的消息载体,消息会按照路由规则存放在队列中等候消耗者消耗。
[*]Banding:绑定,用于消息队列和交换机之间的关联。
[*]Channel:通道(信道)
[*]多路复用毗连中的一条独立的双向数据流畅道。
[*]信道是创建在真实的TCP毗连内的 捏造链接。
[*]AMQP下令都是通过信道发出去的,不管是发布消息、订阅队列照旧吸取消息,都是通过信道完成的。
[*]由于对于操纵体系来说,创建和烧毁TCP毗连都优劣常昂贵的开销,以是引入了信道的概念,用来复用TCP毗连。
[*]Routing key:生产者在发布消息时指定的一个标识符,用于决定消息怎样被交换机路由到相应的队列。
二、rabbitmq 的使用
1、Linux捏造机设置
[*] rabbitmq的安装通常涉及到如下两个步调,可以参考博文Linux安装RabbitMQ具体教程(最具体的图文教程)-CSDN博客:
[*]安装Erlang:由于RabbitMQ是用Erlang编写的,起首必要在Linux体系中安装Erlang运行环境。
[*]安装RabbitMQ:可以通过官方提供的apt或yum堆栈举行安装,大概下载源码自行编译安装。
[*] 启动背景管理插件
# rabbitmq-plugins enable rabbitmq_management
[*] 启动、检察状态、重启、关闭 rabbitmq
# systemctl start rabbitmq-server.service
# systemctl status rabbitmq-server.service
# systemctl restart rabbitmq-server.service
# systemctl stop rabbitmq-server.service
[*] 检察进程
# ps -ef | grep rabbitmq
[*] 测试
[*] 关闭防火墙: systemctl stop firewalld
[*] 欣赏器输入:http://ip:15672
[*] 默认帐号暗码:guest,guest用户默认不允许长途毗连
[*] 创建账号
# rabbitmqctl add_user 你的用户名 你的密码
[*] 设置用户脚色
# rabbitmqctl set_user_tags 你的用户名 administrator
[*] 设置用户权限
# rabbitmqctl set_permissions -p "/" 你的用户名 ".*"
".*" ".*"
[*] 检察当前用户和脚色
# rabbitmqctl list_users
[*] 修改用户暗码
# rabbitmqctl change_password 你的用户名 新的密码
[*] web界面先容:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvNzliYjdiZmZiZTZjOTJjOWMzODQ3MDM0NTllNTk5OTMucG5n
端口:
5672:RabbitMQ提供给编程语言客户端链接的端口
15672:RabbitMQ管理界面的端口
25672:RabbitMQ集群的端口
2、java使用rabbitmq
2.1 快速入门
[*] 长途登录创建的账号,在Admin下添加了用户
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvOGJlNGJjMzQwNmYwNjg2NzAwOGMzMzBiODA3MDVhOWMucG5n
[*] pom依靠
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
[*] 创建毗连工具类
package utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、在工厂对象中设置MQ的连接信息(ip、port、vhost、username、password)
factory.setHost("192.168.81.121");
factory.setPort(5672);
factory.setVirtualHost("/lb");
factory.setUsername("lb");
factory.setPassword("123123");
//3、通过工厂获得与MQ的连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
System.out.println("connection = " + connection);
connection.close();
}
}
[*] 运行测试结果
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZGRlMjAxMGJhZTNmNzBmZTQxMDJjZWQ2ZTdiZDUzODcucG5n
2.2 RabbitMQ模式
5种消息模子,大要分为两类:点对点、发布订阅模式(一对多)
[*]点对点:
[*]包罗三部分:消息队列(queue),发送者(sender),吸取者(receiver)
[*]每个消息发送到一个特定的队列中,吸取者从中得到消息,队列中保存这些消息,直到他们被消耗或超时
[*]每个消息一个消耗者
[*]消耗者不必要运行,发送者发送的消息可以被直接生存在队列内
[*]简朴模式和工作队列模式属于这种范例
[*]发布订阅:
[*]发布订阅多了一部分,交换机,起到将消息路由分发到各个订阅者的作用
[*]每个消息可以有多个订阅者
[*]消耗者必要先订阅,订阅者发布的消息才气被消耗
[*]消耗者必要保持运行状态,才气消耗消息
[*]发布订阅模式、路由模式、通配符(主题)模式属于这种范例
2.2.1 简朴模式
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZmFhYTA5N2NkODRkNjExNWNhMmE2ZDJkNTgxMWFhNjEucG5n
[*] 生产者
package simplest;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
public class Sender {
public static void main(String[] args) throws Exception {
String msg = "Hello world!";
//1、获取连接
Connection connection = ConnectionUtil.getConnection();
//2、在连接中创建通道(信道)
Channel channel = connection.createChannel();
/*3、创建消息队列
参数1、队列中的名称
参数2、队列的数据是否持久化
参数3、是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
参数4、是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)
参数5、队列参数(没有参数为null)
*
* */
channel.queueDeclare("queue1", false, false, false, null);
/*4、向指定队列发送消息
参数1、交换机名称,简单模式没有交换机,所以名称为""
参数2、目标队列的名称
参数3、设置消息的属性(没有属性为null)
参数4、消息的内容(只接受字节数组)
*/
channel.basicPublish("", "queue1", null, msg.getBytes());
System.out.println("发送:" + msg);
//5、释放资源
channel.close();
connection.close();
}
}
[*] 消耗者
package simplest;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver {
public static void main(String[] args) throws Exception {
//1、获得连接
Connection connection = ConnectionUtil.getConnection();
//2、获得通道(信道)
Channel channel = connection.createChannel();
//3、从信道获得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/*
交付处理
参数1:收件人信息
参数2:包裹上的快递标签
参数3:协议的配置
参数4:消息
*/
String s = new String(body);
System.out.println("接收到的消息:" + s);
}
};
//4、监听队列 true:自动消息确认
channel.basicConsume("queue1", true, consumer);
}
}
[*] 测试结果
[*] 先运行sender,此时Queue中存入一个消息:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvMzQwOTZiZDQ0OTgyOGIzYzVkMzJmYWIyYTZkYTc5MDEucG5n
[*] 再运行receiver,此时Queue中的消息被消耗:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvMTJmYmM2ZmIxZWRhYWFlZjU0NzBiMGFlNDg3ZGM3YWMucG5n
[*] 消息确认机制
消息可以设置手动确认,如允许以包管:
[*]消耗者吸取到消息处理处罚时未发生非常再确认,消息才被删除;
[*]发生非常,不确认,消息就不会被删除,防止消息丢失。
修改消耗者代码:
package simplest;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class ReceiverByAck {
public static void main(String[] args) throws Exception {
//1、获得连接
Connection connection = ConnectionUtil.getConnection();
//2、获得通道
Channel channel = connection.createChannel();
//3、从channel中获取消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的消息
String s = new String(body);
System.out.println("接收消息:" + s);
//手动确认(收件人信息,是否同时确认多个消息)
System.out.println("消息已接收并正常处理完毕!手动确认回执!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4、设置手动确认
channel.basicConsume("queue1", false, consumer);
}
}
[*] 再次运行sender和receiver测试结果:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZDAwMDkwYjU2YzA4MTIyMDIwNGE1NWViYjc2MGQ1ZDgucG5n
2.2.2 工作队列模式
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvY2MwNjhkNjVlOTJlMWUyNGE4MzZhMmQzNzE4MzIwMjUucG5n
如图,此种模式区别于简朴模式重要是多个消耗者共同消耗消息,但留意,仍然是一个消息对应一个消耗者。
[*] 生产者代码雷同简朴模式,只是循环发了多条消息
package work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue", false, false, false, null);
for (int i = 0; i < 100; i++) {
String msg = "产生的消息====>>>" + i;
channel.basicPublish("", "test_work_queue", null, msg.getBytes());
System.out.println(msg);
}
channel.close();
connection.close();
}
}
[*] 消耗者同样声明白消息队列,如允许以提前开启监听消息队列;同时,消耗者1延时300ms,消耗者2延时1000ms,便于观察;
[*] 消耗者1
package work;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver1 {
static int num = 1;//统计消费的消息数目
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//queueDeclare() 此方法有双重作用,如果队列不存在,就创建;如果队列存在,则获取
// 使用此方法可以保证先启动消费者不会报错
channel.queueDeclare("test_work_queue", false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Receiver1 消费了" + s + "! 总共消费了" + num++ + "条消息!");
//延迟时间
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//设置监听
channel.basicConsume("test_work_queue", false, consumer);
}
}
[*] 消耗者2
package work;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver2 {
static int num = 1;//统计消费的消息数目
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
//queueDeclare() 此方法有双重作用,如果队列不存在,就创建;如果队列存在,则获取
// 使用此方法可以保证先启动消费者不会报错
channel.queueDeclare("test_work_queue", false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Receiver2 消费了" + s + "! 总共消费了" + num++ + "条消息!");
//延迟时间
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//设置监听
channel.basicConsume("test_work_queue", false, consumer);
}
}
[*] 测试结果:
[*] 先开启消耗者,背景就存在了相应队列:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZjAyMzVhYzJmMWVjZmEwYjRkOGU4ZWRiMWVhODFiM2EucG5n
[*] 再运行生产者:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvNWZhMzgzZmZiNjUwYWI3ZWUyYzhhZmQyMTc2ZTEyNmIucG5n
[*] 可以看到消耗者1消耗消息的服从高,但仍然只消耗50个,分析生产者的消息是完全匀称分配的,这不符合正常的需求,我们想按照服从分配,需添加如下代码:
channel.basicQos(1);
修改后的消耗者代码:
[*] 消耗者1
package work;import com.rabbitmq.client.*;import utils.ConnectionUtil;import java.io.IOException;public class Receiver1 { static int num = 1;//统计消耗的消息数目 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //queueDeclare() 此方法有双重作用,假如队列不存在,就创建;假如队列存在,则获取 // 使用此方法可以包管先启动消耗者不会报错 channel.queueDeclare("test_work_queue", false, false, false, null); channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("Receiver1 消耗了" + s + "! 统共消耗了" + num++ + "条消息!"); //延长时间 try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } //手动确认 channel.basicAck(envelope.getDeliveryTag(), false); } }; //设置监听 channel.basicConsume("test_work_queue", false, consumer); }}
[*] 消耗者2
package work;import com.rabbitmq.client.*;import utils.ConnectionUtil;import java.io.IOException;public class Receiver2 { static int num = 1;//统计消耗的消息数目 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //queueDeclare() 此方法有双重作用,假如队列不存在,就创建;假如队列存在,则获取 // 使用此方法可以包管先启动消耗者不会报错 channel.queueDeclare("test_work_queue", false, false, false, null); channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("Receiver2 消耗了" + s + "! 统共消耗了" + num++ + "条消息!"); //延长时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //手动确认 channel.basicAck(envelope.getDeliveryTag(), false); } }; //设置监听 channel.basicConsume("test_work_queue", false, consumer); }}
[*] 先开启消耗者,再运行生产者,测试结果:
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvNGM5YTk3NTA4NzA3ZDI4NWVmMTYzMjM3ZTJhMGUwZmMucG5n
留意:能者多劳必须要共同手动的ACK机制才见效
2.2.3 发布订阅模式
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvODYyNGZhMWE4YjViY2M3NWRmOTI4ZDM3NjAzMDhmM2EucG5n
发布订阅模式添加了 X(交换机 Exchange),该脚色重要实现消息的分发,当多个消息队列绑定了该交换机时,该交换机遇把消息广播到全部绑定到它的队列,以是全部订阅了相应队列的消耗者都会收到雷同的消息。
[*] 生产者
package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型)
// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)
channel.exchangeDeclare("test_fanout_exchange", "fanout");
String msg = "hello,everyone!";
channel.basicPublish("test_fanout_exchange", "", null, msg.getBytes());
System.out.println("生产者:" + msg);
channel.close();
connection.close();
}
}
[*] 消耗者
[*] 消耗者1
package fanout;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_fanout_queue1", false, false, false, null);
//绑定路由
channel.queueBind("test_fanout_queue1", "test_fanout_exchange", "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者1: " + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_fanout_queue1", false, consumer);
}
}
[*] 消耗者2
package fanout;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_fanout_queue2", false, false, false, null);
//绑定路由
channel.queueBind("test_fanout_queue2", "test_fanout_exchange", "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者2: " + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_fanout_queue2", false, consumer);
}
}
[*] 测试:
[*] 先运行生产者,由于必要先创建交换机,此步的消息忽略。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZjYyNWEzOWEwNjU4Y2NjY2U4YjcwMjY0NDZiNDZhYWEucG5n
[*] 再运行消耗者,消耗者的队列绑定交换机。
[*] 末了运行生产者。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZGUzN2U0MWQ5Yjc4Zjg3MTI3OTg1MDZhNWRkOTMxOGMucG5n
2.2.4 路由模式
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZTg2ZjFkNGM0YWRlYTVmMjJhZjExMTQ4MGUwMzA2ZTUucG5n
路由模式可以定向分发消息给差别的队列,区别于发布订阅模式,重要是由 路由key区分了消息的种类,根据差别的消息种类分别分发给对应的消息队列。
[*] 生产者,发布消息时必要声明绑定哪种key
package direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明路由
// direct:根据路由键进行定向分发消息
channel.exchangeDeclare("test_direct_exchange", "direct");
String msg = "用户注册,【userid=100】";
channel.basicPublish("test_direct_exchange", "insert", null, msg.getBytes());
msg = "用户查询,【userid=200】";
channel.basicPublish("test_direct_exchange", "select", null, msg.getBytes());
channel.close();
connection.close();
}
}
[*] 消耗者,一个绑定增编削的路由key,另一个绑定查询的路由key
[*] 消耗者1
package direct;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_direct_queue1", false, false, false, null);
//队列绑定
channel.queueBind("test_direct_queue1", "test_direct_exchange", "insert");
channel.queueBind("test_direct_queue1", "test_direct_exchange", "delete");
channel.queueBind("test_direct_queue1", "test_direct_exchange", "update");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者1:" + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_direct_queue1", false, consumer);
}
}
[*] 消耗者2
package direct;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_direct_queue2", false, false, false, null);
//队列绑定
channel.queueBind("test_direct_queue2", "test_direct_exchange", "select");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者2:" + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_direct_queue2", false, consumer);
}
}
[*] 测试:
[*] 先运行生产者,由于必要先创建交换机,此步的消息忽略。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvZjdmYzU5NjVkMjQ3ZjJhYTNjNzU5MzBjM2MwZTgwY2EucG5n
[*] 再运行消耗者,消耗者的队列绑定交换机。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvNGYxMjhhYjFkOGEwNzAwZmJmY2FhY2I5MzUzY2Y4MGMucG5n
[*] 末了运行生产者。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvY2U2MmZiOWZhZThmZWYwODllNjlkMWQwYzIzNTY3NjgucG5n
留意:是 队列和路由键 举行绑定,当队列绑定了路由键,消耗者再监听该队列时,全部的队列信息都能拿到。通常,每个消耗者 只监听自己的消耗队列。
2.2.5 通配符模式
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvYjZhMThlNWU5ZjU5ODNjMDQ5Y2MzMTY3NmY4MTM4NzcucG5n
通配符模式 和 路由模式的区别:
[*]路由键支持暗昧匹配
匹配符号:
[*] *:只能匹配一个词(恰恰一个词,多一个不可,少一个也不可)
[*] #:匹配0个或更多个词
[*] 生产者
package topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.ConnectionUtil;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_topic_exchange", "topic");
String msg = "orange_rabbit";
channel.basicPublish("test_topic_exchange", "orange.rabbit", null, msg.getBytes() );
msg = "beautiful_smart_fox";
channel.basicPublish("test_topic_exchange", "beautiful.smart.fox#.fox", null, msg.getBytes() );
channel.close();
connection.close();
}
}
[*] 消耗者
[*] 消耗者1
package topic;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_topic_queue1", false, false, false, null);
//绑定队列
channel.queueBind("test_topic_queue1", "test_topic_exchange", "orange.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者1:" + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_topic_queue1", false, consumer);
}
}
[*] 消耗者2
package topic;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("test_topic_queue2", false, false, false, null);
//绑定队列
channel.queueBind("test_topic_queue2", "test_topic_exchange", "#.fox");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者2:" + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_topic_queue2", false, consumer);
}
}
[*] 测试:
[*] 先运行生产者,由于必要先创建交换机,此步的消息忽略。
[*] 再运行消耗者,消耗者的队列绑定交换机。
[*] 末了运行生产者。
https://dis.qidao123.com/imgproxy/aHR0cHM6Ly9pLWJsb2cuY3NkbmltZy5jbi9ibG9nX21pZ3JhdGUvMjM2MDI0ZDUwNTE1YTc3NDVhZTVhYTVjM2NhOWJmMTMucG5n
2.3 消息的长期化
消息丢失:
[*]消耗者发生非常,丢失消息 --> 办理方案:手动ack
[*]服务器发生宕机 --> 办理方案:长期化
基于通配符模式代码修改
[*] 生产者修改
package persistence.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import utils.ConnectionUtil;
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型,持久化)
channel.exchangeDeclare("test_topic_exchange2", "topic", true);
String msg = "orange_rabbit";
channel.basicPublish("test_topic_exchange", "orange.rabbit", null, msg.getBytes() );
msg = "beautiful_smart_fox";
//第三个参数可以让消息持久化
channel.basicPublish("test_topic_exchange", "beautiful.smart.fox#.fox", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes() );
channel.close();
connection.close();
}
}
[*] 消耗者
[*] 消耗者1
package persistence.topic;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列,第二个参数为true:支持持久化
channel.queueDeclare("test_topic_queue1", true, false, false, null);
//绑定队列
channel.queueBind("test_topic_queue1", "test_topic_exchange", "orange.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者1:" + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_topic_queue1", false, consumer);
}
}
[*] 消耗者2
package persistence.topic;
import com.rabbitmq.client.*;
import utils.ConnectionUtil;
import java.io.IOException;
public class Receiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列,第二个参数为true:支持持久化
channel.queueDeclare("test_topic_queue2", false, false, false, null);
//绑定队列
channel.queueBind("test_topic_queue2", "test_topic_exchange", "#.fox");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("消费者2:" + s);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//监听队列
channel.basicConsume("test_topic_queue2", false, consumer);
}
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金
页:
[1]