ToB企服应用市场:ToB评测及商务社交产业平台
标题:
RabbitMQ工作模式详解,以及Java实现
[打印本页]
作者:
立山
时间:
2024-9-6 05:28
标题:
RabbitMQ工作模式详解,以及Java实现
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、长途调用模式(RPC,不常用,不做讲讲授明)
一、简单模式(Simple)
特点:①一个生产者对应一个消耗者,通过队列举行消息传递。
②该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
图解:
二、工作队列模式(Work Queue)
与简单模式相比,工作队列模式(Work Queue)多了一些消耗者,该模式也使用direct交换机,应用于处置惩罚消息较多的环境。特点如下:
①一个队列对应多个消耗者。
②一条消息只会被一个消耗者消耗。
③消息队列默认采用轮询的方式将消息均匀发送给消耗者。
图解:
三、发布订阅模式(Publish/Subscribe)
在开发过程中,有一些消息需要不同消耗者举行不同的处置惩罚
特点:
①生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的
每个队列
中。
②工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换性能将消息发送给多个队列。发布订阅模式使用fanout交换机。
图解
四、路由模式(Routing)
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时间,不是所有消息都无差异的发布到所有队列中。特点:
①每个队列绑定路由关键字RoutingKey
②生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用
direct
交换机。
图解:
五、 通配符模式(Topics)
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
①消息设置RoutingKey时,RoutingKey由多个单词构成,中央以.分割。
②队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
六、Java实现五种模式
简单模式生产者
package com.tmh.mq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* rabbitmq工作模式:简单模式,消息生产者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、创建连接
Connection connection = connectionFactory.newConnection();
//3、创建信道
Channel channel = connection.createChannel();
//4、创建队列
/**
* 参数一:队列名称
* 参数二:是否持久化,true表示当MQ重启后队列还在
* 参数三:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数四:是否自动删除
* 参数五:其他参数
*/
channel.queueDeclare("simplqueue", false,false,false,null);
//5、发送消息
String messges="hello simple queue";
/**
* 参数1:交换机名,""表示默认交换机
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
*/
channel.basicPublish("","simplqueue",null,messges.getBytes());
//6、关闭信道和连接
channel.close();
connection.close();
System.out.println("消息发送成功");
}
}
复制代码
简单模式消耗者
package com.tmh.mq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//simple队列消息消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂 指定IP地址、端口、连接的用户名密码、连接的虚拟主机
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、创建连接
Connection connection = connectionFactory.newConnection();
//3、创建信道
Channel channel = connection.createChannel();
//4、监听队列
/**
* 参数1:监听的队列名
* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
*/
channel.basicConsume("simplqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String messge = new String(body, "UTF-8");
System.out.println("接收消息,消息为:"+messge);
}
});
}
}
复制代码
工作队列模式生产者
package com.tmh.mq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//工作队列模式
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂连接
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//建立连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//创建队列,如果队列已经存在,则使用该队列
channel.queueDeclare("workqueue",true,false,false,null);
//发送大量消息
for (int i = 0; i <100 ; i++) {
channel.basicPublish("","workqueue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
("hello 这是今天的第"+(i+1)+"条消息").getBytes());
}
//关闭资源
channel.close();
connection.close();
}
}
复制代码
工作模式消耗者
package com.tmh.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂连接
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//建立连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//监听队列
channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消费者1消费消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂连接
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//建立连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//监听队列
channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消费者2消费消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class Consumer3 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建工厂连接
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//建立连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//监听队列
channel.basicConsume("workqueue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消费者1消费消息:"+s);
}
});
}
}
复制代码
发布订阅模式生产者
package com.tmh.mq.publish;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、创建交换机
/**
* 参数一:交换机名
* 参数二:交换机类型
* 参数三:交换机持久化
*/
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
//5、创建队列
channel.queueDeclare("MAIL_QUEUE", true,false,false,null);
channel.queueDeclare("MESSAGE_QUEUE", true,false,false,null);
channel.queueDeclare("STATION_QUEUE", true,false,false,null);
//6、队列绑定交换机
/**
* 参数一:队列名
* 参数二:交换机名字
* 参数三:路由关键字,发布订阅模式写“”即可
*/
channel.queueBind("MAIL_QUEUE","exchange_fanout","");
channel.queueBind("MESSAGE_QUEUE","exchange_fanout","");
channel.queueBind("STATION_QUEUE","exchange_fanout","");
//7、发送消息
for (int i = 1; i <=10 ; i++) {
channel.basicPublish("exchange_fanout","",null,
("你好,发布订阅模式"+i).getBytes(StandardCharsets.UTF_8));
}
//8、关闭资源
channel.close();
connection.close();
}
}
复制代码
发布订阅模式消耗者
package com.tmh.mq.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("MAIL_QUEUE",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("邮件消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("MESSAGE_QUEUE",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("短信消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("STATION_QUEUE",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("站内消息:"+s);
}
});
}
}
复制代码
路由模式生产者
package com.tmh.mq.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、创建交换机
/**
* 参数一:交换机名
* 参数二:交换机类型
* 参数三:交换机持久化
*/
channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
//5、创建队列
channel.queueDeclare("MAIL_QUEUE2", true,false,false,null);
channel.queueDeclare("MESSAGE_QUEUE2", true,false,false,null);
channel.queueDeclare("STATION_QUEUE2", true,false,false,null);
//6、队列绑定交换机
/**
* 参数一:队列名
* 参数二:交换机名字
* 参数三:路由关键字,发布订阅模式写“”即可
*/
channel.queueBind("MAIL_QUEUE2","exchange_routing","important");
channel.queueBind("MESSAGE_QUEUE2","exchange_routing","important");
channel.queueBind("STATION_QUEUE2","exchange_routing","important");
channel.queueBind("STATION_QUEUE2","exchange_routing","normal");
//7、发送消息
channel.basicPublish("exchange_routing","important",null,
("你好,路由模式,这是重要消息").getBytes(StandardCharsets.UTF_8));
channel.basicPublish("exchange_routing","normal",null,
("你好,路由模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8));
//8、关闭资源
channel.close();
connection.close();
}
}
复制代码
路由模式消耗者
package com.tmh.mq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("MAIL_QUEUE2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("邮件消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("MESSAGE_QUEUE2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("短信消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("STATION_QUEUE2",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("站内消息:"+s);
}
});
}
}
复制代码
通配符模式 生产者
package com.tmh.mq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、创建交换机
/**
* 参数一:交换机名
* 参数二:交换机类型
* 参数三:交换机持久化
*/
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
//5、创建队列
channel.queueDeclare("MAIL_QUEUE3", true,false,false,null);
channel.queueDeclare("MESSAGE_QUEUE3", true,false,false,null);
channel.queueDeclare("STATION_QUEUE3", true,false,false,null);
//6、队列绑定交换机
/**
* 参数一:队列名
* 参数二:交换机名字
* 参数三:路由关键字,发布订阅模式写“”即可
*/
channel.queueBind("MAIL_QUEUE3","exchange_topic","#.mail.#");
channel.queueBind("MESSAGE_QUEUE3","exchange_topic","#.message.#");
channel.queueBind("STATION_QUEUE3","exchange_topic","#.station.#");
//7、发送消息
channel.basicPublish("exchange_topic","mail.message.station",null,
("你好,通配符模式,这是重要消息").getBytes(StandardCharsets.UTF_8));
channel.basicPublish("exchange_topic","station",null,
("你好,通配符模式,这是不太重要的消息").getBytes(StandardCharsets.UTF_8));
}
}
复制代码
通配符模式消耗者
package com.tmh.mq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("MAIL_QUEUE3",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("邮件消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("MESSAGE_QUEUE3",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("短信消息:"+s);
}
});
}
}
-----------------------------------------------------------------------------------------
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.8");
connectionFactory.setPort(5672);
connectionFactory.setUsername("tangminghao");
connectionFactory.setPassword("tangminghao");
connectionFactory.setVirtualHost("/");
//2、建立连接
final Connection connection = connectionFactory.newConnection();
//3、建立信道
final Channel channel = connection.createChannel();
//4、监听队列
channel.basicConsume("STATION_QUEUE3",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String s = new String(body, "UTF-8");
System.out.println("站内消息:"+s);
}
});
}
}
复制代码
在Java实现中有很多注释没有改过来,由于很多模式之间变化不大,所以就复制粘贴了
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4