SpingBoot集成Rabbitmq及Docker摆设

打印 上一主题 下一主题

主题 900|帖子 900|积分 2700

  1.                 * [队列@Queue注解属性如下:](#Queue_541)
  2.                 * [1.simple简单模式(点对点模式)](#1simple_579)
  3.                 * [2.work工作模式(一对多)](#2work_632)
  4.                 * [3.publish/subscribe发布订阅(共享资源)](#3publishsubscribe_714)
  5.                 * [4.routing路由模式](#4routing_884)
  6.                 * [5.topic 主题模式(路由模式的一种)](#5topic__976)
  7.                 * [6.RPC (基于消息的远程过程调用)](#6RPC__1066)
  8.         - [延时队列、循环队列、兜底机制、定时任务](#_1074)
  9.         - * [1.延时队列](#1_1076)
  10.                 * + [使用TTL+死信队列组合实现延迟队列的效果。](#TTL_1078)
  11.                         + [使用RabbitMQ官方延迟插件,实现延时队列效果。](#RabbitMQ_1168)
  12.                 * [2.循环队列](#2_1220)
  13.                 * [3.兜底机制](#3_1228)
  14.                 * [4.定时任务](#4_1235)
  15.                 * [回调模式: 只能创建一个](#___1238)
  16.                 * [Classic经典队列、Quorum仲裁队列、Stream流式队列](#ClassicQuorumStream_1292)
复制代码
先容

RabbitMQ是由Erlang语言开发的AMQP的开源实现
AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放尺度,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ的特点



  • 可靠性(Reliablity):使用了一些机制来包管可靠性,比如长期化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于范例的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的呆板上进行镜像,使得在部门节点出标题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件
Rabbitmq术语



  • 斲丧者:订阅某个队列
  • 生产者::创建消息,然后发布到队列中(queue),终极将消息发送到监听的斲丧者。
  • Broker:标识消息队列服务器实体.
  • Virtual Host:虚拟主机。标识一批互换机、消息队列和相干对象。虚拟主机是共享雷同的身份认证和加密情况的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、互换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
  • Exchange:互换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue:消息队列,用来保存消息直到发送给斲丧者。它是消息的容器,也是消息的尽头。一个消息可投入一个或多个队列。消息一直在队列内里,等候斲丧者毗连到这个队列将其取走。
  • Banding:绑定,用于消息队列和互换机之间的关联。一个绑定就是基于路由键将互换机和消息队列毗连起来的路由规则,所以可以将互换机理解成一个由绑定构成的路由表。
  • Channel:通道,多路复用毗连中的一条独立的双向数据流通道。通道是建立在真实的TCP毗连内的虚拟链接,AMQP命令都是通过通道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过通道完成。因为对于操作系统来说,建立和销毁TCP都黑白常昂贵的开销,所以引入了通道的概念,以复用一条TCP毗连。
  • Connection:网络毗连,比如一个TCP毗连。
  • Publisher:消息的生产者,也是一个向互换器发布消息的客户端应用程序。
  • Consumer:消息的斲丧者,体现一个从一个消息队列中取得消息的客户端应用程序。
  • Message:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能须要长期性存储[消息的路由模式])等。
消息发布接收流程

1.发送消息


  • 生产者和Broker建立TCP毗连。
  • 生产者和Broker建立通道。
  • 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
  • Exchange将消息转发到指定的Queue(队列)
2.接收消息


  • 斲丧者和Broker建立TCP毗连 。
  • 斲丧者和Broker建立通道
  • 斲丧者监听指定的Queue(队列)
  • 当有消息到达Queue时Broker默认将消息推送给斲丧者。
  • 斲丧者接收到消息。
Docker摆设

查询rabbitmq最新版本
  1. docker search rabbitmq #查询镜像 已经集成了erlang,无需单独安装erlang
  2. docker pull rabbitmq  #拉取镜像 最新版,或指定版本 docker pull rabbitmq:3.13-management 自带管理界面
  3. docker images # 查看拉取的镜像
  4. #启动容器 指定管理界面登陆账号和密码
  5. # 15672 管理界面端口
  6. # 5672 amqp协议端口,程序连接端口
  7. # -v /mnt/data/rabbitmq/conf:/etc/rabbitmq 配置文件目录
  8. # -v /mnt/data/rabbitmq/data:/var/lib/rabbitmq 数据目录
  9. # -v /mnt/data/rabbitmq/log:/var/log/rabbitmq 日志目录
  10. # -e RABBITMQ\_DEFAULT\_USER=tlmroot 管理界面登陆账号
  11. # -e RABBITMQ\_DEFAULT\_PASS=123456 管理界面登陆密码
  12. # 最好限制容器内存 --memory 2g
  13. docker run -d --hostname rabbitmq --name rabbitmq --restart=always -v /mnt/data/rabbitmq/conf:/etc/rabbitmq -v /mnt/data/rabbitmq/data:/var/lib/rabbitmq -v /mnt/data/rabbitmq/log:/var/log/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ\_DEFAULT\_USER=tlmroot -e RABBITMQ\_DEFAULT\_PASS=123456 rabbitmq:latest
  14. # docker ps 如果容器启动失败,需要提高日志挂载目录的访问权限后重启服务
  15. chmod 777 /mnt/data/rabbitmq/log
  16. # 进入容器内部安装管理界面插件
  17. docker exec -it rabbitmq /bin/bash
  18. # 容器内部创建管理界面插件 安装完成即可访问 服务器IP加15672,如无法访问 关闭防火墙 systemctl stop firewalld
  19. rabbitmq-plugins enable rabbitmq_management
  20. # 容器内部启用所有功能标志,也可以在管理界面操作(Admin/Feature Flags)
  21. enable all feature flags
  22. #至此创建完成 服务器需要开放15672和5672端口
复制代码

管理界面说明



  • Overview: 这个页面显示了RabbitMQ服务器的一般信息,例如集群节点的名字、状态、运行时间等。

  1. Totals 消息数,队列消息、连接数、通道数、交换机数、队列数、消费者数
  2. Nodes:节点信息 进程数、磁盘数据、运行时间、等
  3. Churn statistics: 流失率统计,最后一分钟连接操作、通道操作、队列操作
  4. Ports and contexts: 端口信息及网络环境信息
  5. Export definitions: 导出配置
  6. Import definitions:导入配置
复制代码


  • Connections: 在这里,可以查看、管理和关闭当前所有的TCP毗连。

  1.   Virtual host: 所属的虚拟主机。
  2.   Name: 名称。
  3.   User name: 使用的用户名。
  4.   State: 当前的状态,running:运行中;idle:空闲。
  5.   SSL/TLS: 是否使用ssl进行连接。
  6.   Protocol: 使用的协议。
  7.   Channels: 创建的channel的总数。
  8.   From client: 每秒发出的数据包。
  9.   To client: 每秒收到的数据包。
复制代码


  • Channels: 这个页面展示了所有当前打开的通道以及它们的详细信息。外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  1. channel:名称。
  2. Virtual host:所属的虚拟主机。
  3. User name:使用的用户名。
  4. Mode:渠道保证模式。 可以是以下之一,或者不是:C: confirm。T:transactional(事务)。
  5. State :当前的状态,running:运行中;idle:空闲。
  6. Unconfirmed:待confirm的消息总数。
  7. Prefetch:设置的prefetch的个数。
  8. Unacker:待ack的消息总数。
  9. publish:生产端 pub消息的速率。
  10. confirm:生产端确认消息的速率。
  11. deliver/get:消费端获取消息的速率。
  12. ack:消费端 ack消息的速率
复制代码


  • Exchanges: 可以在这里查看、创建和删除互换机。

  1. Name:名称。
  2. Type:exchange type,具体的type可以查看RabbitMq系列之一:基础概念。
  3. Features:持久化,D:持久化 I:内部 AD:自动删除
  4. Message rate in:消息输入速率。
  5. Message rate out:消息输出速率
  6. Add a new exchange:
  7.     Virtual host:属于哪个Virtual host,我这里只有一个所以不显示
  8.     Name:名字,同一个Virtual host里面的Name不能重复。
  9.     Durability: 是否持久化,Durable:持久化。Transient:不持久化。
  10.     Auto delete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
  11.     Internal: 是否是内部专用exchange,是的话,就意味着我们不能往该exchange里面发消息。
  12.     Arguments: 参数,是AMQP协议留给AMQP实现做扩展使用的
复制代码


  • Queues: 这个页面展示了所有当前的队列以及它们的详细信息。

  1. Virtual host: 所属的虚拟主机。
  2. Name: 名称。
  3. Features: 功能。(参数参考上述交换机页面)
  4. State: 当前的状态,running:运行中;idle:空闲。
  5. Ready: 待消费的消息总数。
  6. Unacked: 待应答的消息总数。
  7. Total: 总数 Ready+Unacked。
  8. incoming: 消息进入的速率。
  9. deliver/get: 消息获取的速率。
  10. ack: 消息应答的速率。
  11. Add a new queue:
  12.     Virtual host:属于哪个Virtual host
  13.     Name:名字,同一个Virtual host里面的Name不能重复。
  14.     Durability: 是否持久化,Durable:持久化。Transient:不持久化。
  15.     Auto delete:当最后一个绑定(队列或者exchange)被unbind之后,该 queue 自动被删除。
  16.     Arguments: 参数,是AMQP协议留给AMQP实现做扩展使用的
复制代码


  • Admin: 在这里,可以查看系统中所有的操作用户。

  1. Name: 名称。
  2. Tags: 角色标签,只能选取一个。
  3. Can access virtual hosts: 允许进入的vhost。
  4. Has password: 设置了密码。
复制代码
Virtual Host:虚拟主机
  1. 虚拟主机(vhost)提供逻辑分组和资源分离。每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的connection、exchange、queue、binding等,拥有自己的权限。vhost之于RabbitMQ就像虚拟机于物理机一样,他们通过在各个实例间提供逻辑上分离,允许为不同的应用程序安全保密的运行数据。
复制代码
Feature Flags:功能标志开关
Deprecated Features:已废特性
Policies:计谋设置
  1. 策略分为“用户策略”和“系统策略”
  2. 策略使用的是正则表达匹配规则,按名称匹配一个或多个队列,并将其定义的一些规则(参数)到匹配队列中。换句话说,可以使用策略一次为多个队列配置参数。策略可以理解为给“队列”和“分发器”设置额外的“Arguments”参数。每个“分发器”和“队列”只能生效一个“策略”,并且是是立即生效的。
  3.     参数:
  4.     Apply to:指定策略是只匹配队列、还是只匹配交换,或两则两者都匹配。
  5.     Priority:表示的是策略的优先级、值越大,优先级越高。
  6.     Definition:才是真正的规则。有四大类,分别是HA(高可用性)、federation(联合)、Queues(队列)、Exchanges(备用分发器)
  7.     HA(高可用性):表示将队列怎么镜像到节点的策略。
  8.     ha-mode:选项有三个,分别是“all“(表示同步到所有节点),“exactly”,“nodes”。"exactly"和"nodes"需要结合ha-params才能决定同步策略
  9.     ha-params:为数值、表示个数
  10.     ha-sync-mode:(手动(manual)/自动(automatic)同步)
复制代码
Limits 可以设置最大毗连数
Cluster 集群 更改集群名称
延时队列插件下载安装

延时插件链接地址3.13.0,下载版本和rabbitmq版本要一致 我安装的是 3.13.0
  1. # 下载插件到/home/目录,
  2. curl -JL https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez  -o /home/rabbitmq_delayed_message_exchange-3.13.0.ez
  3. # 可省略,进入容器查询插件目录,三方插件需要放在这里 (ez结尾的文件,/opt/rabbitmq/plugins)
  4. rabbitmq-plugins directories -s
  5. # 容器/opt/rabbitmq/plugins 为插件目录 延时队列插件需要复制到这里
  6. docker cp /home/rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/opt/rabbitmq/plugins
  7. # 进入容器内部查询插件,
  8. docker exec -it rabbitmq /bin/bash
  9. # rabbitmq-plugins list 如下图
  10. rabbitmq-plugins list
  11. # 安装插件命令同安装管理界面命令 rabbitmq-plugins enable 《plugin\_name》
  12. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  13. # 安装完成后 界面Exchanges(交换机),新增的时候就会出现x-delayed-message
复制代码


rabbitmq.conf设置文件示例

容器运行后,默认没有设置文件,自带的设置足够使用,自行创建放在主机/mnt/data/rabbitmq/conf/目次,或是放在容器/etc/rabbitmq目次,创建容器时已映射
  1. 容器内部查询有效配置
  2. rabbitmqctl environment
  3. RabbitMQ 常用的三种自定义服务器的通用方法:
  4.     配置文件 rabbitmq.conf
  5.     环境变量文件 rabbitmq-env.conf
  6.     补充配置文件 advanced.config
  7. rabbitmq.conf和rabbitmq-env.conf的位置
  8.     在二进制安装中路径是在 :安装目录下的/etc/rabbitmq/
  9.     rpm 安装: /etc/rabbitmq/
  10. 如果rabbitmq.conf和rabbitmq-env.conf 的两个文件不存在,那么我们可以创建该文件,然后我们可以通过环境变量
  11. 指定该文件的位置。
  12. 补充 :
  13.     rabbitmqctl rabbitmqctl 是管理虚拟主机和用户权限的工具
  14.     rabbitmq-plugins 是管理插件的工具
复制代码
1.1 rabbitmq.conf

属性形貌默认值listeners要监听 AMQP 0-9-1 and AMQP 1.0 的端口listeners.tcp.default = 5672num_acceptors.tcp接受tcp毗连的erlang 历程数num_acceptors.tcp = 10handshake_timeoutAMQP 0-9-1 超时时间,也就是最大的毗连时间,单元毫秒handshake_timeout = 10000listeners.ssl启用TLS的协议默认值为nonenum_acceptors.ssl接受基于TLS协议的毗连的erlang 历程数num_acceptors.ssl = 10ssl_optionsTLS 设置ssl_options =nonessl_handshake_timeoutTLS 毗连超时时间 单元为毫秒ssl_handshake_timeout = 5000vm_memory_high_watermark触发流量控制的内存阈值,可以为相对值(0.5),大概绝对值 vm_memory_high_watermark.relative = 0.6 ,vm_memory_high_watermark.absolute = 2GB默认vm_memory_high_watermark.relative = 0.4vm_memory_calculation_strategy内存使用陈诉计谋,assigned:使用Erlang内存分配器统计信息 rss:使用操作系统RSS内存陈诉。这使用特定于操作系统的方法,并可能启动短期子历程。legacy:使用遗留内存陈诉(运行时认为将使用多少内存)。这种计谋相当不准确。erlang 与legacy一样 是为了向后兼容vm_memory_calculation_strategy = allocatedvm_memory_high_watermark_paging_ratio当内存的使用达到了50%后,队列开始将消息分页到磁盘vm_memory_high_watermark_paging_ratio = 0.5total_memory_available_override_value该参数用于指定系统的可用内存总量,一般不使用,实用于在容器等一些获取内存实际值不准确的情况默认未设置disk_free_limitRabbitmq存储数据的可用空间限制,当低于该值的时候,将触发流量限制,设置可参考vm_memory_high_watermark参数disk_free_limit.absolute = 50MBlog.file.level控制记录日记的等级,有info,error,warning,debuglog.file.level = infochannel_max最大通道数,但不包含协议中使用的特别通道号0,设置为0体现无限制,不建议使用该值,容易出现channel泄漏channel_max = 2047channel_operation_timeout通道操作超时,单元为毫秒channel_operation_timeout = 15000heartbeat体现毗连参数协商期间服务器建议的心跳超时的值。如果两端都设置为0,则禁用心跳,不建议禁用heartbeat = 60default_vhostrabbitmq安装后启动创建的虚拟主机default_vhost = /default_user默认创建的用户名default_user = guestdefault_pass默认用户的密码default_pass = guestdefault_user_tags默认用户的标签default_user_tags.administrator = truedefault_permissions在创建默认用户是分配给默认用户的权限default_permissions.configure = .* default_permissions.read = .* default_permissions.write = .*loopback_users允许通过回环地址毗连到rabbitmq的用户列表,如果要允许guest用户长途毗连(不安全)请将该值设置为none,如果要将一个用户设置为仅localhost毗连的话,设置loopback_users.username =true(username要替换成用户名)loopback_users.guest = true(默认为只能本地毗连)cluster_formation.classic_config.nodes设置集群节点cluster_formation.classic_config.nodes.1 = rabbit@hostname1cluster_formation.classic_config.nodes.2 = rabbit@hostname2默认为空,未设置collect_statistics统计网络模式,none 不发出统计信息事件,coarse每个队列毗连都发送统计一次,fine每发一条消息的统计数据collect_statistics = nonecollect_statistics_interval统计信息网络隔断,以毫秒为单元collect_statistics_interval = 5000delegate_count用于集群内通讯的委托历程数。在多核的服务器上我们可以增加此值delegate_count = 16tcp_listen_options默认的套接字选项tcp_listen_options.backlog = 128 …hipe_compile设置为true以使用HiPE预编译RabbitMQ的部门,HiPE是Erlang的即时编译器,启用HiPE可以进步吞吐量两位数,但启动时会延迟几分钟。Erlang运行时必须包含HiPE支持。如果不是,启用此选项将不起作用。HiPE在某些平台上根本不可用,尤其是Windows。hipe_compile = falsecluster_keepalive_interval节点应该多长时间向其他节点发送keepalive消息(以毫秒为单元),keepalive的消息丢失不会被视为关闭cluster_keepalive_interval = 10000queue_index_embed_msgs_below消息的字节大小,低于该大小,消息将直接嵌入队列索引中 bytesqueue_index_embed_msgs_below = 4096mnesia_table_loading_retry_timeout等候集群中Mnesia表可用的超时时间,单元毫秒mnesia_table_loading_retry_timeout = 30000mnesia_table_loading_retry_limit集群启动时等候Mnesia表的重试次数,不实用于Mnesia升级或节点删除。mnesia_table_loading_retry_limit = 10mirroring_sync_batch_size要在队列镜像之间同步的消息的批处置惩罚大小mirroring_sync_batch_size = 4096queue_master_locator队列主节点的计谋,有三大计谋 min-masters,client-local,randomqueue_master_locator = client-localproxy_protocol如果设置为true ,则毗连须要通过反向代理毗连,不能直毗连proxy_protocol = falsemanagement.listener.portrabbitmq web管理界面使用的端口management.listener.port = 15672 1.2 advanced.config

某些设置设置不可用或难以使用sysctl格式进行设置。因此,可以使用Erlang术语格式的其他设置文件advanced.config
它将与rabbitmq.conf 文件中提供的设置合并。
属性形貌默认值msg_store_index_module设置队列索引使用的模块{rabbit,[ {msg_store_index_module,rabbit_msg_store_ets_index} ]}backing_queue_module队列内容的实现模块。{rabbit,[ {backing_queue_module,rabbit_variable_queue} ]}msg_store_file_size_limit消息储存的文件大小,现有的节点更改是危险的,可能导致数据丢失默认值16777216trace_vhosts内部的tracer使用,不建议更改{rabbit,[ {trace_vhosts,[]} ]}msg_store_credit_disc_bound设置消息储存库给队列历程的积分,默认一个队列历程被赋予4000个消息积分{rabbit, [{msg_store_credit_disc_bound, {4000, 800}}]}queue_index_max_journal_entries队列的索引日记超过该阈值将刷新到磁盘{rabbit, [{queue_index_max_journal_entries, 32768}]}lazy_queue_explicit_gc_run_operation_threshold在内存压力下为延迟队列设置的值,该值可以触发垃圾回收和减少内存使用,低落该值,会低落性能,进步该值,会导致更高的内存斲丧{rabbit,[{lazy_queue_explicit_gc_run_operation_threshold, 1000}]}queue_explicit_gc_run_operation_threshold在内存压力下,正常队列设置的值,该值可以触发垃圾回收和减少内存使用,低落该值,会低落性能,进步该值,会导致更高的内存斲丧{rabbit, [{queue_explicit_gc_run_operation_threshold, 1000}]} 1.3 rabbitmq-env.conf

通过rabbitmq-env.conf 来界说情况变量
RABBITMQ_NODENAME 指定节点名称
属性形貌默认值RABBITMQ_NODE_IP_ADDRESS绑定的网络接口默认为空字符串体现绑定本机所有的网络接口RABBITMQ_NODE_PORT端口默认为5672RABBITMQ_DISTRIBUTION_BUFFER_SIZE节点之间通讯毗连的数据缓冲区大小默认为128000,该值建议不要使用低于64MBRABBITMQ_IO_THREAD_POOL_SIZE运行时用于io的线程数建议不要低于32,linux默认为128 ,windows默认为64RABBITMQ_NODENAMErabbitmq节点名称,集群中要注意节点名称唯一linux 默认节点名为 rabbit@$hostnameRABBITMQ_CONFIG_FILErabbitmq 的设置文件路径,注意不要加文件的后缀(.conf)默认 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq(二进制安装) /etc/rabbitmq/rabbitmq(rpm 安装)RABBITMQ_ADVANCED_CONFIG_FILEadvanced.config文件路径默认 $RABBITMQ_HOME/etc/rabbitmq/advanced(二进制安装) /etc/rabbitmq/advanced(rpm 安装)RABBITMQ_CONF_ENV_FILE情况变量设置文件路径默认 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf(二进制安装) /etc/rabbitmq/rabbitmq-env.conf(rpm 安装)RABBITMQ_SERVER_CODE_PATH在使用HiPE 模块时须要使用默认为空RABBITMQ_LOGS指定日记文件位置默认为 $RABBITMQ_HOME/etc/var/log/rabbitmq/ RABBITMQ_DISTRIBUTION_BUFFER_SIZE 节点间通讯缓冲区大小,默认值 128Mb,节点流量比较多的集群中,可以提升该值,建议该值不要低于64MB。
tcp 缓存区大小
下示例将AMQP 0-9-1毗连的TCP缓冲区设置为192 KiB:
  1. tcp_listen_options.backlog = 128
  2. tcp_listen_options.nodelay = true
  3. tcp_listen_options.linger.on = true
  4. tcp_listen_options.linger.timeout = 0
  5. tcp_listen_options.sndbuf = 196608
  6. tcp_listen_options.recbuf = 196608
复制代码
Java设置

  1. <!-- Maven依赖,Springboot默认集成-->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
复制代码
Yml完整设置

  1. spring:
  2.   rabbitmq:
  3.     host: 127.0.0.1 #ip
  4.     port: 5672      #端口
  5.     username: tlmroot #账号
  6.     password: 123456 #密码
  7.     virtualHost:    #链接的虚拟主机 ,切换不同环境 dev\test\prod
  8.     addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
  9.     requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
  10.     publisherConfirms: true  #发布确认机制是否启用
  11.     publisherReturns: #发布返回是否启用
  12.     connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
  13.     ### ssl相关
  14.     ssl:
  15.       enabled: #是否支持ssl
  16.       keyStore: #指定持有SSL certificate的key store的路径
  17.       keyStoreType: #key store类型 默认PKCS12
  18.       keyStorePassword: #指定访问key store的密码
  19.       trustStore: #指定持有SSL certificates的Trust store
  20.       trustStoreType: #默认JKS
  21.       trustStorePassword: #访问密码
  22.       algorithm: #ssl使用的算法,例如,TLSv1.1
  23.       verifyHostname: #是否开启hostname验证
  24.     ### cache相关
  25.     cache:
  26.       channel:
  27.         size: #缓存中保持的channel数量
  28.         checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
  29.       connection:
  30.         mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
  31.         size: #缓存的连接数,只有是CONNECTION模式时生效
  32.     ### listener
  33.     listener:
  34.        type: # 三种类型,SIMPLE,DIRECT,STREAM 默认simple
  35.        ## simple类型
  36.        simple:
  37.          concurrency: #最小消费者数量
  38.          maxConcurrency: #最大的消费者数量
  39.          transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
  40.          missingQueuesFatal: #是否停止容器当容器中的队列不可用
  41.          ## 与direct相同配置部分
  42.          autoStartup: #是否自动启动容器
  43.          acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
  44.          prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
  45.          defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
  46.          idleEventInterval: #container events发布频率,单位ms
  47.          ##重试机制
  48.          retry:
  49.            stateless: #有无状态
  50.            enabled:  #是否开启
  51.            maxAttempts: #最大重试次数,默认3
  52.            initialInterval: #重试间隔
  53.            multiplier: #对于上一次重试的乘数
  54.            maxInterval: #最大重试时间间隔
  55.        direct:
  56.          consumersPerQueue: #每个队列消费者数量
  57.          missingQueuesFatal:
  58.          #...其余配置看上方公共配置
  59.      ## template相关
  60.      template:
  61.        mandatory: #是否启用强制信息;默认false
  62.        receiveTimeout: #`receive()`接收方法超时时间
  63.        replyTimeout: #`sendAndReceive()`超时时间
  64.        exchange: #默认的交换机
  65.        routingKey: #默认的路由
  66.        defaultReceiveQueue: #默认的接收队列
  67.        ## retry重试相关
  68.        retry:
  69.          enabled: true #是否重试功能 默认false
  70.          maxAttempts: 3 #最大重试次数 默认为3
  71.          initialInterval: 1000ms #重试间隔时间 可以使用ms、s、m、h、d
  72.          multiplier: #重试乘数,默认为1,即每次重试间隔时间保持不变
  73.          maxInterval: 10000ms #最大重试间隔时间 与乘数结合使用
复制代码
设置文件
  1. package com.tecloman.cloud.singleton.rabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.QueueBuilder;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. /\*\*
  11. \* @author Administrator
  12. \*/
  13. @Configuration
  14. @Slf4j
  15. public class RabbitConfig {
  16.     @Bean
  17.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  18.         final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  19.         // 序列化配置
  20.         rabbitTemplate.setMessageConverter(jsonMessageConverter());
  21.         rabbitTemplate.setMandatory(true);
  22.         // 推送到server回调
  23.         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
  24.                 log.info("ConfirmCallback correlationData:{},ack:{},cause:{}",correlationData,ack,cause));
  25.         // 消息返回给生产者, 路由不到队列时返回给发送者 先returnCallback,再 confirmCallback
  26.         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  27.             log.info("ReturnCallback message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
  28.                     message,replyCode,replyText,exchange,routingKey);
  29.         });
  30.         return rabbitTemplate;
  31.     }
  32.     @Bean
  33.     public Jackson2JsonMessageConverter jsonMessageConverter() {
  34.         return new Jackson2JsonMessageConverter();
  35.     }
  36. }
复制代码
RabbitMQ的六种工作模式

斲丧者@RabbitListener注解下的设置内容

  1. @Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION\_TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @MessageMapping
  4. @Documented
  5. @Repeatable(RabbitListeners.class)
  6. public @interface RabbitListener {
  7.     String id() default ""; // 用于为监听器指定一个唯一标识符,不指定则自动生成。
  8.     String containerFactory() default "";// 指定要使用的消息监听容器工厂 bean 的名称。
  9.     String[] queues() default {}; // 指定要监听的队列名称。可以是'队列名称','属性占位符键'或'表达式',队列必须存在,queues 属性与 bindings() 和 queuesToDeclare() 属性互斥,不能同时使用
  10.     Queue[] queuesToDeclare() default {}; // 用于声明要监听的队列,可以通过 @Queue 注解定义队列的属性。与 bindings() 和 queues() 属性互斥,不能同时使用,允许动态声明队列。
  11.     boolean exclusive() default false; // 指定是否为独占模式,即只有一个消费者可以消费该队列,为true时要求并发数=1。
  12.     String priority() default ""; // 指定消息的优先级,越大优先级越高。默认为容器优先级,可以为负数。
  13.     String admin() default ""; // 属性用于指定一个 RabbitAdmin bean 的引用。
  14.     QueueBinding[] bindings() default {}; // 用于绑定队列和交换机,以便监听指定的交换机中的消息。与 queues() 和 queuesToDeclare() 属性互斥,不能同时使用。
  15.     String group() default ""; // 指定消费者所属的分组。可以用于实现分组消费,确保同一组内的消费者共享消息。
  16.     String returnExceptions() default ""; // 定义一个异常处理策略,用于处理消息发送失败时的异常情况。
  17.     String errorHandler() default ""; // 指定消息监听容器的错误处理器,用于处理在消息处理过程中发生的错误。
  18.     String concurrency() default ""; // 指定消费者的并发数量,表示同时处理消息的线程数或者并发消费者的数量。
  19.     String autoStartup() default ""; // 指定容器是否自动启动,如果设置为 true,则容器会在启动时自动开始侦听消息。
  20.     String executor() default ""; // 定义用于处理消息的执行器,可以指定一个线程池来处理消息的消费逻辑。
  21.     String ackMode() default ""; // 指定消息确认模式,用于控制消息的确认方式,包括自动、手动、批量确认等。
  22.     String replyPostProcessor() default ""; // 定义一个后处理器,用于在发送响应时对响应消息进行处理。
  23.     String messageConverter() default ""; // 指定消息转换器,用于将消息从字节流转换为目标对象,或者将目标对象转换为字节流。
  24.     String replyContentType() default ""; // 指定回复消息的内容类型。
  25.     String converterWinsContentType() default "true"; // 指定转换器是否覆盖内容类型。
  26. }
复制代码
队列@Queue注解属性如下:

  1.     name: 队列的名称;
  2.     durable: 是否持久化;
  3.     exclusive: 是否独享、排外的;
  4.     autoDelete: 是否自动删除;
  5.     arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
  6.     x-message-ttl:消息的过期时间,单位:毫秒;
  7.     x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
  8.     x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
  9.     x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
  10.     x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
  11.     x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
  12.     x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
  13.     x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
  14.     x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
  15.     x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
  16.     x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
复制代码
1.simple简朴模式(点对点模式)



  • 消息的生产者将消息放入队列
  • 消息的斲丧者(consumer) 监听 消息队列,如果队列中有消息,就斲丧掉,消息被拿走后,主动从队列中删除(隐患 消息可能没有被斲丧者准确处置惩罚,已经从队列中消散了,造成消息的丢失)应用场景:聊天(中间有一个过分的服务器;p端,c端)
  1.         /\*\*
  2. \* 配置文件添加,简单模式队列
  3. \* @return
  4. \*/
  5.     @Bean
  6.     public Queue simpleQueue(){
  7.         //持久化 非独占 非自动删除
  8.         return QueueBuilder.durable("simpleQueue").build();
  9.     }
  10.      /\*\*
  11. \* 简单模式生产者
  12. \*/
  13.     @GetMapping("/simple")
  14.     public R simple(@RequestParam String msg){
  15.         Map<String, Object> map = createMsg(msg);
  16.         // 预先要创建好队列
  17.         rabbitTemplate.convertAndSend("simpleQueue",map);
  18.         return R.ok();
  19.     }
  20.      /\*\*
  21. \* 简单模式的消费者
  22. \*
  23. \* @param message 消息属性
  24. \* @param channel 通道
  25. \* @param msg 消息内容
  26. \* @throws IOException
  27. \*/
  28.     //使用queuesToDeclare属性,如果不存在则会创建队列
  29.     @RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue"))
  30.     public void simple(Message message, Channel channel, JSONObject msg) {
  31.         try {
  32.             MessageProperties properties = message.getMessageProperties();
  33.             // 这个tag每次服务重启会清0
  34.             long tag = properties.getDeliveryTag();
  35.             log.info("简单模式的消费者收到:{}", msg);
  36.             // 简单模式下,消息其实无需确认
  37.             // 由于在yml设置手动回执,此处需要手动回执,false不批量签收,回执后才能处理下一批消息
  38.             channel.basicAck(tag, false);
  39.         } catch (IOException e) {
  40.             log.error(this.getClass().getName());
  41.         }
  42.     }
复制代码
2.work工作模式(一对多)



  • 消息生产者将消息放入队列斲丧者可以有多个,斲丧者1,斲丧者2,同时监听同一个队列,C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责斲丧消息(隐患,高并发情况下,默认会产生某一个消息被多个斲丧者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 包管一条消息只能被一个斲丧者使用)
  • 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统主动争抢)

  1.         /\*\*
  2. \* 配置文件添加,Work模式队列
  3. work队列 默认是轮询发到消息者,priority="10" 设置消费者优先级,优先级相同轮询
  4. \* @return
  5. \*/
  6.     @Bean
  7.     public Queue workQueue(){
  8.         //持久化 非独占 非自动删除
  9.         return QueueBuilder.durable("workQueue").build();
  10.     }
  11.     /\*\*
  12. \* 生产者,一次性生产50条消费,消费者轮询消费,消费者可设置优先级priority="10",越大越优先
  13. \*/
  14.     @GetMapping("/work")
  15.     public R work(@RequestParam String msg) {
  16.         for (int i = 0; i < 50; i++) {
  17.             rabbitTemplate.convertAndSend("workQueue", createMsg(i), message -> {
  18.                 MessageProperties messageProperties = message.getMessageProperties();
  19.                 //默认消息持久化,设置消息不持久化
  20.                 messageProperties.setDeliveryMode(MessageDeliveryMode.NON\_PERSISTENT);
  21.                 return message;
  22.             });
  23.         }
  24.         return R.ok();
  25.     }
  26.     /\*\*
  27. \* 工作模式的消费者1,group分组属性不会生效
  28. \*
  29. \* @param message 消息属性
  30. \* @param channel 通道
  31. \* @param msg 消息内容
  32. \* @throws IOException
  33. \*/
  34.     //使用queuesToDeclare属性,如果不存在则会创建队列
  35.     @RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
  36.     public void work1(Message message, Channel channel, JSONObject msg) {
  37.         try {
  38.             MessageProperties properties = message.getMessageProperties();
  39.             long tag = properties.getDeliveryTag();
  40.             log.error("工作模式的消费者1收到:{}", msg);
  41.             //手动回执,不批量签收,回执后才能处理下一批消息
  42.             channel.basicAck(tag, false);
  43.         } catch (IOException e) {
  44.             log.error(this.getClass().getName());
  45.         }
  46.     }
  47.     /\*\*
  48. \* 工作模式的消费者2
  49. \*
  50. \* @param message 消息属性
  51. \* @param channel 通道
  52. \* @param msg 消息内容
  53. \* @throws IOException
  54. \*/
  55.     //使用queuesToDeclare属性
  56.     @RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))
  57.     public void work2(Message message, Channel channel, JSONObject msg) {
  58.         try {
  59.             MessageProperties properties = message.getMessageProperties();
  60.             long tag = properties.getDeliveryTag();
  61.             log.error("工作模式的消费者2收到:{}", msg);
  62.             //手动回执,不批量签收,回执后才能处理下一批消息
  63.             channel.basicAck(tag, false);
  64.         } catch (IOException e) {
  65.             log.error(this.getClass().getName());
  66.         }
  67.     }
复制代码
3.publish/subscribe发布订阅(共享资源)



  • 生产者通过fanout扇出互换机群发消息给斲丧者,同一条消息每一个斲丧者都可以收到,消息生产者将消息放入互换机,互换机发布订阅把消息发送到所有消息队列中,对应消息队列的斲丧者拿到消息进行斲丧
  • 相干场景:邮件群发,群聊天,广播(广告)

  1. //------------------方法1:生产者创建交换机,消费者创建队列与监听队列------------------
  2.      /\*\*
  3. \* 配置文件定义交换机
  4. \*
  5. \* @return
  6. \*/
  7.     @Bean
  8.     public Exchange fanout() {
  9.         //持久化 非自动删除
  10.         return ExchangeBuilder.fanoutExchange("fanout").build();
  11.     }
  12.     //创建初始化RabbitAdmin对象
  13.     @Bean
  14.     public RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {
  15.         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  16.         // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
  17.         rabbitAdmin.setAutoStartup(true);
  18.         // 声明交换机 fanout
  19.         rabbitAdmin.declareExchange(fanout());
  20.         return rabbitAdmin;
  21.     }
  22.     /\*
  23. \* 生产者 发送50条消息,消费者各自消费50条,
  24. \*/
  25.     @GetMapping("/fanout")
  26.     public R fanout(@RequestParam String msg){
  27.         for (int i = 0; i < 50; i++) {
  28.             Map<String, Object> map = createMsg(i);
  29.             // 第二个参数为路由Key
  30.             rabbitTemplate.convertAndSend("fanout",null,map);
  31.         }
  32.         return R.ok();
  33.     }
  34.    
  35.     /\*\*
  36. \* 发布订阅模式方法1的消费者1,group分组属性不会生效
  37. \*
  38. \* @param message 消息属性
  39. \* @param channel 通道
  40. \* @param msg 消息内容
  41. \*/
  42.     @RabbitListener(
  43.             // 这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
  44.             // declare = "false":生产者已定义交换机,此处不再声明交换
  45.             bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "fanout", declare = "false")))
  46.     public void fanout1(Message message, Channel channel, JSONObject msg) {
  47.         try {
  48.             MessageProperties properties = message.getMessageProperties();
  49.             long tag = properties.getDeliveryTag();
  50.             log.error("发布订阅模式方法1的消费者1收到:{}", msg);
  51.             // 手动回执,不批量签收,回执后才能处理下一批消息
  52.             channel.basicAck(tag, false);
  53.         } catch (IOException e) {
  54.             log.error(this.getClass().getName());
  55.         }
  56.     }
  57.     @RabbitListener(
  58.             // 这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除
  59.             // declare = "false":生产者已定义交换机,此处不再声明交换
  60.             bindings = @QueueBinding(value = @Queue, exchange = @Exchange(name = "fanout", declare = "false")))
  61.     public void fanout2(Message message, Channel channel, JSONObject msg) {
  62.         try {
  63.             MessageProperties properties = message.getMessageProperties();
  64.             long tag = properties.getDeliveryTag();
  65.             log.error("发布订阅模式方法1的消费者2收到:{}", msg);
  66.             // 手动回执,不批量签收,回执后才能处理下一批消息
  67.             channel.basicAck(tag, false);
  68.         } catch (IOException e) {
  69.             log.error(this.getClass().getName());
  70.         }
  71.     }
  72. //------------------方法2:生产者创建队列与交换机,消费者监听队列------------------
  73.     /\*\*
  74. \* 定义队列 持久 非排他 非自动删除
  75. \* @return
  76. \*/
  77.     @Bean
  78.     public Queue fanoutQueue1(){
  79.         return QueueBuilder.durable("fanout-queue1").build();
  80.     }
  81.     @Bean
  82.     public Queue fanoutQueue2(){
  83.         return QueueBuilder.durable("fanout-queue2").build();
  84.     }
  85.     /\*\*
  86. \* 定义扇出交换机 持久 非自动删除
  87. \* @return
  88. \*/
  89.     @Bean
  90.     public FanoutExchange fanoutExchange(){
  91.         return ExchangeBuilder.fanoutExchange("fanout2").build();
  92.     }
  93.     /\*\*
  94. \* 将队列1与交换机绑定
  95. \* @return
  96. \*/
  97.     @Bean
  98.     public Binding fanoutBinding1(){
  99.         return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
  100.     }
  101.     @Bean
  102.     public Binding fanoutBinding2(){
  103.         return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
  104.     }
  105.     //创建初始化RabbitAdmin对象
  106.     @Bean
  107.     public RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {
  108.         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  109.         // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
  110.         rabbitAdmin.setAutoStartup(true);
  111.         // 声明交换机和队列
  112.         rabbitAdmin.declareExchange(fanoutExchange());
  113.         rabbitAdmin.declareQueue(fanoutQueue1());
  114.         rabbitAdmin.declareQueue(fanoutQueue2());
  115.         return rabbitAdmin;
  116.     }
  117. // 不同消费者绑定在同一个交换机,队列相同,轮询消费,队列不同,各自消费
  118.     /\*\*
  119. \* 发布订阅模式方法2的消费者1 ,队列不同,生产者发送50条消息,各自消费50条
  120. \*
  121. \* @param message 消息属性
  122. \* @param channel 通道
  123. \* @param msg 消息内容
  124. \*/
  125.     //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致
  126.     @RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue1"))
  127.     public void fanout1(Message message, Channel channel, JSONObject msg) {
  128.         try {
  129.             MessageProperties properties = message.getMessageProperties();
  130.             long tag = properties.getDeliveryTag();
  131.             log.error("发布订阅模式方法2的消费者1收到:{}", msg);
  132.             //手动回执,不批量签收,回执后才能处理下一批消息
  133.             channel.basicAck(tag, false);
  134.         } catch (IOException e) {
  135.             log.error(this.getClass().getName());
  136.         }
  137.     }
  138.     @RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue2"))
  139.     public void fanout2(Message message, Channel channel, JSONObject msg) {
  140.         try {
  141.             MessageProperties properties = message.getMessageProperties();
  142.             long tag = properties.getDeliveryTag();
  143.             log.error("发布订阅模式方法2的消费者2收到:{}", msg);
  144.             //手动回执,不批量签收,回执后才能处理下一批消息
  145.             channel.basicAck(tag, false);
  146.         } catch (IOException e) {
  147.             log.error(this.getClass().getName());
  148.         }
  149.     }
复制代码
4.routing路由模式



  • 消息生产者将消息发送给互换机按照路由判定,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),互换机根据路由的key,只能匹配上路由key对应的消息队列,对应的斲丧者才能斲丧消息;
  • 根据业务功能界说路由字符串
  • 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中

  1.    
  2.     /\*\*
  3. \* 定义直流交换机
  4. \* @return
  5. \*/
  6.     @Bean
  7.     public Exchange routeExchange(){
  8.         //持久化 非自动删除
  9.         return ExchangeBuilder.directExchange("route").build();
  10.     }
  11.     // 创建初始化RabbitAdmin对象
  12.     @Bean
  13.     public RabbitAdmin RabbitAdminRoute(ConnectionFactory connectionFactory) {
  14.         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  15.         // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
  16.         rabbitAdmin.setAutoStartup(true);
  17.         // 声明直流交换机
  18.         rabbitAdmin.declareExchange(routeExchange());
  19.         return rabbitAdmin;
  20.     }
  21.     // 消费者发送消息,key=dev,test,prod
  22.     @GetMapping("/router")
  23.     public R router(@RequestParam String msg,@RequestParam String routerKey){
  24.         Map<String, Object> map = createMsg(msg);
  25.         rabbitTemplate.convertAndSend("route",routerKey,map);
  26.         return R.ok();
  27.     }
  28.     /\*\*
  29. \* 路由式消费者1
  30. \*
  31. \* @param message 消息属性
  32. \* @param channel 通道
  33. \* @param msg 消息内容
  34. \*/
  35.     @RabbitListener(bindings = @QueueBinding(
  36.             // declare = "false":生产者已定义交换机,此处不再声明交换机
  37.             value = @Queue, exchange = @Exchange(name = "route", declare = "false"),
  38.             key = {"prod"}//路由键
  39.     ))
  40.     public void route1(Message message, Channel channel, JSONObject msg) {
  41.         try {
  42.             MessageProperties properties = message.getMessageProperties();
  43.             String routingKey = properties.getReceivedRoutingKey();
  44.             log.error("路由模式方法1的消费者1收到:{},路由键:{}", msg, routingKey);
  45.             //手动回执,不批量签收,回执后才能处理下一批消息
  46.             long tag = properties.getDeliveryTag();
  47.             channel.basicAck(tag, false);
  48.         } catch (Exception e) {
  49.             log.error(this.getClass().getName());
  50.         }
  51.     }
  52.     /\*\*
  53. \* 路由式消费者2
  54. \*
  55. \* @param message 消息属性
  56. \* @param channel 通道
  57. \* @param msg 消息内容
  58. \*/
  59.     @RabbitListener(bindings = @QueueBinding(
  60.             // declare = "false":生产者已定义交换机,此处不再声明交换机
  61.             value = @Queue, exchange = @Exchange(name = "route", declare = "false"),
  62.             key = {"dev","test"}//路由键
  63.     ))
  64.     public void route2(Message message, Channel channel, JSONObject msg) {
  65.         try {
  66.             MessageProperties properties = message.getMessageProperties();
  67.             String routingKey = properties.getReceivedRoutingKey();
  68.             log.error("路由模式方法1的消费者2收到:{},路由键:{}", msg, routingKey);
  69.             //手动回执,不批量签收,回执后才能处理下一批消息
  70.             long tag = properties.getDeliveryTag();
  71.             channel.basicAck(tag, false);
  72.         } catch (Exception e) {
  73.             log.error(this.getClass().getName());
  74.         }
  75.     }
复制代码
5.topic 主题模式(路由模式的一种)



  • 路由功能添加含糊匹配
  • 消息生产者生产消息,把消息交给互换机
  • 互换机根据key的规则含糊匹配到对应的队列,由队列的监听斲丧者接收消息斲丧
  • topic必须是 星号或#.dev.星号,不能以 molo/pcs/q0/*/data_up 如许匹配不了

  1.     /\*\*
  2. \* 定义主题交换机
  3. \* @return
  4. \*/
  5.     @Bean
  6.     public Exchange themeExchange(){
  7.         //持久化 非自动删除
  8.         return ExchangeBuilder.topicExchange("topic").build();
  9.     }
  10.     //创建初始化RabbitAdmin对象
  11.     @Bean
  12.     public RabbitAdmin rabbitAdminTopic(ConnectionFactory connectionFactory) {
  13.         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  14.         // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
  15.         rabbitAdmin.setAutoStartup(true);
  16.         rabbitAdmin.declareExchange(themeExchange());
  17.         return rabbitAdmin;
  18.     }
  19.     // 生产者,routerKey,\*\*\*\*.dev.\*\*\*,\*\*\*\*\*.test.\*\*\*\*,分别走消费者1,和消费者2
  20.     // 通配符\*,#不能和 / 一起
  21.     @GetMapping("/topic")
  22.     public R topic(@RequestParam String msg, @RequestParam String routerKey) {
  23.         Map<String, Object> map = createMsg(msg);
  24.         rabbitTemplate.convertAndSend("topic", routeKey, map);
  25.         return R.ok();
  26.     }
  27.      /\*\*
  28. \* 主题方法1的消费者1
  29. \*
  30. \* @param message 消息属性
  31. \* @param channel 通道
  32. \* @param msg 消息内容
  33. \*/
  34.     @RabbitListener(bindings = @QueueBinding(
  35.             // declare = "false":生产者已定义交换机,此处不再声明交换机
  36.             value = @Queue, exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),
  37.             key = {"#.dev.\*"}))
  38.     public void topic1(Message message, Channel channel, JSONObject msg) {
  39.         try {
  40.             MessageProperties properties = message.getMessageProperties();
  41.             String routingKey = properties.getReceivedRoutingKey();
  42.             log.error("主题模式方法1的消费者1收到:{},路由键:{}", msg, routingKey);
  43.             //手动回执,不批量签收,回执后才能处理下一批消息
  44.             long tag = properties.getDeliveryTag();
  45.             channel.basicAck(tag, false);
  46.         } catch (Exception e) {
  47.             log.error(this.getClass().getName());
  48.         }
  49.     }
  50.     /\*\*
  51. \* 路由式方法1的消费者2
  52. \* # 号匹配多个 .分隔
  53. \* @param message 消息属性
  54. \* @param channel 通道
  55. \* @param msg 消息内容
  56. \*/
  57.     @RabbitListener(bindings = @QueueBinding(
  58.             // declare = "false":生产者已定义交换机,此处不再声明交换机
  59.             value = @Queue, exchange = @Exchange(name = "topic", type = ExchangeTypes.TOPIC),
  60.             key = {"#.molo.\*"}))
  61.     public void topic2(Message message, Channel channel, JSONObject msg) {
  62.         try {
  63.             MessageProperties properties = message.getMessageProperties();
  64.             String routingKey = properties.getReceivedRoutingKey();
  65.             log.error("主题模式方法1的消费者2收到:{},路由键:{}", msg, routingKey);
  66.             //手动回执,不批量签收,回执后才能处理下一批消息
  67.             long tag = properties.getDeliveryTag();
  68.             channel.basicAck(tag, false);
  69.         } catch (Exception e) {
  70.             log.error(this.getClass().getName());
  71.         }
  72.     }
复制代码
6.RPC (基于消息的长途过程调用)



  • RPC即客户端长途调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct互换机实现,流程如下:
  • 客户端便是生产者也是斲丧者,向RPC请求队列发送RPC调用消息,同时监听RPC相应队列。
  • 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的效果。
  • 服务端将RPC方法 的效果发送到RPC相应队列。
  • 客户端(RPC调用方)监听RPC相应队列,接收到RPC调用效果。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

傲渊山岳

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表