【消息队列】RabbitMQ五种消息模式

打印 上一主题 下一主题

主题 947|帖子 947|积分 2841

RabbitMQ

RabbitMQ是基于Erlang语言开辟的开源消息通信中心件
官网地点:https://www.rabbitmq.com/
RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  • 下载镜像,在线拉取
    docker pull rabbitmq
  • 安装MQ
  1. docker run\
  2. --env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
  3. --env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
  4. --name mq \   # 队列名称
  5. --hostname mq1 \  #配置主机名
  6. -p 15672:15672 \  # MQ管理端口
  7. -p 5672:5672 \   #MQ消息传输端口
  8. -d \   # 后台运行
  9. rabbitmq
复制代码


   互换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的
  在RabbitMQ中:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组
常见的消息模型


  • 根本消息队列
  • 工作消息队列
   这两种并没有效到互换机,而是直接到达队列
  

  • 发布订阅(Publish,Subscribe),根据互换机范例差别分为三种:
    Fanout Exchange:广播
    Direct Exchange:路由
    Topic Exchange:主题
根本消息队列

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处置惩罚队列中的消息
java模型(消息发布者)
  1. @Test
  2. public void test() throws IOException,TimeoutException{
  3.     //1.建立连接,与消息队列进行连接
  4.     ConnetionFactory factory =new ConnetionFactory();
  5.     //设置连接参数,主机名,端口号,vhost,用户名,密码
  6.     factory.setHost(192.168.75.136);
  7.     factory.setPort(5672);
  8.     factory.setVirtualHost("/");
  9.     factory.setUsername("itcast");
  10.     factory.setPassword("");
  11.     //建立连接
  12.     Connection connection =factory.newConnection();
  13.     //创建通道Channel,就可以向队列发送消息了
  14.     Channel channel =connection.createChannel();
  15.     //创建队列
  16.     String queuename="hlh";
  17.     channel.queueDeclare(queuename,false,false,false,null);
  18.     //发送消息
  19.     String message="hello";
  20.     channel.basicPublish("",queuename,null,message.getBytes());
  21.     //关闭通道和连接
  22.     channel.close();
  23.     connection.close();
  24. }
复制代码
java模型(消息消费者)
  1.     //1.建立连接,与消息队列进行连接
  2.     ConnetionFactory factory =new ConnetionFactory();
  3.     //设置连接参数,主机名,端口号,vhost,用户名,密码
  4.     factory.setHost(192.168.75.136);
  5.     factory.setPort(5672);
  6.     factory.setVirtualHost("/");
  7.     factory.setUsername("itcast");
  8.     factory.setPassword("");
  9.     //建立连接
  10.     Connection connection =factory.newConnection();
  11.     //创建通道Channel,就可以向队列发送消息了
  12.     Channel channel =connection.createChannel();
  13.     //创建队列
  14.     String queuename="hlh";
  15.     channel.queueDeclare(queuename,false,false,false,null);
  16.     //订阅消息
  17.     channel.basicConsume(queuename,true,new DefaultConsumer(channel){
  18.     @Override
  19.     //处理消息的代码,绑定函数,有了消息才执行
  20.     public void handleDelivery(String consumerTag,Envelope envelope,
  21.                                AMQP.BasicProperties properties,byte[] body)throws IOException{
  22.                   //处理消息
  23.                   String message=new String(body);            
  24.                }
  25.     })
复制代码
留意:上边生产者消费者都创建了队列:
   这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先创建消费者,照旧先创建生产者,以是都实行创建函数,但是创建的队列只有一个不会重复
  SpringAMQP



  • AMQP
   是用于在应用程序或之间传递业务消息的开放尺度,该协议与语言和平台无关,更符合微服务中的独立性的要求
  

  • Spring AMQP
   Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,此中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现
  

  • 特性:

  • 监听器容器,用于异步处置惩罚入站消息
  • 用于发送和接收消息的RabbitTemplate
  • Rabbitadmin用于自动声明队列,互换和绑定


  • 使用:

  • 引入spring-amqp的依靠

    在yml中设置mq连接信息:
  1. spring:
  2.    rabbitmq:
  3.      host: 192.168.75.136 #主机名
  4.      port: 5672 #端口
  5.      virtual-host: / #虚拟主机
  6.      username: itcast #用户名
  7.      password:   #密码
复制代码

  • 在生产者服务中使用RabbitTemplate发送消息到hlh.queue这个队列
  1. public class springamqptest{
  2.    @Autowired
  3.    private RabbitTemplate rabbittemplate;
  4.    @Test
  5.    public void test(){
  6.      String queuename="hlh.queue";
  7.      String message="hello";
  8.      rabbittemplate.convertAndSend(queuename,message);
  9.    }
  10. }
复制代码

  • 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
  1. @Component
  2. public class SpringrabbitListener {
  3.    @RabbitListener(queues="hlh.queue")
  4.    public void listenSimple(String msg) throws InterruptedException{
  5.      //消费逻辑代码
  6.    }
  7. }
复制代码
留意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能
WorkQueue

Work queue,工作队列。可以提高消息处置惩罚速率,制止队列消息堆积
   一个消息队列绑定多个消费者
  假设如今生产者每秒循环发送50条消息,此时的消费者怎么处置惩罚:
  1. @Component
  2. public class SpringrabbitListener {
  3.    @RabbitListener(queues="hlh.queue")
  4.    public void listenSimple(String msg) throws InterruptedException{
  5.      //消费逻辑代码
  6.    }
  7.       @RabbitListener(queues="hlh.queue")
  8.    public void listenSimple2(String msg) throws InterruptedException{
  9.      //消费逻辑代码
  10.    }
  11. }
复制代码
通过定义多个消费者进行消费,追上生产者生产的速率,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除
消息预取

指的每个消费者每次取多少条消息:
可以通过设置进行设置:
  1. spring:
  2.    rabbitmq:
  3.       host: 192.168.75.136
  4.       port: 5672
  5.       virtual-host: /
  6.       username: itcast
  7.       password:
  8.       listener:
  9.          simple:
  10.            prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息
复制代码
发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是参加了exchange(互换机)

留意:exchange负责消息路由,而不是存储,路由失败则消息丢失
互换机的作用:

  • 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列
SpringAMQP提过了声明互换机,队列,绑定关系的API:

Fanout Exchange

Fanout Exchange 会将全部的消息路由到每一个跟其绑定的queue
在创建设置类,在设置类中进行消息队列绑定互换机
  1. @Configuration
  2. public class FanoutConfig{
  3.     // 声明FanoutExchange交换机
  4.     @Bean
  5.     public FanoutExchange fanoutExchange(){
  6.        return new FanoutExchange("itcast.fanout");
  7.     }
  8.     //声明一个队列
  9.     @Bean
  10.     public Queue fanoutQueue1(){
  11.        return new Queue("fanout.queue1");
  12.     }
  13.     // 绑定队列跟交换机
  14.     @Bean
  15.     public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
  16.        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  17.     }
  18. }
复制代码
此时的生产者怎样发送消息:
  1. public void test(){
  2.   //给出交换机名称
  3.   String exchangeName="itcast.fanout";
  4.   String message="hello";
  5.   //发送消息
  6.   rabbitTemplate.convertAndSend(exchangeName,"",message);
  7. }
复制代码
监听者怎样收到消息
  1. @RabbitListener(queues="fanout.queue1")
  2. public void listener(String msg){
  3.    //处理得到的消息
  4. }
复制代码
DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)
   每一个Queue都与Exchange设置一个BindingKey
  发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
   一个队列可以指定多个Key
  我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解
  1. @RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
  2. public void Listener(String msg){
  3.    //进行消息的处理
  4. }
复制代码
在生产者生产时:
  1. public void test(){
  2.   //给出交换机名称
  3.   String exchangeName="itcast.fanout";
  4.   String message="hello";
  5.   //发送消息
  6.   rabbitTemplate.convertAndSend(exchangeName,"blue",message);
  7. }
复制代码
TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,而且以.分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
同样也是使用 @RabbitListener进行声明
  1. @RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
  2. public void Listener(String msg){
  3.    //进行消息的处理
  4. }
复制代码
生产者生产消息:
  1. public void test(){
  2.   //给出交换机名称
  3.   String exchangeName="itcast.fanout";
  4.   String message="hello";
  5.   //发送消息
  6.   rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
  7. }
复制代码
消息转换器

在SpringAMQP的发送方法中,接收消息的范例是Object,也就是我们可以发送任意对象范例的消息,SpringAMQP会帮助我们序列化为字节后发送
   Spring的对消息对象的处置惩罚是由org.springframework.amqp.support.converter.MessageConverter来处置惩罚的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化
  假如要修改只需定义一个MessageConverter 范例的Bean即可,保举使用JSON方式完成序列化

  • 引入jackson的依靠

  • 声明MessageConverter:
  1. @Bean
  2. public MessageConverter jsonMessageConverter(){
  3.    return new Jackson2JsonMessageConverter();
  4. }
复制代码
这样发送的消息就会使用自定义的转换范例

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

傲渊山岳

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表