ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ快速上手 [打印本页]

作者: 汕尾海湾    时间: 2024-9-16 21:03
标题: RabbitMQ快速上手


  
媒介

前面我们先容了什么是 RabbitMQ,以及 RabbitMQ 的作用和优势,那么这篇文章我将为各人先容如何安装 RabbitMQ,以及如何利用 RabbitMQ。
安装RabbitMQ

我这里选择的安装环境是在 Linux 的 Ubuntu 发行版本中安装的,各人的平台如果不一样的话,各人可以去官网看看如何安装。RabbitMQ各个体系的安装
前面我们说了 RabbitMq 是由 Erlang 语言实现的,所以要想利用 RabbitMQ,我们起首就需要安装 Erlang。
在安装之前,我们先利用 apt-get update 来更新 apt 库:

然后利用 apt-get install erlang 下载 Erlang:

下载完成之后 Ubuntu 会提示我们需要启动什么服务,我们按 ESC 退出,然后查看 Erlang 是否安装乐成。在 Linux 命令行中输入 erl:

如果表现出来 Erlang 的版本就说明安装乐成。当进入 Erlang 之后,我们如何退出呢?可以通过 halt(). 命令退出,也可以直接利用 CTRL + c 退出。
Erlang 安装乐成之后,我们再来安装 RabbitMQ,安装 RabbitMQ 和安装 Erlang 是雷同的,通过 apt-get install rabbitmq-server:

安装完成之后,我们通过 systemctl status rabbitmq-server 来查看 rabbit-server 的状态:

如果表现 active(running)则表现安装乐成。
通过 apt-get install rabbitmq-server 是安装了 rabbitmq 的服务,那么我们还需要安装的东西就是 RabbitMQ 的管理页面,rabbitmq-plugins enable rabbitmq_management:

安装完成 RabbitMQ 的管理页面之后,我们就可以通过 service rabbitmq-server statrt 启动 RabbitMQ 的服务,然后访问这个管理页面了:
启动 RabbitMQ 之后,我们就通过 IP+Port 的方式来访问 rabbitmq 的管理页面,rabbitmq 的管理页面默认利用的端口号是 15671。

与 RabbitMQ 相关的端口号有三个 5672、15672和25672,5671 是客户端和 RabbitMQ 连接时所利用的端口号,15672 就是访问管理页面所利用的端口号,25672 则是我们搭建集群的时候所需要利用到的端口号。

通过 IP + Port 的方式进入管理页面之后,这里提示我们输入用户名和暗码,但是因为我们第一次进入这个管理页面,所以也没有创建账号,固然 rabbitmq 为我们提供了几个默认的账号,但是现在已经无法登录进去了,所以需要我们创建账户之后才能登录。那么我们如何创建账户呢?
我们在 Linux 命令行中输入 rabbitmqctl add_user ${账号} ${暗码},就可以创建一个帐号了:

创建完成之后,我们直接登岸的话还是不能登岸进去:

这是为什么呢?因为这时候我们创建的用户还不具有权限,所以我们接下来需要给当前用户进行赋权操纵 rabbitmqctl set_user_tags ${账号} ${角色名称},这里的角色名称有下面几种可以选择:
我们这里就给用户设置为 administrator 角色 rabbitmqctl set_user_tags admin administrator:

为账号赋权之后,我们再登录:

RabbitMQ焦点概念

安装完成 RabbitMQ 并且进入到管理页面之后,我们可以看到上面的导航栏有六个部分,那么这六个部分分别代表什么?
Producer 和 Consumer

Producer:生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ 发送消息
Consumer:消费者,也是 RabbitMQ Server 的客户端,从 RabbitMQ 接收消息
Broker:就是 RabbitMQ,主要是接收和发送消息

Connection 和 Channel

Connection:连接,是客户端和 RabbitMQ 服务器之间的一个 TCP 连接,这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的全部数据和控制信息
Channel: 通道,信道,Channel 是在 Connection 之上的一个抽象层。在 RabbitMQ 中,一个 TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接。消息的发送和接收都是基于 Channel 的
通道的主要作用就是将消息的读写操纵复用到一个 TCP 连接中,这样可以淘汰创建和销毁的开销,提高性能。我们可以大致的将 Connection 和 Channel 之间的关系当作是历程和线程之间的关系。

Virtual host

Virtual host:虚拟主机,这是一个虚拟的概念,它为消息队列提供了一种逻辑上的隔离机制,对于一个 RabbitMQ 而言,一个 BrokerServer 上可以存在多个 Virtual Host。当多个不同的用户利用同一个 RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在本身的 vhost 创建 exchange/queue 等。Virtual Host 就雷同我们 MySQL 中的 database,一个主机上的 MySQL 可以为多个项目提供服务,每个项目利用不同的 database。
Queue

Queue 队列,RabbitMQ 的内部对象,用于存储消息,也就不外多先容了。
Exchange

Exchange 交换机,生产者生产消息到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个大概多个 Queue 中。

RabbitMQ 工作流程


RabbitMQ 是一个消息中间件,也是一个消费者生产者模型,它负责接收、存储并转发消息。消息传递的过程雷同邮局,当你要发送一个邮件时,你会把你的邮件放到邮局,邮局接收到邮件,并通过邮递员送到收件人的手中。在上面的图中,Producer 就相当于发件人,Consumer 就相当于收件人,RabbitMQ 就雷同于邮局。
生产者发送消息的流程








消费者接收消息的流程






AMQP

前面反复的提到 AMQP,那么什么是 AMQP 呢?
AMQP(Advanced Message Queuing Protocol),即高级消息队列协议,是一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准。AMQP专为面向消息的中间件计划,基于此协议的客户端与消息中间件传递消息,并不受客户端/中间件不同产品、不同开辟语言等条件的限制。
AMQP的工作流程主要包罗以下几个步骤:

Web页面操纵

但知道了 RabbitMQ 的焦点概念和工作流程之后,我们来看看 RabbitMQ 的管理页面如何利用吧。

这里的 Connections、Channels、Exchanges和Queues因为我们还没有客户端连接 RabbitMQ 所以此时还没有什么信息,后面有客户端连接 RabbitMQ 的时候再来看看这内里的内容。
然后就是这个 Admin,账号管理:


我们可以点击对应的账号,对其权限、可操纵的 Virtual Host 等进行设置:

也可以对绑定的交换机进行设置:

修改账号的暗码和权限:

删除账号:

也可以在 Admin 页面添加账号:

我们也可以点击右边的 Virtual Host 对虚拟主机进行设置:

在这里我们可以看到全部的虚拟主机,以及每一个主机中哪些用户对其具有权限:

当然也可以创建一个新的虚拟主机:


创建完成之后,默认是哪个用户创建的这个虚拟主机,这个用户就对这个虚拟主机具有权限。

我们可以点击某个特定的虚拟主机对齐进行设置:

可以删除大概增加某个用户对这个虚拟主机的权限:

RabbitMQ快速入门

在知道了上面的知识了之后,我们来看看 Java 中如何利用 RabbitMQ。
1. 引入依靠

RabbitMQ 属于第三方库,不在 java.lang 包下,所以需要我们将 RabbitMQ 的依靠引入才能利用 RabbitMQ。


将复制的坐标粘贴到 pom.xml 文件中:

导入 rabbitmq 依靠之后,我们来分别编写生产者代码和消费者代码。
2. 编写生产者代码


根据这个 rabbitmq 的工作流程图,我们来看看生产者需要做什么。
1.首老师产者需要先和 RabbitMQ Broker 建立连接

2)设置参数,需要的参数有 IP、Port、Virtual Host、Username、Password:
  1. public class ProducerDemo {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         ConnectionFactory factory = new ConnectionFactory(); //创建连接工厂
  4.         factory.setHost("x.x.x.x"); //设置IP
  5.         factory.setPort(5672); //前面说了,客户端与RabbitMQ Server连接使用的端口默认是5672
  6.         factory.setVirtualHost("test"); //Virtual Host
  7.         factory.setUsername("admin"); //用户名
  8.         factory.setPassword("****"); //密码
  9.         //创建连接Connection
  10.         Connection connection = factory.newConnection();
  11.     }
  12. }
复制代码
2.开启信道
  1. Channel channel = connection.createChannel();
复制代码
3.声明交换机,因为 rabbitmq 默认提供了几个交换机,所以这里我们就利用默认的交换机。
4.声明队列
channel.queueDeclare() 的参数表明:
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

  1. channel.queueDeclare("hello",true,false,false,null);
复制代码
5.发送消息
  1. void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
复制代码

  1. for (int i = 0; i < 10; i++) {
  2.     String msg = "hello rabbitmq" + i;
  3.     channel.basicPublish("","hello",null,msg.getBytes()); //因为使用的是默认的交换机,所以交换机这个参数的值就是空字符串
  4.     System.out.println("消息发送成功");
  5. }
复制代码
6.释放资源
  1. // 释放资源,先释放信道再释放Connection
  2. channel.close();
  3. connection.close();
复制代码
当这里我们释放资源了之后,管理页面有些内容我们就看不到了,所以,我们先不释放资源。
编写完成生产者代码之后,我们运行这段代码,然后查看管理页面中的变革:
起首是 Connection:

Chanel:

Exchange:

Queue:

点进去队列,我们可以看到队列的详细信息:


我们也可以在这个界面中生产消息:

得到队列中的 n 条消息:

3. 编写消费者代码

1.建立连接
消费者跟生产者一样,起首还是要和 RabbitMQ Broker 建立连接:
  1. public class ConsumerDemo {
  2.     public static void main(String[] args) throws IOException, TimeoutException {
  3.         ConnectionFactory factory = new ConnectionFactory(); //创建连接工厂
  4.         factory.setHost("x.x.x.x"); //设置IP
  5.         factory.setPort(5672); //前面说了,客户端与RabbitMQ Server连接使用的端口默认是5672
  6.         factory.setVirtualHost("test"); //Virtual Host
  7.         factory.setUsername("admin"); //用户名
  8.         factory.setPassword("***"); //密码
  9.         //建立连接Connection
  10.         Connection connection = factory.newConnection();
  11.     }
  12. }
复制代码
2.创建Channel
  1. connection.createChannel();
复制代码
3.声明交换机,这里还是利用默认的交换机
4.声明队列
  1. channel.queueDeclare("hello",true, false, false, null);
复制代码
5.消费消息
  1. basicConsume(String queue, boolean autoAck, Consumer callback);
复制代码

  1. Consumer consumer = new DefaultConsumer(channel) {
  2.     @Override
  3.     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4.         System.out.println("接收到消息" + new String(body));
  5.     }
  6. };
  7. channel.basicConsume("hello",false,consumer);
复制代码
运行 Consumer:

查看管理界面变革:
多出了一个连接Connection

Channel 也是多出了一个

但是 Exchange 交换机没有变革,因为交换机是只有生产者到达 Broker 的时候才会利用到,从 Broker 到消费者之间不会利用到 Exchange:

因为我这里没有设置自动应答,也没有手动应答,所以全部的消费的消息都表现的是 Unacked:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4