RabbitMQ快速入门

打印 上一主题 下一主题

主题 831|帖子 831|积分 2493

目录

MQ简介
1、同步通信
图片
2、异步通信
图片
RabbitMQ快速上手
根本介绍:
Producer和Consumer
Connection和Channel
Virtual host
Queue
Exchange
工作流程
 AMQP
Java编写RabbitMQ生产者消耗者
生产者
1.建立毗连
 2.开启信道
3.声明交换机
4.声明队列
5.发送消息
6.资源开释
生产者全部代码:
消耗者
执行结果


MQ简介

MQ(Message Queue)消息队列,本质上是队列,满意队列FIFO(先入先出)的性子,队列中存放的内容是消息,消息可以是只包罗文本字符串,JSON,也可以是对象等。
MQ经常用于分布式体系之间的通信,体系之间的通信通常有两种方式:
1、同步通信

直接调用对方的服务,数据从一端出发到达另一端。
图片

2、异步通信

数据从⼀端发出后,先进⼊⼀个容器进⾏临时存储,当达到某种条件后,再由这个容器发送给另⼀端,这个容器通常就是MQ(message queue)。
图片

RabbitMQ快速上手

根本介绍:

RabbitMQ是MQ的一种实现,工作流程如下:

 RabbitMQ是一个消息中心件,也是一个生产者消耗者模型,负责吸收,存储并转发消息,消息传递的过程类似于发快递,你把快递放到驿站,快递小哥帮你把快递送到吸收人的手上。这个过程中,你就相当于Producer,吸收人相当于Consumer,快递站就是RabbitMQ。
Producer和Consumer

Producer(生产者),是RabbitMQ的客户端,用于发送消息到RabbitMQ。
Consumer(消耗者),也是RabbitMQ的客户端,用于向RabbitMQ吸收消息。
Broker(代理),RabbitMQ服务器节点,用于吸收和发送消息。
生产者发送消息到RabbitMQ服务器,让RabbitMQ进行路由转发到对应的消耗者。
生产者发送的消息带有标签,代理(RabbitMQ服务器)会根据标签路由,找到必要的消耗者,进行消息转发。消耗者吸收消息进行消耗过程中标签就会被丢掉,也就是说消耗者并不会知道消息的发送者是谁。
Connection和Channel

 Connection(毗连):客户端和RabbitMQ服务器之间的⼀个TCP毗连,建立毗连后才可以进行消息的发送和吸收。
Channel(信道):类似于发送消息的一个通道,每个TCP毗连可以有多个信道,每个信道都是独立的虚拟毗连,消息的发送和吸收都是基于Channel(信道)的。
Virtual host

Virtual host:虚拟主机,给消息队列提供逻辑上的隔离,一个BrokerServer可以有多个虚拟主机,当不同用户使用RabbitMQ Server提供的服务时,可以使用划分多个虚拟主机的方式将一系列业务隔离开来,类似于MySQL中不同的数据库。
Queue

Queue:队列,用于存放消息,一个队列可以被多个消耗者订阅。
Exchange

Exchenge:交换机,交换机负责吸收生产者发送的消息,并按照规则将消息路由到队列上。类似于快递小哥将快递送给
工作流程

   Proucer生产消息 -> Producer毗连到RabbitMQBroker,建立毗连(Connection),开启信道(Channel) -> Producer声明交换机(Exchange),路由消息 ->roducer声明队列(Queue) -> Produce发送消息到RabbitMQBroker-> RabbitMQBroker吸收消息,并放入相应的队列(Queue)中,未找到相应队列根据生产者的配置,选择丢弃或者退回给生产者。
   AMQP

AMQP是一种高级消息队列协议,界说了一套确定的消息交换功能,包罗交换机,队列等,这些组件共同工作,使生产者能够将消息发送到交换机,由队列吸收并等候消耗者吸收。AMQP还界说了一个网络协议,允许客户端应用通过这个协议与消息代理和AMQP模型进行交互通信。RabbitMQ使AMQP协议的Erlang实现。
Java编写RabbitMQ生产者消耗者

打开管理页面,添加用户:

这里添加的测试用户名称为:test   暗码 123456。 
创建虚拟机:

这里添加的虚拟机名称为:test
返回admin页面,点击刚刚创建的test用户,添加虚拟机操作权限。

进入IDEA创建一个Maven项目,导入依赖:
  1. <dependency>
  2.    <groupId>com.rabbitmq</groupId>
  3.    <artifactId>amqp-client</artifactId>
  4.    <version>5.20.0</version>
  5. </dependency>
复制代码
生产者

1.建立毗连

使用依赖中的ConnectionFactory类创建毗连,设置关键信息:
  1. ConnectionFactory connectionFactory = new ConnectionFactory();
  2. connectionFactory.setHost("139.9.116.124");
  3. connectionFactory.setPort(5672);
  4. connectionFactory.setUsername("fwx");
  5. connectionFactory.setPassword("123456");
  6. connectionFactory.setVirtualHost("测试");
  7. Connection connection = connectionFactory.newConnection();
复制代码
 2.开启信道

  1. Channel channel = connection.createChannel();
复制代码
3.声明交换机

这里使用的是RabbitMQ默认提供的交换机,不必要代码进行声明
4.声明队列

使用channel的queueDeclare方法,依此填入相干参数:
  1. channel.queueDeclare("hello",true,false,false,null);
复制代码
 打开源码,查看参数的含义:

 第一个参数queue代表队列的名称,第二个参数durable代表是否持久化(true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失),第三个参数exclusive代表是否独占(是否独占, 只能有⼀个消耗者监听队列),第四个参数autoDelete代表是否自动删除(没有消耗者自动删除队列),第五个参数arguments代表一些配置的参数。
5.发送消息

使用Channel的basicPublish()方法进行消息的发送,这里发送的消息为"hello RabbitMQ~",消息发送10次:
  1. for (int i = 0; i < 10; i++) {
  2.       String msg = "hello RabbitMQ~";
  3.       channel.basicPublish("","test1",null,msg.getBytes());
  4. }
复制代码
打开方法,分析方法参数:
  第一个参数exchange代表交换机的名称(这里使用默认交换机""),第二个参数routingKey代表路由的名称(使用默认交换机,路由名称要和队列名称相同才可以找到对应队列),第三个参数body代表发送的消息体(将String类型转换为字节数组)。
6.资源开释

  1. channel.close();
  2. connection.close();
复制代码
最后将资源关闭即可。
生产者全部代码:

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class Producer {
  7.     public static void main(String[] args) throws IOException, TimeoutException {
  8.         //1.建立链接
  9.         ConnectionFactory connectionFactory = new ConnectionFactory();
  10.         //主机地址
  11.         connectionFactory.setHost("139.9.116.124");
  12.         //端口号
  13.         connectionFactory.setPort(5672);
  14.         //用户名
  15.         connectionFactory.setUsername("test");
  16.         //密码
  17.         connectionFactory.setPassword("123456");
  18.         //虚拟机
  19.         connectionFactory.setVirtualHost("test");
  20.         //连接建立
  21.         Connection connection = connectionFactory.newConnection();
  22.         //2.开启信道
  23.         Channel channel = connection.createChannel();
  24.         //3.声明交换机,使用内置交换机
  25.         //4.声明队列
  26.         channel.queueDeclare("test1",true,false,false,null);
  27.         //5.发送消息
  28.         for (int i = 0; i < 10; i++) {
  29.             String msg = "hello RabbitMQ~";
  30.             channel.basicPublish("","test1",null,msg.getBytes());
  31.         }
  32.         //6.资源释放
  33.         channel.close();
  34.         connection.close();
  35.     }
  36. }
复制代码
消耗者

消耗者与生产者逻辑相似,这里不再解说,全部代码:
  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5.     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  6.         //1.建立链接
  7.         ConnectionFactory connectionFactory = new ConnectionFactory();
  8.         //主机地址
  9.         connectionFactory.setHost("139.9.116.124");
  10.         //端口号
  11.         connectionFactory.setPort(5672);
  12.         //用户名
  13.         connectionFactory.setUsername("test");
  14.         //密码
  15.         connectionFactory.setPassword("123456");
  16.         //虚拟机
  17.         connectionFactory.setVirtualHost("test");
  18.         //连接建立
  19.         Connection connection = connectionFactory.newConnection();
  20.         //2.开启信道
  21.         Channel channel = connection.createChannel();
  22.         //3.声明队列(生产者已经声明)
  23.         //4.消费消息
  24.         /**参数声明
  25.          * basicConsume(String queue, boolean autoAck, Consumer callback)
  26.          * queue:队列名
  27.          * autoAck:自动确认
  28.          * callback:接收到消息执行逻辑
  29.          */
  30.         DefaultConsumer consumer = new DefaultConsumer(channel){
  31.             //从队列中收到消息,就会执行的方法
  32.             @Override
  33.             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  34.                 //TODO
  35.                 System.out.println("接收的消息:"+new String(body));
  36.             }
  37.         };
  38.         channel.basicConsume("test1",true,consumer);
  39.         //等待程序执行完成
  40.         Thread.sleep(2000);
  41.         //5.释放资源
  42.         channel.close();
  43.         connection.close();
  44.     }
  45. }
复制代码
生产者已经声明过队列,消耗者无需声明,消耗者通过重写handleDelivery()方法(这个方法在吸收消息时自动执行)去消耗消息。
执行结果

执行生产者代码,打开RabbitMQ管理页面,看到队列test1产生了10条消息等候消耗:

执行消耗者,此时控制台打印了吸收的消息:

再查看守理页面,消息已经全部消耗:
 到此结束,感谢观看!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

灌篮少年

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表