ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ 互换机的类型 [打印本页]

作者: 飞不高    时间: 2025-2-17 14:57
标题: RabbitMQ 互换机的类型
在 RabbitMQ 中,互换机(Exchange)是一个核心组件,负责接收来自生产者的消息,并根据特定的路由规则将消息分发到相应的队列。互换机的存在改变了消息发送的模式,使得消息的路由更加灵活和高效。
互换机的类型

RabbitMQ 提供了四种主要类型的互换机,每种互换机的路由规则不同:
互换机的作用


示例代码

以下是怎样使用 Direct Exchange 和 Fanout Exchange 的示例代码:
Direct Exchange 示例

  1. const amqp = require('amqplib/callback_api');
  2. amqp.connect('amqp://localhost', function(error0, connection) {
  3.   if (error0) {
  4.     throw error0;
  5.   }
  6.   connection.createChannel(function(error1, channel) {
  7.     if (error1) {
  8.       throw error1;
  9.     }
  10.     const exchange = 'direct_logs';
  11.     const msg = 'Hello World!';
  12.     const routingKey = 'key1';
  13.     channel.assertExchange(exchange, 'direct', { durable: true });
  14.     channel.publish(exchange, routingKey, Buffer.from(msg));
  15.     console.log(" [x] Sent %s: '%s'", routingKey, msg);
  16.   });
  17.   setTimeout(function() {
  18.     connection.close();
  19.     process.exit(0);
  20.   }, 500);
  21. });
复制代码
Fanout Exchange 示例

  1. const amqp = require('amqplib/callback_api');
  2. amqp.connect('amqp://localhost', function(error0, connection) {
  3.   if (error0) {
  4.     throw error0;
  5.   }
  6.   connection.createChannel(function(error1, channel) {
  7.     if (error1) {
  8.       throw error1;
  9.     }
  10.     const exchange = 'logs';
  11.     const msg = 'Hello World!';
  12.     channel.assertExchange(exchange, 'fanout', { durable: true });
  13.     channel.publish(exchange, '', Buffer.from(msg));
  14.     console.log(" [x] Sent %s", msg);
  15.   });
  16.   setTimeout(function() {
  17.     connection.close();
  18.     process.exit(0);
  19.   }, 500);
  20. });
复制代码
Topic Exchange 示例

Topic Exchange 允许使用通配符进行路由,支持更复杂的路由规则。
发布者代码

  1. const amqp = require('amqplib/callback_api');
  2. amqp.connect('amqp://localhost', function(error0, connection) {
  3.   if (error0) {
  4.     throw error0;
  5.   }
  6.   connection.createChannel(function(error1, channel) {
  7.     if (error1) {
  8.       throw error1;
  9.     }
  10.     const exchange = 'topic_logs';
  11.     const msg = 'Hello World!';
  12.     const routingKey = 'quick.orange.rabbit';
  13.     channel.assertExchange(exchange, 'topic', { durable: true });
  14.     channel.publish(exchange, routingKey, Buffer.from(msg));
  15.     console.log(" [x] Sent %s: '%s'", routingKey, msg);
  16.   });
  17.   setTimeout(function() {
  18.     connection.close();
  19.     process.exit(0);
  20.   }, 500);
  21. });
复制代码
消费者代码

  1. const amqp = require('amqplib/callback_api');
  2. amqp.connect('amqp://localhost', function(error0, connection) {
  3.   if (error0) {
  4.     throw error0;
  5.   }
  6.   connection.createChannel(function(error1, channel) {
  7.     if (error1) {
  8.       throw error1;
  9.     }
  10.     const exchange = 'topic_logs';
  11.     const queue = 'topic_queue';
  12.     channel.assertExchange(exchange, 'topic', { durable: true });
  13.     channel.assertQueue(queue, { durable: true });
  14.     // 绑定队列到交换机,使用通配符
  15.     channel.bindQueue(queue, exchange, '*.orange.*');
  16.     channel.consume(queue, function(msg) {
  17.       if (msg.content) {
  18.         console.log(" [x] Received %s: '%s'", msg.fields.routingKey, msg.content.toString());
  19.       }
  20.     }, { noAck: true });
  21.   });
  22. });
复制代码
在这个示例中,发布者将消息发送到 topic_logs 互换机,使用路由键 quick.orange.rabbit。消费者绑定到 topic_logs 互换机,使用通配符 *.orange.*,因此会接收到全部包含 orange 的消息。
Headers Exchange 示例

Headers Exchange 基于消息头部属性进行路由,适用于需要复杂路由规则的场景。
发布者代码

  1. const amqp = require('amqplib/callback_api');
  2. amqp.connect('amqp://localhost', function(error0, connection) {
  3.   if (error0) {
  4.     throw error0;
  5.   }
  6.   connection.createChannel(function(error1, channel) {
  7.     if (error1) {
  8.       throw error1;
  9.     }
  10.     const exchange = 'headers_logs';
  11.     const msg = 'Hello World!';
  12.     channel.assertExchange(exchange, 'headers', { durable: true });
  13.     channel.publish(exchange, '', Buffer.from(msg), {
  14.       headers: {
  15.         'format': 'pdf',
  16.         'type': 'report'
  17.       }
  18.     });
  19.     console.log(" [x] Sent %s", msg);
  20.   });
  21.   setTimeout(function() {
  22.     connection.close();
  23.     process.exit(0);
  24.   }, 500);
  25. });
复制代码
消费者代码

  1. const amqp = require('amqplib/callback_api');
  2. amqp.connect('amqp://localhost', function(error0, connection) {
  3.   if (error0) {
  4.     throw error0;
  5.   }
  6.   connection.createChannel(function(error1, channel) {
  7.     if (error1) {
  8.       throw error1;
  9.     }
  10.     const exchange = 'headers_logs';
  11.     const queue = 'headers_queue';
  12.     channel.assertExchange(exchange, 'headers', { durable: true });
  13.     channel.assertQueue(queue, { durable: true });
  14.     // 绑定队列到交换机,使用头部属性
  15.     channel.bindQueue(queue, exchange, '', {
  16.       'x-match': 'all',
  17.       'format': 'pdf',
  18.       'type': 'report'
  19.     });
  20.     channel.consume(queue, function(msg) {
  21.       if (msg.content) {
  22.         console.log(" [x] Received %s", msg.content.toString());
  23.       }
  24.     }, { noAck: true });
  25.   });
  26. });
复制代码
在这个示例中,发布者将消息发送到 headers_logs 互换机,并设置消息头部属性 format: pdf 和 type: report。消费者绑定到 headers_logs 互换机,使用头部属性匹配 format: pdf 和 type: report,因此会接收到符合这些头部属性的消息。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4