RabbitMQ实践——搭建多人聊天服务

鼠扑  金牌会员 | 2024-11-6 16:34:31 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 926|帖子 926|积分 2778

在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout互换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。如许它们就可以得到全部消息。

如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout互换器绑定10个队列。这就会使得布局变得非常复杂。
这是由于Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。如许就需要我们使用fanout向多个队列路由消息,以供差异消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的差异点是:Stream队列并不会清除消息。消息会不停存在于Stream队列中,消费者可以从指定位置开始读取消息。如许我们只要有一个Stream队列生存消息,所有消费者都从队列中读取消息即可。

用户登录

关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。
创建聊天室

我们会创建一个以聊天室名称定名的互换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。
  1. package com.rabbitmq.chat.service;
  2. import java.util.Collections;
  3. import java.util.Date;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageBuilder;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Service;
  9. import reactor.core.publisher.Flux;
  10. @Service
  11. public class ChatRoomV2 {
  12.     @Autowired
  13.     private RabbitTemplate rabbitTemplate;
  14.     public void createChatRoom(String admin, String roomName) {
  15.         createChatRoom(roomName);
  16.     }
  17.     private void createChatRoom(String roomName) {
  18.         rabbitTemplate.execute(action -> {
  19.             action.exchangeDeclare(roomName, "fanout", false, true, null);
  20.             action.queueDeclare(roomName, true, false, false,
  21.                 Collections.singletonMap("x-queue-type", "stream"));
  22.             action.queueBind(roomName, roomName, "");
  23.             return null;
  24.         });
  25.     }
复制代码
聊天室创建完毕后,会关照所有登录的用户。
  1.     @PostMapping("/create")
  2.     public void create(@RequestParam String admin, @RequestParam String roomName) {
  3.         chatRoomV2.createChatRoom(admin, roomName);
  4.         core.notifyEveryone(roomName + " is created");
  5.     }
复制代码
监听Stream(聊天室)

  1.     public Flux<String> receive(String username, String roomName) {
  2.         return Flux.create(emitter -> {
  3.             rabbitTemplate.execute(channel -> {
  4.                 channel.basicQos(100);
  5.                 Date timestamp = new Date(System.currentTimeMillis());
  6.                 channel.basicConsume(roomName, false, username,
  7.                     false, true,
  8.                         Collections.singletonMap("x-stream-offset", timestamp),
  9.                         (consumerTag, message) -> {
  10.                             String senderOfMessage = message.getProperties().getHeaders().get("username").toString();
  11.                             String show = "You Said: ";
  12.                             if (!senderOfMessage.equals(username)) {
  13.                                 show = senderOfMessage + " Said: ";
  14.                             }
  15.                             show += new String(message.getBody());
  16.                             System.out.println(show);
  17.                             emitter.next(show);
  18.                             channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  19.                         },
  20.                         consumerTag -> { }
  21.                 );
  22.                 return null;
  23.             });
  24.         });
  25.     }
复制代码
我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取汗青消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标记了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理
  1.     @GetMapping(value = "/receive", produces = "text/event-stream")
  2.     public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {
  3.         return chatRoomV2.receive(username, roomName);
  4.     }
复制代码
发送消息

每个聊天室用户只要给之前创建的Fanout互换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。
  1.     public void send(String username, String roomName, String message) {
  2.         Message msg = MessageBuilder.withBody(message.getBytes())
  3.             .setHeader("username", username)
  4.             .build();
  5.         rabbitTemplate.send(roomName, "", msg);
  6.     }
复制代码
实行

登录

Tom侧


Jerry侧


创建聊天室

Jerry侧

Jerry申请创建一个聊天室

在管理背景,我们看到对应的互换器和Stream都创建出来了。


同时在刚才的登录接口界面,Jerry收到了关照

Tom侧

Tom也会收到关照

进入聊天室

Tom和Jerry在收到关照后,可以通过receive接口进入聊天室,监听聊天室内容变革。
Jerry侧


Tom侧


发送消息

Jerry发送消息


Jerry侧聊天室


Tom侧聊天室


Tom发送消息


Jerry侧聊天室


Tom侧聊天室


代码工程

https://github.com/f304646673/RabbitMQDemo
参考资料



  • https://www.rabbitmq.com/docs/streams

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表