在 RabbitMQ 中,互换机(Exchange)是一个核心组件,负责接收来自生产者的消息,并根据特定的路由规则将消息分发到相应的队列。互换机的存在改变了消息发送的模式,使得消息的路由更加灵活和高效。
互换机的类型
RabbitMQ 提供了四种主要类型的互换机,每种互换机的路由规则不同:
- Direct Exchange(直连互换机):
- 功能:基于路由键(Routing Key)将消息发送到与该路由键完全匹配的队列。
- 应用场景:适用于需要准确匹配路由键的场景。
- 示例:假设有两个队列 A 和 B,A 绑定了路由键 key1,B 绑定了路由键 key2。当生产者发送一条路由键为 key1 的消息时,只有队列 A 会接收到这条消息。
- Fanout Exchange(扇出互换机):
- 功能:将消息广播到全部绑定到该互换机的队列,不思量路由键。
- 应用场景:适用于需要将消息广播到多个队列的场景。
- 示例:假设有两个队列 A 和 B 都绑定到了一个 Fanout 互换机上。当生产者发送一条消息到该互换机时,A 和 B 都会接收到这条消息。
- Topic Exchange(主题互换机):
- 功能:基于路由键的模式匹配(使用通配符)将消息发送到匹配的队列。
- 应用场景:适用于需要基于模式匹配路由键的场景。
- 示例:假设有两个队列 A 和 B,A 绑定了路由键模式 key.*,B 绑定了路由键模式 key.#。当生产者发送一条路由键为 key.test 的消息时,A 和 B 都会接收到这条消息。
- Headers Exchange(头互换机):
- 功能:基于消息的头部属性进行匹配,将消息发送到匹配的队列。
- 应用场景:适用于需要基于消息头部属性进行路由的场景。
- 示例:这种互换机使用较少,通常在特定环境下才会使用。
互换机的作用
- 消息路由:互换机根据路由规则将消息分发到相应的队列。
- 解耦生产者和消费者:生产者只需将消息发送到互换机,不需要知道消息的最终目标地队列。
- 灵活性和扩展性:通过不同类型的互换机,可以实现复杂的消息路由逻辑,满足各种业务需求。
示例代码
以下是怎样使用 Direct Exchange 和 Fanout Exchange 的示例代码:
Direct Exchange 示例
- const amqp = require('amqplib/callback_api');
- amqp.connect('amqp://localhost', function(error0, connection) {
- if (error0) {
- throw error0;
- }
- connection.createChannel(function(error1, channel) {
- if (error1) {
- throw error1;
- }
- const exchange = 'direct_logs';
- const msg = 'Hello World!';
- const routingKey = 'key1';
- channel.assertExchange(exchange, 'direct', { durable: true });
- channel.publish(exchange, routingKey, Buffer.from(msg));
- console.log(" [x] Sent %s: '%s'", routingKey, msg);
- });
- setTimeout(function() {
- connection.close();
- process.exit(0);
- }, 500);
- });
复制代码 Fanout Exchange 示例
- const amqp = require('amqplib/callback_api');
- amqp.connect('amqp://localhost', function(error0, connection) {
- if (error0) {
- throw error0;
- }
- connection.createChannel(function(error1, channel) {
- if (error1) {
- throw error1;
- }
- const exchange = 'logs';
- const msg = 'Hello World!';
- channel.assertExchange(exchange, 'fanout', { durable: true });
- channel.publish(exchange, '', Buffer.from(msg));
- console.log(" [x] Sent %s", msg);
- });
- setTimeout(function() {
- connection.close();
- process.exit(0);
- }, 500);
- });
复制代码 Topic Exchange 示例
Topic Exchange 允许使用通配符进行路由,支持更复杂的路由规则。
发布者代码
- const amqp = require('amqplib/callback_api');
- amqp.connect('amqp://localhost', function(error0, connection) {
- if (error0) {
- throw error0;
- }
- connection.createChannel(function(error1, channel) {
- if (error1) {
- throw error1;
- }
- const exchange = 'topic_logs';
- const msg = 'Hello World!';
- const routingKey = 'quick.orange.rabbit';
- channel.assertExchange(exchange, 'topic', { durable: true });
- channel.publish(exchange, routingKey, Buffer.from(msg));
- console.log(" [x] Sent %s: '%s'", routingKey, msg);
- });
- setTimeout(function() {
- connection.close();
- process.exit(0);
- }, 500);
- });
复制代码 消费者代码
- const amqp = require('amqplib/callback_api');
- amqp.connect('amqp://localhost', function(error0, connection) {
- if (error0) {
- throw error0;
- }
- connection.createChannel(function(error1, channel) {
- if (error1) {
- throw error1;
- }
- const exchange = 'topic_logs';
- const queue = 'topic_queue';
- channel.assertExchange(exchange, 'topic', { durable: true });
- channel.assertQueue(queue, { durable: true });
- // 绑定队列到交换机,使用通配符
- channel.bindQueue(queue, exchange, '*.orange.*');
- channel.consume(queue, function(msg) {
- if (msg.content) {
- console.log(" [x] Received %s: '%s'", msg.fields.routingKey, msg.content.toString());
- }
- }, { noAck: true });
- });
- });
复制代码 在这个示例中,发布者将消息发送到 topic_logs 互换机,使用路由键 quick.orange.rabbit。消费者绑定到 topic_logs 互换机,使用通配符 *.orange.*,因此会接收到全部包含 orange 的消息。
Headers Exchange 示例
Headers Exchange 基于消息头部属性进行路由,适用于需要复杂路由规则的场景。
发布者代码
- const amqp = require('amqplib/callback_api');
- amqp.connect('amqp://localhost', function(error0, connection) {
- if (error0) {
- throw error0;
- }
- connection.createChannel(function(error1, channel) {
- if (error1) {
- throw error1;
- }
- const exchange = 'headers_logs';
- const msg = 'Hello World!';
- channel.assertExchange(exchange, 'headers', { durable: true });
- channel.publish(exchange, '', Buffer.from(msg), {
- headers: {
- 'format': 'pdf',
- 'type': 'report'
- }
- });
- console.log(" [x] Sent %s", msg);
- });
- setTimeout(function() {
- connection.close();
- process.exit(0);
- }, 500);
- });
复制代码 消费者代码
- const amqp = require('amqplib/callback_api');
- amqp.connect('amqp://localhost', function(error0, connection) {
- if (error0) {
- throw error0;
- }
- connection.createChannel(function(error1, channel) {
- if (error1) {
- throw error1;
- }
- const exchange = 'headers_logs';
- const queue = 'headers_queue';
- channel.assertExchange(exchange, 'headers', { durable: true });
- channel.assertQueue(queue, { durable: true });
- // 绑定队列到交换机,使用头部属性
- channel.bindQueue(queue, exchange, '', {
- 'x-match': 'all',
- 'format': 'pdf',
- 'type': 'report'
- });
- channel.consume(queue, function(msg) {
- if (msg.content) {
- console.log(" [x] Received %s", msg.content.toString());
- }
- }, { noAck: true });
- });
- });
复制代码 在这个示例中,发布者将消息发送到 headers_logs 互换机,并设置消息头部属性 format: pdf 和 type: report。消费者绑定到 headers_logs 互换机,使用头部属性匹配 format: pdf 和 type: report,因此会接收到符合这些头部属性的消息。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |