ToB企服应用市场:ToB评测及商务社交产业平台

标题: 百万架构师第四十课:RabbitMq:RabbitMq-工作模子与JAVA编程|JavaGuide [打印本页]

作者: 祗疼妳一个    时间: 2025-2-19 22:45
标题: 百万架构师第四十课:RabbitMq:RabbitMq-工作模子与JAVA编程|JavaGuide
来源https://javaguide.net
RabbitMQ 1-工作模子与Java编程

课前准备

预习资料

Windows安装步调
Linux安装步调
官网文章中文翻译系列
环境阐明

操作系统:CentOS 7 JDK:1.8
Erlang:19.0.4或最新版
RabbitMQ:3.6.12或最新版 版本对应关系
典型应用场景

基本介绍

AMQP协议

​        AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层尺度高级消息队列协议,是应用层协议的一个开放尺度,为面向消息的中心件设计。基于此协议的客户端与消息中心件可传递消息,并不受客户端/中心件不同产品、不同的开发语言等条件的限定。
​        AMQP的实现有:RabbitMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、MQ、Zyre等。
RabbitMQ的特性

RabbitMQ利用Erlang语言编写,利用Mnesia数据库存储消息。
(1)可靠性(Reliability) RabbitMQ 利用一些机制来保证可靠性,如持久化、传输确认、发布确认。
(2)灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现本身的 Exchange 。
(3)消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
(4)高可用(Highly Available Queues) 队列可以在集群中的呆板上进行镜像,使得在部分节点出问题的环境下队列仍旧可用。
(5)多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 AMQP、STOMP、MQTT 等等。
(6)多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby、PHP、C#、JavaScript 等等。
(7)管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点。
(8)插件机制(Plugin System)
RabbitMQ提供了很多插件,以实现从多方面扩展,当然也可以编写本身的插件。

工作模子

概念解释Broker即RabbitMQ的实体服务器。提供一种传输服务,维护一条从生产者到消费者的传输线路, 保证消息数据能按照指定的方式传输。Exchange消息交换机。指定消息按照什么规则路由到哪个队列Queue。Queue消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。Binding绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。Routing Key路由关键字。Exchange根据Routing   Key进行消息投递。界说绑定时指定的关键字称为Binding Key。Vhost虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。Producer消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的步伐。Consumer消息消费者。消息的接收者,一般是独立的步伐。ConnectionProducer 和 Consumer 与Broker之间的TCP长连接。Channel消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一 个会话任务。在RabbitMQ Java Client   API中,channel上界说了大量的编程接口。三种主要的交换机
Direct Exchange 直连交换机

界说:直连范例的交换机与一个队列绑定时,需要指定一个明确的binding key。
路由规则:发送消息到直连范例的交换机时,只有routing key跟binding  key完全匹配时,绑定的队列才气收到消息。
比方:
  1. // 只有队列1能收到消息
  2. channel.basicPublish("MY_DIRECT_EXCHANGE", "key1", null, msg.getBytes());
复制代码

Topic Exchange 主题交换机

界说:主题范例的交换机与一个队列绑定时,可以指定按模式匹配的routing key。
通配符有两个,*代表匹配一个单词。#代表匹配零个大概多个单词。单词与单词之间用 . 隔开。
路由规则:发送消息到主题范例的交换机时,routing key符合binding key的模式时,绑定的队列才气收到消息。
比方:
  1. // 只有队列1能收到消息
  2. channel.basicPublish("MY_TOPIC_EXCHANGE", "sh.abc", null, msg.getBytes());
  3. // 队列2和队列3能收到消息
  4. channel.basicPublish("MY_TOPIC_EXCHANGE", "bj.book", null, msg.getBytes());
  5. // 只有队列4能收到消息
  6. channel.basicPublish("MY_TOPIC_EXCHANGE", "abc.def.food", null, msg.getBytes());
复制代码

Fanout Exchange 广播交换机

界说:广播范例的交换机与一个队列绑定时,不需要指定binding key。
路由规则:当消息发送到广播范例的交换机时,不需要指定routing key,所有与之绑定的队列都能收到消息。
比方:
  1. // 3个队列都会收到消息
  2. channel.basicPublish("MY_FANOUT_EXCHANGE", "", null, msg.getBytes());
复制代码

Java API 编程

创建Maven工程,pom.xml引入依赖
  1. <dependency>
  2.     <groupId>com.rabbitmq</groupId>
  3.     <artifactId>amqp-client</artifactId>
  4.     <version>4.1.0</version>
  5. </dependency>
复制代码
生产者
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class MyProducer {
  5.    private final static String QUEUE_NAME = "ORIGIN_QUEUE";
  6.    public static void main(String[] args) throws Exception {
  7.        ConnectionFactory factory = new ConnectionFactory();
  8.        // 连接IP
  9.        factory.setHost("127.0.0.1");
  10.        // 连接端口
  11.        factory.setPort(5672);
  12.        // 虚拟机
  13.        factory.setVirtualHost("/");
  14.        // 用户
  15.        factory.setUsername("guest");
  16.        factory.setPassword("guest");
  17.        // 建立连接
  18.        Connection conn = factory.newConnection();
  19.        // 创建消息通道
  20.        Channel channel = conn.createChannel();
  21.        String msg = "Hello world, Rabbit MQ";
  22.        // 声明队列
  23.        // String queue, boolean durable, boolean exclusive, boolean autoDelete,
  24.        Map<String, Object> arguments
  25.            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  26.        // 发送消息(发送到默认交换机AMQP Default,Direct)
  27.        // 如果有一个队列名称跟Routing Key相等,那么消息会路由到这个队列
  28.        // String exchange, String routingKey, BasicProperties props, byte[] body
  29.        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
  30.        channel.close();
  31.        conn.close();
  32.    }
  33. }
复制代码
消费者
  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. public class MyConsumer {
  4.    private final static String QUEUE_NAME = "ORIGIN_QUEUE";
  5.    public static void main(String[] args) throws Exception {
  6.        ConnectionFactory factory = new ConnectionFactory();
  7.        // 连接IP
  8.        factory.setHost("127.0.0.1");
  9.        // 默认监听端口
  10.        factory.setPort(5672);
  11.        // 虚拟机
  12.        factory.setVirtualHost("/");
  13.        // 设置访问的用户
  14.        factory.setUsername("guest");
  15.        factory.setPassword("guest");
  16.        // 建立连接
  17.        Connection conn = factory.newConnection();
  18.        // 创建消息通道
  19.        Channel channel = conn.createChannel();
  20.        // 声明队列
  21.        // String queue, boolean durable, boolean exclusive, boolean autoDelete,
  22.        Map<String, Object> arguments
  23.            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  24.        System.out.println(" Waiting for message....");
  25.        // 创建消费者
  26.        Consumer consumer = new DefaultConsumer(channel) {
  27.            @Override
  28.            public void handleDelivery(String consumerTag, Envelope envelope,
  29.                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
  30.                String msg = new String(body, "UTF-8");
  31.                System.out.println("Received message : '" + msg + "'");
  32.            }
  33.        };
  34.        // 开始获取消息
  35.        // String queue, boolean autoAck, Consumer callback
  36.        channel.basicConsume(QUEUE_NAME, true, consumer);
  37.    }
  38. }
复制代码
参数阐明

声明交换机的参数

声明队列的参数

消息属性BasicProperties

消息的全部属性有14个,以下枚举了一些主要的参数:
参数释义Map headers消息的其他自界说参数Integer deliveryMode2持久化,其他:瞬态Integer priority消息的优先级String correlationId关联ID,方便RPC相应与哀求关联String replyTo回调队列String expirationTTL,消息过期时间,单位毫秒进阶知识

1、TTL(Time To Live)

a、消息的过期时间

有两种设置方式:
通过队列属性设置消息过期时间:
  1. Map<String, Object> argss = new HashMap<String, Object>();
  2. argss.put("x-message-ttl",6000);
  3. channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
复制代码
设置单条消息的过期时间:
  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2.     .deliveryMode(2) // 持久化消息
  3.     .contentEncoding("UTF-8")
  4.     .expiration("10000") // TTL
  5.     .build();
  6. channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes());
复制代码
b、队列的过期时间
  1. Map<String, Object> argss = new HashMap<String, Object>();
  2. argss.put("x-message-ttl",6000);
  3. channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
复制代码
队列的过期时间决定了在没有任何消费者以后,队列可以存活多久。
参考:
com.gupaoedu.ttl
2、死信队列

有三种环境消息会进入DLX(Dead Letter Exchange)死信交换机。

可以设置一个死信队列(Dead Letter Queue)与DLX绑定,即可以存储Dead Letter,消费者可以监听这个队列取走消息。
  1. Map<String,Object> arguments = new HashMap<String,Object>();
  2. arguments.put("x-dead-letter-exchange","DLX_EXCHANGE");
  3. // 指定了这个队列的死信交换机
  4. channel.queueDeclare("TEST_DLX_QUEUE", false, false, false, arguments);
  5. // 声明死信交换机
  6. channel.exchangeDeclare("DLX_EXCHANGE","topic", false, false, false, null);
  7. // 声明死信队列
  8. channel.queueDeclare("DLX_QUEUE", false, false, false, null);
  9. // 绑定
  10. channel.queueBind("DLX_QUEUE","DLX_EXCHANGE","#");
复制代码
参考:
com.gupaoedu.dlx
3、优先级队列

设置一个队列的最大优先级:
  1. Map<String, Object> argss = new HashMap<String, Object>();
  2. argss.put("x-max-priority",10); // 队列最大优先级
  3. channel.queueDeclare("ORIGIN_QUEUE", false, false, false, argss);
复制代码
发送消息时指定消息当前的优先级:
  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2.     .priority(5) // 消息优先级
  3.     .build();
  4. channel.basicPublish("", "ORIGIN_QUEUE", properties, msg.getBytes());
复制代码
优先级高的消息可以优先被消费。但是:只有消息堆积(消息的发送速度大于消费者的消费速度)的环境下优先级才有意义。
参考:
com.gupaoedu.message
4、延迟队列

RabbitMQ本身不支持延迟队列。可以利用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,  到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
另一种方式是利用rabbitmq-delayed-message-exchange插件。
当然,将需要发送的信息保存在数据库,利用任务调度系统扫描然后发送也是可以实现的。
参考:
com.gupaoedu.dlx
5、RPC

RabbitMQ实现RPC的原理:服务端处理消息后,把相应消息发送到一个相应队列,客户端再从相应队列获取到结果。
其中的问题:Client收到消息后,怎么知道应答消息是复兴哪一条消息的?所以必须有一个唯一ID来关联,就是 correlationId。

参考:
com.gupaoedu.rpc
6、服务端流控(Flow Control)

RabbitMQ 会在启动时检测呆板的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警告并阻塞所有连接(Connections)。可以通过修改 rabbitmq.config 文件来调整内存阈值,默认值是  0.4,如下所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].
默认环境,如果剩余磁盘空间在 1GB 以下,RabbitMQ 主动阻塞所有的生产者。这个阈值也是可调的。注意队列长度只在消息堆积的环境下有意义,而且会删除先入队的消息,不能实现服务端限流。
7、消费端限流

在AutoACK为false的环境下,如果一定数目的消息(通过基于consumer大概channel设置Qos的值)未被确认前,不进行消费新的消息。
  1. channel.basicQos(2); // 如果超过2条消息没有发送ACK,当前消费者不再接收队列消息
  2. channel.basicConsume(QUEUE_NAME, false, consumer);
复制代码
参考:com.gupaoedu.limit
UI管理界面的利用

管理插件提供了更简朴的管理方式。
启 用 管 理 插 件

Windows启用管理插件
  1. cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin
  2. rabbitmq-plugins.bat enable rabbitmq_management
复制代码
Linux启用管理插件
  1. cd /usr/lib/rabbitmq/bin
  2. ./rabbitmq-plugins enable rabbitmq_management
复制代码
管理界面访问端口

默认端口是15672,默认用户guest,密码guest。guest用户默认只能在本机访问。
Linux 创建 RabbitMQ 用户

比方:创建用户admin,密码admin,授权访问所有的Vhost
  1. firewall-cmd --permanent --add-port=15672/tcp
  2. firewall-cmd --reload
  3. rabbitmqctl add_user admin admin
  4. rabbitmqctl set_user_tags admin administrator
  5. rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
复制代码
Spring配置方式集成RabbitMQ


参考gitlab工程步调:
1、创建Maven工程,pom.xml引入依赖
2、src/main/resouces目录,创建rabbitMQ.xml
3、配置applicationContext.xml
4、src/main/resouces目录,log4j.properties
5、编写生产者
6、编写4个消费者
7、编写单元测试类
Spring Boot集成RabbitMQ

参考gitlab工程
来源于:  https://javaguide.net
微信公众号:不止极客

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4