- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处置惩罚消息,例如递交给某个特别队列、递交给所有队列、或是将消息抛弃。到底如何操作,取决于Exchange的范例。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列肯定要与交换机绑定。
- Consumer:斲丧者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的范例有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
简朴点来说,就是生产者把消息发给交换机,交换机根据路由(绑定规则)来转发消息给队列,斲丧者订阅队列,获得消息。
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的斲丧者都能拿到消息
注意:我下面的代码都是在上一个加依靠的底子上的,可看我上一个文档
01使用官方文档的
消息发送:
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.util.Scanner;
- public class EmitLog {
- //定义交换机
- private static final String EXCHANGE_NAME = "fanout-exchange";
- public static void main(String[] argv) throws Exception {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- //建立连接和通道
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- //通过channel.exchangeDeclare方法声明了一个名为EXCHANGE_NAME(即"logs")的交换机
- // 并指定了其类型为fanout。fanout类型的交换机会将消息广播到所有与之绑定的队列中。
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- //发送消息
- Scanner scanner=new Scanner(System.in);
- while(scanner.hasNext()){
- String message = scanner.nextLine();
- //channel.basicPublish方法将消息发布到前面声明的交换机中。
- // 注意,这里的routingKey(即第二个参数)为空字符串"",因为对于fanout类型的交换机来说,routingKey是不起作用的。
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
- }
复制代码 接收:
注意肯定要创建队列,不然只有交换机没用
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
- public class ReceiveLogs {
- private static final String EXCHANGE_NAME = "fanout-exchange";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- //创建通道
- Channel channel1 = connection.createChannel();
- Channel channel2 = connection.createChannel();
- //声明交换机
- channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
- channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
- //队列的名字
- String queueName = "星星";
- //创建队列
- channel1.queueDeclare(queueName, true, false, false, null);
- channel1.queueBind(queueName, EXCHANGE_NAME, "");
- String queueName1 = "晨晨";
- //创建队列
- channel2.queueDeclare(queueName1, true, false, false, null);
- channel2.queueBind(queueName1, EXCHANGE_NAME, "");
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [1] Received '" + message + "'");
- };
- DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [2] Received '" + message + "'");
- };
- channel1.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- channel2.basicConsume(queueName1, true, deliverCallback2, consumerTag -> { });
- }
- }
复制代码
可以看到fanout范例,生产者发送一个消息,所有的斲丧者都能接收到,这个范例不消设置路由
02注解形式的:
消息发送
先在配置类中声明交换机,队列,以及绑定关系
- public static final String FANOUT_QUEUE_1 = "fanout.queue.1";
- public static final String FANOUT_QUEUE_2 = "fanout.queue.2";
- public static final String FANOUT_EXCHANGE = "fanout.exchange";
- @Bean
- public Queue fanoutQueue1() {
- return new Queue(FANOUT_QUEUE_1);
- }
- @Bean
- public Queue fanoutQueue2() {
- return new Queue(FANOUT_QUEUE_2);
- }
- @Bean
- public FanoutExchange fanoutExchange() {
- return new FanoutExchange(FANOUT_EXCHANGE);
- }
- @Bean
- public Binding binding1() {
- return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
- }
- @Bean
- public Binding binding2() {
- return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
- }
复制代码 Test类中添加测试方法:
- @Test
- public void testFanoutExchange() {
- // 交换机名称
- String exchangeName = "fanout.exchange";
- // 消息
- String message = "hello, everyone!";
- rabbitTemplate.convertAndSend(exchangeName, "", message);
- }
复制代码 消息接收
在添加两个方法,作为斲丧者:
- @RabbitListener(queues = "fanout.queue.1")
- public void listenFanoutQueue1(String msg) {
- System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
- }
- @RabbitListener(queues = "fanout.queue.2")
- public void listenFanoutQueue2(String msg) {
- System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
- }
复制代码
总结
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都斲丧。但是,在某些场景下,我们希望不同的消息被不同的队列斲丧。这时就要用到Direct范例的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判定,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
此处我省略了官方文档那种,直接springboot注解那种的,而且不再bean。而是全注解那种的,也是对黑马代码的进一步优化,不手动操作添加
下面是黑马的案例
案例需求如图:
- 声明一个名为hmall.direct的交换机
- 声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
- 声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
- 在consumer服务中,编写两个斲丧者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,向hmall.direct发送消息
先在配置类加入下面的然后启动一下:
- /** 基于注解的来声明交换机和队列及其绑定关系 */
- @RabbitListener( bindings = @QueueBinding(
- exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),
- value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue1"),
- key = {"red", "blue"}
- ))
- public void rabbitListener5(String message) {
- System.out.println("红蓝: " + message);
- }
- @RabbitListener( bindings = @QueueBinding(
- exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),
- value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue2"),
- key = {"yellow","red"}
- ))
- public void rabbitListener6(String message) {
- System.out.println("黄红: " + message);
- }
复制代码 消息发送
在Test类中添加测试方法:
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "heima.direct";
- // 消息
- String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "red", message);
- }
- @Test
- public void testSendDirectExchange01() {
- // 交换机名称
- String exchangeName = "heima.direct";
- // 消息
- String message = "最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "blue", message);
- }
复制代码 消息接收:
- @RabbitListener(queues = "direct.queue1")
- public void listenDirectQueue1(String msg) {
- System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
- }
- @RabbitListener(queues = "direct.queue2")
- public void listenDirectQueue2(String msg) {
- System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
- }
复制代码 先点击测试的谁人red的运行一下,在启动项目:
我们再切换为blue这个key:
你会发现,只有斲丧者1收到了消息:
###总结
描述下Direct交换机与Fanout交换机的差别?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判定路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |