RabbitMQ介绍+使用手册

打印 上一主题 下一主题

主题 879|帖子 879|积分 2637

一、rabbitmq介绍

​ RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息署理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。全部主要的编程语言均有与署理接口通讯的客户端库。
RabbitMQ优势


  • 可靠性(Reliablity):使用了一些机制来保证可靠性,好比持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现本身的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的环境下队列仍旧可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持全部常用语言,好比Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息非常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑本身的插件。
二、rabbitmq服务器安装及环境设置

1.下载并安装rabbitmq服务器

​ 首先辈入rabbitmq官网(rabbitmq.com)

​ 向下滑动滚轮找到Download+Installation,点击进入

​ 点击Install Windows安装windows版本

​ 向下滑动滚轮找到下图框住的两个下载链接

​ 下载的软件位置,先安装otp.exe,鼠标右键以管理员方式运行。接着选取要安装的路径,然后一路傻瓜式安装 next 下一步,安装即可。不要安装在中文或带空格的文件路径下。
​ 设置系统环境变量:右键此电脑 - 属性 - 高级系统设置 - 环境变量。

​ 接着打开 - 此电脑(文件资源管理器) 找到刚刚我们安装的文件 bin 目录下,复制路径 ctrl+c 切换窗口到环境变量,找到系统变量 path - 编辑

​ 新建 - ctrl + v 粘贴刚才我们复制的路径,然后三次确定,关闭环境变量窗口

​ 安装 RabbitMQ:右键管理员运行,然后选择安装路径,接着一路 next 下一步,遇到弹窗点允许,没有弹窗则无视(不要安装在中文或带空格的文件路径下)。安装完成后也要设置系统环境变量:设置的方法和上面一致,路径是安装的文件下的sbin下。

安装完成后找到安装文件路径,找到 sbin 目录下,全选路径 输入 cmd,打开cmd命令窗口,运行下面命令(这个指令是为了启动web界面)。
rabbitmq-plugins enable rabbitmq_management

​ 然后再通过管理员权限打开一个cmd,输入rabbitmq-service stop,再输入rabbitmq-service start

(如果这里运行这两个命令不成功可能是rabbitmq服务器安装未成功,必要在管理员权限下的cmd实行rabbitmq-service install)
​ 打开浏览器,访问 127.0.0.1:15672,出现管理页面就安装成功了。账户:guest,暗码:guest。

2.下载并编译rabbitmq-c静态库

第一步:在https://github.com/alanxz链接下载rabbitmq-c静态库。
第二步:使用cmake编译天生适合本身编译环境的工程
​ (1)填写源代码路径
​ (2)创建后的路径,build的文件夹一样平常创建在源代码路径里,也可以放到其他位置
​ (3)点击Configure按钮
​ (4)点击Generate按钮

​ (如出现如下报错,可以去掉ENABLE_SSL_SUPPORT括号里的勾。)

​ 然后再将工程放在vs中编译天生即可在工程的build/librabbitmq/Debug文件夹下找到天生的rabbitmq.4.lib和rabbimq.4.dll了,将dll和lib放在本身所需的工程下即可用AMQP库的接口函数了。
三、rabbitmq使用

1.rabbitmq架构及各组件功能


​ virtual Host:虚拟主机。标识一批互换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有本身的队列、互换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
​ channel:通道,amqp支持一个tcp毗连上启用多个mq通讯通道,每个通道都可以被作为通讯流。
​ publisher:生产者,是消息产生的源头。
​ exchange:互换机,可以理解为具有路由表的路由规则。
​ queues:队列,装载消息的缓存容器。
​ consumer:消费者,毗连到队列并取走消息的客户端。
焦点思想:在RabbitMQ中,生产者从不直接将消息发送给队列。
​ 究竟上,有些生产者甚至不知道消息是否被送到某个队列中去了。生产者只负责将消息送给互换机,而互换机确切地知道什么消息应该送到哪。
​ bind:绑定,实际上可以理解为互换机的路由规则。每个消息都有一个称为路由键的属性(routing key),就是一个简单的字符串。一个绑定将【互换机,路由键,消息送达队列】三者绑定在一起,形成一条路由规则。
​ exchange type:互换机类型:
​ fanout:不处理路由键,转发到全部绑定的队列上
​ direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发
​ topic:路由键模式匹配,此时队列必要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
注意:这里用户的创建是在服务器的Web界面上创建的!并且用户所拥有的权限还决定能吸取的信息。
​ 创建用户方法:
​ 第一步:打开浏览器,访问 127.0.0.1:15672,输入账户:guest,暗码:guest。
​ 第二步:点击导航栏里的Admin

​ 第三步:点击Add a user左边的小三角,在username后面输入创建用户的名字,在Password里输出暗码,下一行的(confirm)是确认暗码,然后在末了一个输入的右侧选择你要创建用户的角色,这个决定用户拥有的权限,在目前我的探究里,如果发送方通讯的方式选的是topic且吸取方权限是Admin,那么Routingkey的设置将毫偶然义,不管设置什么吸取方都会吸取到这个互换机里的全部信息,设置其他的权限则可以通过Routingkey来选择想要吸取到的消息。

​ 第四步:点击Add user,即可成功创建一个用户。

​ 第五步:点击你创建的用户

​ 第六步:设置用户的虚拟主机权限以及TOPIC权限,直接用默认的就行,点击上下两个Set permission就ok了


2.rabbitmq通讯方式

2.1扇出模式(fanout)

在Fanout模式下,消息发送流程是如许的:


  • 可以有多个消费者
  • 每个消费者有本身的queue(队列)
  • 每个队列都要绑定到Exchange(互换机)
  • 生产者发送的消息,只能发送到互换机,互换机来决定要发给哪个队列,生产者无法决定。
  • 互换机把消息发送给绑定过的全部队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

2.2直连模式(direct)

在Direct模型下:
​ 队列与互换机的绑定,不是任意绑定,而是要指定一个RoutingKey(路由key)。消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange会判断每一个绑定的队列,只有队列的Routingkey与消息的 Routing key完全一致,才会吸取到消息。

2.3主题模式(topic)

在Topic模式下:
​ Topic模式的Exchange与Direct相比,可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时间使用通配符!这种模型Routingkey 一样平常都是由一个或多个单词组成,多个单词之间以”.”分割,比方: item.insert

3.接口函数介绍

3.1通用函数

  1.   //创建一个新的AMQP连接,他将被用于RabbitMQ服务器进行通信
  2.   amqp_new_connection();
  3.    //这行代码创建了一个新的TCP套接字,并将其与之前的AMQP关联起来
  4.   amqp_tcp_socket_new(conn);
  5.   /**
  6.   *                @brief  amqp_socket_open()                用来打开一个与AMQP服务器的网络套接字连接。
  7.   *                @param  socket                                        用于存储套接字连接信息的指针。
  8.   *                @param  host                                        AMQP服务器的主机名或IP地址
  9.   *                @param  port                                        AMQP服务器主机的端口号,默认端口号是5672
  10.   */
  11.   amqp_socket_open(socket, "127.0.0.1", 5672);
  12.   /**
  13.   *                @brief  amqp_login()                        用来来进行与AMQP服务器的身份验证和登录操作。
  14.   *                @param  conn                                        与AMQP服务器建立的连接。
  15.   *                @param  vhost                                        虚拟主机的名称,用于指定要连接的AMQP虚拟主机,"/"表示连接到默认的虚拟主机。
  16.   *                @param  channel_max                                通道的最大数量。在这里,设置为0表示使用服务器的默认设置。
  17.   *                @param  frame_max                                每个AMQP帧的最大字节大小。AMQP_DEFAULT_FRAME_SIZE表示使用默认的帧大小。
  18.   *                @param  heartbeat                                心跳超时时间,用于检测与服务器之间的连接状态。在这里,设置为0表示禁用心跳检测。
  19.   *                @param  sasl_method                                身份验证机制。AMQP_SASL_METHOD_PLAIN表示使用PLAIN身份验证机制。
  20.   *                @param        username                                登录用户名。使用"guest"表示使用默认的用户名。
  21.   *                @param  password                                登录密码。使用"guest"表示使用默认的密码。
  22.   */
  23.   amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
  24.   /**
  25.   *                amqp_bytes_t是AMQP库中的一种数据类型,用于表示字节数据。
  26.   *                具体而言,amqp_bytes_t是一个结构体类型,包含两个字段:
  27.   *                size_t len:表示字节数据的长度(大小)。
  28.   *                void* bytes:指向字节数据的指针。
  29.   *                通过使用amqp_bytes_t类型,可以方便地处理和传递字节数据,如消息内容、队列名称等。在你的代码片段中,amqp_bytes_t用于作为参数传递给amqp_queue_declare()函数的queue参数。通过使用amqp_bytes_t类型的对象,可以指定要声明的队列名称。在你的例子中,使用了amqp_empty_bytes,表示不指定特定的队列名称,而是让服务器自动生成一个唯一的队列名称。总结起来,amqp_bytes_t是AMQP库中用于表示字节数据的数据类型。它具有表示字节数据长度和指向字节数据的指针的能力,方便在处理和传递字节数据时使用。
  30.   */
  31.   amqp_bytes_t t;
  32.   //打开一个通道(channel),编号为1,所有的消息都是通过通道传输的。
  33.   const amqp_channel_t KChannel = 1;
  34.   amqp_channel_open(conn, KChannel);
  35.   //首先关闭通道,然后关闭连接,最后销毁连接。
  36.   amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
  37.   amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
  38.   amqp_destroy_connection(conn);
复制代码
3.2吸取端函数

  1.   /**
  2.   *                @brief  amqp_queue_declare() 向AMQP服务器发送一个队列声明请求,以创建一个新的消息队列或获取现有队列的信息。
  3.   *                @param  conn                                 与AMQP服务器建立的连接。
  4.   *                @param  channel                          通道编号,用于指定操作的特定通道。
  5.   *                @param  queue                                 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  6.   *                @param  passive                                 表示是否以被动模式进行声明。如果设置为非零值(例如1),表示以被动模式进行声明,仅返回现有队列的信息而不创建新队列。如果设置为零值,则会创建新队列或返回现有队列的信息。
  7.   *                @param  durable                                 表示队列是否持久化。如果设置为非零值(例如1),表示队列是持久化的,即在服务器重启后仍然存在。如果设置为零值,则队列是非持久化的,不会存储到磁盘上。
  8.   *                @param  exclusive                         表示队列是否为独占队列。如果设置为非零值(例如1),表示队列是独占的,只有声明队列的连接可以访问。如果设置为零值,则队列可以被多个连接访问。
  9.   *                @param  auto_delete                         表示队列是否在不再使用时自动删除。如果设置为非零值(例如1),表示队列在没有连接使用时会被自动删除。如果设置为零值,则队列不会自动删除。
  10.   *                @param  arguments                         可选参数,用于传递额外的队列参数。
  11.   */
  12.   amqp_queue_declare(m_pConn,m_iChannel_Receive,_queue,_passive,_durable,_exclusive,_auto_delete,amqp_empty_table);  
  13.   /**
  14.   *                @brief  amqp_queue_bind()         用于将队列与交换机进行绑定。
  15.   *                @param  conn                                 与AMQP服务器建立的连接。
  16.   *                @param  channel                          通道编号,用于指定操作的特定通道。
  17.   *                @param  queue                                 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  18.   *                @param  exchange                          要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  19.   *                @param  routing_key                         绑定的路由键,用于指定消息从交换机路由到队列的条件。消息的路由键和绑定的路由键匹配时,消息会被路由到该队列。
  20.   *                @param  arguments                         可选参数,用于传递额外的绑定参数。
  21.   */
  22.   amqp_queue_bind(m_pConn,m_iChannel_Receive,_queue,_exchange,_routekey,amqp_empty_table);
  23.   /**
  24.   *                @brief  amqp_basic_qos()  用来设置AMQP通道的QoS(Quality of Service)配置。
  25.   *                @param  m_pConn                                与AMQP服务器建立的连接。
  26.   *                @param  m_iChannel                        通道编号,用于指定操作的特定通道。
  27.   *                @param  prefetch_size                预取大小,用于限制服务器一次性发送给消费者的消息的总大小。在这里,设置为0表示不限制消息的总大小。
  28.   *                @param  prefetch_count                预取计数,用于限制服务器一次性发送给消费者的消息的数量。在这里,设置为1表示每次只获取一条消息。
  29.   *                @param  global                                用于指定是否将预取限制应用到整个通道(global=true)还是仅应用于特定消费者(global=false)。在这里,设置为0表示仅应用于特定消费者。
  30.   */
  31.   amqp_basic_qos(conn, KChannel, 0, /*prefetch_count*/1, 0);
  32.   /**
  33.   *                @brief  amqp_basic_ack()        用来确认(acknowledge)已经成功处理的消息。
  34.   *                @param  m_pConn                                与AMQP服务器建立的连接。
  35.   *                @param  m_iChannel                        通道编号,用于指定操作的特定通道。
  36.   *                @param  delivery_tag  交付标识(delivery tag),它是一个标识特定消息的整数值。该标识用于唯一标识一条消息。
  37.   *                @param  multiple                        用于指示是否同时确认多条消息。设置为false表示只确认当前的一条消息,true为确认多条消息
  38.   */
  39.   amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, true);
  40.   /**
  41.   *                @brief                                                用于启动消费者并订阅消息队列中的消息。
  42.   *                @param  conn                                与AMQP服务器建立的连接。
  43.   *                @param  channel                         通道编号,用于指定操作的特定通道。
  44.   *                @param  queue                                要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  45.   *                @param  consumer_tag                消费者标识,用于标识特定的消费者。可以为消费者指定一个唯一的标识,也可以使用空字符 (amqp_empty_bytes) 自动分配一个标识。
  46.   *                @param  no_local                        表示是否不接收自己发布的消息。如果设置为非零值(例如 1),表示不接收自己发布的消息。如果设置为零值,则接收自己发布的消息。
  47.   *                @param  no_ack                                表示是否不需要发送确认消息。如果设置为非零值(例如 1),表示不需要发送确认消息,即消息不需要进行显式的确认。如果设置为零值,则需要发送确认消息,以确保消息的可靠传递和处理。
  48.   *                @param  exclusive                        表示是否创建一个独占的消费者。如果设置为非零值(例如 1),表示创建一个独占的消费者,只有当前连接的消费者可以订阅指定队列。如果设置为零值,则可以有多个消费者订阅同一个队列。
  49.   *                @param  arguments                        可选参数,用于传递额外的订阅参数。
  50.   */
  51.   amqp_basic_consume(m_pConn,m_iChannel_Receive,queuename,amqp_empty_bytes,false,false,false,amqp_empty_table);
  52.   /**
  53.   *                @brief  用于从 AMQP 服务器接收订阅的消息,并将消息存储在 amqp_message_t 类型的结构体中,而不设置接收超时时间。
  54.   *                @param  conn                                与 AMQP 服务器建立的连接。
  55.   *                @param  &envelope                        指向 amqp_envelope_t 类型的指针,用于存储接收到的消息的包装信息(如交付标识、交换机名称等)。该结构体中的字段将被填充为接收到的消息的相关信息。
  56.   *                @param  timeval*                        如果输入nullptr,表示没有设置接收超时时间,即函数将一直阻塞等待接收到消息。
  57.   *                @param  flags                                0:表示没有设置特定的接收标志。
  58.   */
  59.   amqp_consume_message(m_pConn,&envelope,timeout,0);
复制代码
3.3发送端函数

  1.   /**
  2.   *                @brief  amqp_exchange_declare() 用于向 AMQP 服务器发送一个交换机声明请求,以创建一个新的交换机或获取现有交换机的信息。
  3.   *                @param  conn                                 与AMQP服务器建立的连接。
  4.   *                @param  channel                          通道编号,用于指定操作的特定通道。
  5.   *                @param  exchange                          要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  6.   *                @param  type                                 交换机的类型,可以是 "direct"、"fanout"、"topic"。
  7.   *                @param  passive                                 表示是否以被动模式进行声明。如果设置为非零值(例如1),表示以被动模式进行声明,仅返回现有队列的信息而不创建新队列。如果设置为零值,则会创建新队列或返回现有队列的信息。
  8.   *                @param  durable                                 表示队列是否持久化。如果设置为非零值(例如1),表示队列是持久化的,即在服务器重启后仍然存在。如果设置为零值,则队列是非持久化的,不会存储到磁盘上。
  9.   *                @param  auto_delete                         表示队列是否在不再使用时自动删除。如果设置为非零值(例如1),表示队列在没有连接使用时会被自动删除。如果设置为零值,则队列不会自动删除。
  10.   *                @param  internal                         表示交换机是否是内部的。如果设置为非零值(例如 1),表示交换机是内部的,只能通过其他交换机进行路由。如果设置为零值,则交换机可以直接接收消息。
  11.   *                @param  arguments                         可选参数,用于传递额外的队列参数。
  12.   */
  13.   amqp_exchange_declare(m_pConn,m_iChannel_Send,_exchange,_type,_passive,_durable,0,0,amqp_empty_table);
  14.   /**
  15.   *                @brief        用于将C风格字符串转换为AMQP字节序列的函数调用,函数的返回值是一个amqp_bytes_t类型的对象,其中包含转换后的字节序列的长度和指向数据的指针,这个的用途只是进行了一个转换,应该amqp库里的函数输入都不是char*以及string类型,都需要转成amqp_bytes_t类型
  16.   *                @param  cstr                                 const char*类型字符串
  17.   */
  18.   amqp_cstring_bytes("direct");
  19.   /**
  20.   *                @brief  将一个消息发布到 RabbitMQ 服务器.
  21.   *                @param  conn                                 已经打开的与 RabbitMQ 服务器的连接。
  22.   *                @param  replyChannel                 消息应该发布到的通道,这通常与消息原始接收的通道相同。  
  23.   *                @param  exchange                         交换器名字。
  24.   *                @param  routekey                         路由键。
  25.   *                @param  mandatory                         如果设置为 true,当消息不能被路由到任何队列时,服务器会将消息返回给生产者。如果是false,表示如果消息不能被路由,服务器会丢弃它。
  26.   *                @param  immediate                         如果设置为 true,当队列中没有消费者能立即接收到消息时,服务器会将消息返回给生产者。如果是false,表示服务器会将消息存储,直到有消费者能接收它。
  27.   *                @param  properties*                         消息的属性。
  28.   *                @param  response_                         要发布的消息体。
  29.   */
  30.   amqp_basic_publish(m_pConn,m_iChannel_Send,_exchange,_routekey,0,0,NULL,message_bytes);
复制代码
四、参考网站

原文链接:https://blog.csdn.net/qq_43410878/article/details/123656765
原文链接:https://zhuanlan.zhihu.com/p/640374684

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

前进之路

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表