Java实现 RabbitMQ 消息队列

打印 上一主题 下一主题

主题 869|帖子 869|积分 2607

       媒介:这是一篇利用 Java 来实现 RabbitMQ 消息队列 基本焦点功能的文章,比方:互换价,队列,绑定,消息的功能。
  目录
一、RabbitMQ 是什么
二、需求分析
按模块划分:如下图
服务器模块(Broker)
互换机范例 (Exchange Type)
持久化
三、具体实现
焦点类:
创建 Exchange 类
互换机的范例
创建 Queue 类
创建 Binding 类
创建 Message 类
硬盘管理
数据库管理
文件管理
消息内容存储
消息统计信息存储
垃圾采取
内存管理
互换机管理
队列管理
绑定管理
消息管理
待确认消息管理
消息转发
服务器详细全部代码
客户端模块
生产者     
消耗者
公共模块
序列化和反序列化


一、RabbitMQ 是什么

        RabbitMQ是一个流行的开源消息队列体系,是AMQP(高级消息队列协议)尺度的实现,由以高性能、健壮、可伸缩性着名的Erlang语言开发,并继续了这些长处。
         RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。吸收端可以根据RabbitMQ设置的转发机制吸收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子体系间进行通讯,是分布式体系尺度的设置。
二、需求分析

按模块划分:如下图


服务器模块(Broker)

服务器模块是消息队列中最焦点的部分,负责消息的存储和转发,有以下概念:


  • 虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是⼀个逻辑上的聚集. ⼀个 BrokerServer 上可 以存在多个 VirtualHost.
  • 互换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发 给不同的 Queue.
  • 队列 (Queue): 真正⽤来存储消息的部分. 每个消耗者决定⾃⼰从哪个 Queue 上读取消息. 
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 "多对多" 关 系. 使⽤⼀个关联表就可以把这两个概念接洽起来.
  • 消息 (Message): 传递的内容.
   所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" ⼀样. 意思是: ⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息). ⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange).
  

互换机范例 (Exchange Type)

这里主要实现 RabbitMQ 的三种互换机范例:


  • Direct: ⽣产者发送消息时, 直接指定被该互换机绑定的队列名.
  • Fanout: ⽣产者发送的消息会被复制到该互换机的所有队列中.
  • Topic: 绑定队列到互换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
持久化

        Exchange, Queue, Binding, Message都有在硬盘和数据库中实现持久化的需求,当步伐重启 / 主机重启, 保证上述内容不丢失。
三、具体实现

首先实现 Broker 模块的焦点类,也就是 Exchange, Queue, Binding, Message 等焦点
焦点类:

创建 Exchange 类

  1. public class Exchange {
  2.     // 此处使用 name 来作为交换机的身份标识(唯一的)
  3.     private String name;
  4.     // 交换机类型, DIRECT, FANOUT, TOPIC
  5.     private ExchangeType type = ExchangeType.DIRECT;
  6.     // 该交换机是否需要持久化存储, true 代表需要持久化, false 表示不需要持久化
  7.     private boolean durable = false;
  8.     // 如果当前交换机, 没人使用了, 就会自动被删除. 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)
  9.     private boolean autoDelete = false;
  10.     // arguments 表示的是创建交换机时指定的一些额外的参数选项.
  11.     private Map<String, Object> arguments = new HashMap<>();// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
  12. }
复制代码
互换机的范例

  1. public enum ExchangeType {
  2.     DIRECT(0), // 直接交换机
  3.     FANOUT(1), // 扇出交换机
  4.     TOPIC(2); // 主体交换机
  5.     private final int type;
  6.     private ExchangeType(int type){
  7.         this.type = type;
  8.     }
  9.     public int getType(){
  10.         return type;
  11.     }
  12. }
复制代码
创建 Queue 类

  1. public class MSGQueue {
  2. // 表示队列的身份标识
  3.     private String name;
  4.     // 表示队列是否持久化, true 表示需要持久化, false表示不需要持久化
  5.     private boolean durable = false;
  6.     // 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用
  7.     private boolean exclusive = false;
  8.     // 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.
  9.     private boolean autoDelete = false;
  10.     // 为了把这个 arguments 存到数据库中,  就需要把 Map 转成 json 格式的字符串.
  11.     private Map<String , Object> arguments = new HashMap<>();
  12.     // 当前队列都有哪些消费者订阅了.
  13.     private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
  14.     // 记录当前取到了第几个消费者. 方便实现轮询策略.
  15.     private AtomicInteger consumerSeq = new AtomicInteger(0);
  16. }
复制代码
创建 Binding 类

  1. public class Binding {
  2.     private String exchangeName;
  3.     private String queueName;
  4.     // 主题交换机的 key
  5.     private String bindingKey;
  6. }
复制代码
创建 Message 类

  1. public class Message implements Serializable {
  2.     // 核心
  3.     private BasicProperties basicProperties = new BasicProperties(); // 消息的属性 集合成对象
  4.     private byte[] body; // 消息要传递的 数据
  5.     // 辅助用的属性
  6.     private transient long offsetBeg = 0; // 消息数据的 开头 距离文件开头的位置偏移(字节) / transient : 不可被序列化
  7.     private transient long offsetEnd = 0; // 消息数据的 结尾 距离文件开头的位置偏移(字节) / transient : 不可被序列化
  8.     // 该属性表示该消息在文件中是否有效。(逻辑删除)
  9.     private byte isValid   = 0x1; // 0x1 表示有效, 0x0 表示无效
  10.     // 使用工厂方法 封装创建 Message 对象的过程,
  11.     public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){
  12.         Message message = new Message();
  13.         if (basicProperties != null){
  14.             message.setBasicProperties(basicProperties);
  15.         }
  16.         // 此处生成的 MessageId 以 M- 作为前缀.
  17.         message.setMessageId("M-" + UUID.randomUUID()); // 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
  18.         message.setRoutingKey(routingKey); //routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
  19.         message.setBody(body);
  20.         // 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
  21.         // 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
  22.         // 此处只是在内存中创建一个 Message 对象.
  23.         return message;
  24.     }
复制代码
硬盘管理

硬盘管理主要是为了实现持久化,保证在服务器关机,重启之后重要的消息不会丢失
数据库管理

数据库主要是用来管理,互换机,队列,绑定的持久化操作的,因为这些是体系的焦点功能必要频繁的增删查操作,利用 SQLite 数据库 + MyBatis 框架可以更好管理这些数据。
互换机存储
  1. private void createDefaultData() {
  2.         Exchange exchange = new Exchange();
  3.         exchange.setName("");
  4.         exchange.setType(ExchangeType.DIRECT);
  5.         exchange.setDurable(true);
  6.         exchange.setAutoDelete(false);
  7.         metaMapper.insertExchange(exchange);
  8.         System.out.println("[DataBaseManager] 创建初始数据完成!");
  9.     }
  10.     // exchangeTable 的查找、插入和删除
  11.      public  List<Exchange> selectAllexchanges(){
  12.         return metaMapper.selectAllexchanges();
  13.      }
  14.      public void insertExchange(Exchange exchange){
  15.          metaMapper.insertExchange(exchange);
  16.      }
  17.     public void deleteExchange(String exchangeName){
  18.         metaMapper.deleteExchange(exchangeName);
  19.     }
复制代码
队列存储
  1. // QueueTable 的查找、插入和删除
  2.     public  void insertQueue(MSGQueue queue){
  3.         metaMapper.insertQueue(queue);
  4.     }
  5.     public void deleteQueue(String queueName){
  6.         metaMapper.deleteQueue(queueName);
  7.     }
  8.    public List<MSGQueue> selectAllQueues(){
  9.         return metaMapper.selectAllQueues();
  10.    }
复制代码
绑定存储
  1. // BindingTable 的查找、插入和删除
  2.     public void insertBinding(Binding binding){
  3.         metaMapper.insertBinding(binding);
  4.     }
  5.     public void deleteBinding(Binding binding){
  6.         metaMapper.deleteBinding(binding);
  7.     }
  8.     public List<Binding> selectAllBindings(){
  9.         return metaMapper.selectAllBindings();
  10.     }
复制代码
文件管理

   消息必要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储。
  缘故原由如下:

  • 对于消息的操作并不必要复杂的 增编削查 .
  • 对于⽂件的操作效率⽐数据库会⾼很多
给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件


  • queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
  •  queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
queue_data.txt ⽂件格式: 使⽤⼆进制⽅式存储. 每个消息分成两个部分:


  •  前四个字节, 表⽰ Message 对象的⻓度(字节数)
  • 后⾯若⼲字节, 表⽰ Message 内容.
  • 消息和消息之间⾸尾相连.
每个 Message 基于 Java 尺度库的 ObjectInputStream / ObjectOutputStream 序列化
   Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置.
  

queue_stat.txt ⽂件格式: 使⽤⽂本⽅式存储. ⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割. 第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬. 比方:
   2000\t1500
  消息内容存储

  1. // 这个方法用来把一个新 消息(Message)放到对应的队列中去
  2.     public void sendMessage(@NotNull MSGQueue queue, Message message) throws MessageQueueException, IOException {
  3.             // 检查当前要写入的队列对应的文件是否存在
  4.             if (!(checkQueueAndFileExits(queue.getName()))){
  5.                 throw new MessageQueueException("[MessageFileManager] 队列对应的文件不存在! queueName = " + queue.getName());
  6.             }
  7.             // 把 Message 对象,进行序列化,转成二进制的字节数组
  8.             byte[] messageBinary = BinaryTool.toBytes(message);
  9.             synchronized (queue) { // 通过对 同一个队列加锁, 保证在读写文件多线程操作时,数据的一致性和准确性
  10.                 // 获取到当前的队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd
  11.                 // 通过方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
  12.                 File queueDataFile = new File(getQueueDataPath(queue.getName())); // 获取到 队列的数据文件
  13.                 message.setOffsetBeg(queueDataFile.length() + 4); // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
  14.                 message.setOffsetEnd(queueDataFile.length() + messageBinary.length + 4); // offsetEnd 就是当前文件长度 + 4 + message 自身长度.
  15.                 // 写入消息到数据文件中,是追加到数据文件末尾
  16.                 try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
  17.                     try (DataOutputStream dataOutputStream =new DataOutputStream(outputStream)){
  18.                         // 接下来要先写当前消息的长度, 占据 4 个字节的~~
  19.                         dataOutputStream.writeInt(messageBinary.length);
  20.                         // 写入消息本体
  21.                         dataOutputStream.write(messageBinary);
  22.                     }
  23.                 }
  24.                 // 更新消息统计文件
  25.                 Stat stat = readStat(queue.getName());
  26.                 stat.totalCount += 1;
  27.                 stat.validCount += 1;
  28.                 writeStat(queue.getName(), stat);
  29.             }
  30.     }
复制代码
消息统计信息存储

  1. private Stat readStat(String queueName){
  2.         Stat stat = new Stat();
  3.         try(InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
  4.             Scanner scanner = new Scanner(inputStream); // 因当前的消息统计文件是文本文件,所以使用 Scanner 来读取文件内容
  5.             stat.totalCount = scanner.nextInt();
  6.             stat.validCount = scanner.nextInt();
  7.             return stat;
  8.         }  catch (IOException e) {
  9.             e.printStackTrace();
  10.         }
  11.         return null;
  12.     }
  13.     private void writeStat(String queueName, Stat stat){
  14.         //  OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
  15.         try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
  16.             // 使用 PrintWriter 写文件
  17.             PrintWriter printWriter = new PrintWriter(outputStream);
  18.             printWriter.write(stat.totalCount + "\t" + stat.validCount);
  19.             printWriter.flush(); // 确保消息及时刷新到内存中去
  20.         } catch (IOException e) {
  21.             e.printStackTrace();
  22.         }
  23.     }
复制代码
垃圾采取

           因为信息的删除只是把消息在⽂件上标志成了⽆效. 并没有腾出硬盘空间,终极⽂件⼤⼩大概会越积越 多. 因此必要定期的进⾏批量扫除.
          此处使⽤类似于复制算法. 当总消息数凌驾 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC. GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 取代旧⽂件即可.
  1. // 这个方法,使用复制算法来完成. 真正执行消息数据文件的垃圾回收操作.
  2.     public void gc(MSGQueue queue) throws MessageQueueException, IOException, ClassNotFoundException {
  3.         // 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.
  4.         synchronized (queue){
  5.             // 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间. gc 执行开始的时间
  6.             long gcBeg = System.currentTimeMillis();
  7.             // 创建一个新的文件, 名字就是 queue_data_new.txt
  8.             File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
  9.             if (queueDataNewFile.exists()){
  10.                 // 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
  11.                 throw new MessageQueueException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName = " + queue.getName());
  12.             }
  13.             boolean creatOK = queueDataNewFile.createNewFile();
  14.             if (!creatOK){
  15.                 throw  new MessageQueueException("[MessageFileManager] 创建文件失败! queueDataNewFile = " + queueDataNewFile.getAbsolutePath());
  16.             }
  17.             // 从旧的文件中, 读取出所有的有效消息对象了
  18.             LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
  19.             // 把有效消息, 写入到新的文件中.
  20.             try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
  21.                 try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
  22.                     for (Message message : messages){
  23.                         byte[] buffer = BinaryTool.toBytes(message);
  24.                         // 先写四个字节消息的长度
  25.                         dataOutputStream.writeInt(buffer.length);
  26.                         dataOutputStream.write(buffer);
  27.                     }
  28.                 }
  29.             }
  30.             // 删除旧的数据文件, 并且把新的文件进行重命名
  31.             File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
  32.             boolean deleteOK = queueDataOldFile.delete();
  33.             if (!deleteOK) {
  34.                 throw  new MessageQueueException("[MessageFileManager] gc删除数据旧文件失败! queueDataOldFile = " +queueDataOldFile.getAbsolutePath());
  35.             }
  36.             // 把 queue_data_new.txt => queue_data.txt
  37.             boolean renameOK = queueDataNewFile.renameTo(queueDataOldFile);
  38.             if (!renameOK){
  39.                 throw  new MessageQueueException("[MessageFileManager] gc重命名信数据文件失败! queueDataNewFile = " +queueDataNewFile.getAbsolutePath() + "queueDataOldFile = " + queueDataOldFile.getAbsolutePath());
  40.             }
  41.             // 更新统计文件
  42.             Stat stat = readStat(queue.getName());
  43.             stat.totalCount = messages.size();
  44.             stat.validCount = messages.size();
  45.             writeStat(queue.getName(), stat);
  46.             // gc 执行结束的时间
  47.             long gcEnd = System.currentTimeMillis();
  48.             // gc 耗时多久
  49.             System.out.println("[MessageFileManager] gc 执行完毕! queueName = " + queue.getName() + ", time = "
  50.                     + (gcEnd - gcBeg) + "ms");
  51.         }
  52.     }
复制代码
内存管理

        硬盘上存储数据, 只是为了实现 "持久化" 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结 构. 对于 消息队列来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.
互换机管理

  1. // 交换机 操作 Exchange
  2.     public void insertExchange(Exchange exchange){
  3.         exchangeMap.put(exchange.getName(), exchange);
  4.         System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName = " + exchange.getName());
  5.     }
  6.     public Exchange getExchange(String exchangeName){
  7.        return exchangeMap.get(exchangeName);
  8.     }
  9.     public void deleteExchange(String exchangeName){
  10.         exchangeMap.remove(exchangeName);
  11.         System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName = " + exchangeName);
  12.     }
复制代码
队列管理

  1. // 队列 操作 MSGQueue
  2.     public void insertQueue(MSGQueue queue){
  3.         queueMap.put(queue.getName(), queue);
  4.         System.out.println("[MemoryDataCenter] 新队列添加成功! queueName = " + queue.getName());
  5.     }
  6.     public MSGQueue getQueue(String queueName){
  7.         return queueMap.get(queueName);
  8.     }
  9.     public void deleteQueue(String queueName){
  10.         queueMap.remove(queueName);
  11.         System.out.println("[MemoryDataCenter] 队列删除成功! queueName = " + queueName);
  12.     }
复制代码
绑定管理

  1. // 交换机和队列的绑定关系 操作, Binding
  2.     public void insertBinding(Binding binding) throws MessageQueueException {
  3.         ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());
  4.         synchronized (bindingMap) {
  5.             // 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.
  6.             if (bindingMap.get(binding.getQueueName()) != null) {
  7.                 throw new MessageQueueException("[MemoryDataCenter] 绑定已经存在! exchangeName = " + binding.getExchangeName() +
  8.                         ", queueName = " + binding.getQueueName());
  9.             }
  10.             bindingMap.put(binding.getQueueName(), binding);
  11.             System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName = " + binding.getExchangeName()
  12.                     + ", queueName = " + binding.getQueueName());
  13.         }
  14.     }
  15.     // 获取绑定
  16.     // 根据 exchangeName 和 queueName 确定唯一一个 Binding
  17.     public Binding getBinding(String exchangeName, String queueName){
  18.         ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
  19.         if (bindingMap == null){
  20.             return null;
  21.         }
  22.         return bindingMap.get(queueName);
  23.     }
  24.     // 根据 exchangeName 获取到所有的 Binding
  25.     public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
  26.         return bindingsMap.get(exchangeName);
  27.     }
  28.     public void deleteBinding(Binding binding) throws MessageQueueException {
  29.         ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
  30.         if (bindingMap == null){
  31.             throw new MessageQueueException("[MemoryDataCenter] 绑定不存在! exchangeName = " + binding.getExchangeName()
  32.                     + ", queueName = " + binding.getQueueName());
  33.         }
  34.         bindingMap.remove(binding.getQueueName());
  35.         System.out.println("[MemoryDataCenter] 绑定删除成功! exchangeName = " + binding.getExchangeName()
  36.                 + ", queueName = " + binding.getQueueName());
  37.     }
复制代码
消息管理

  1. // 添加消息
  2.     public void addMessage(Message message) {
  3.         messageMap.put(message.getMessageId(), message);
  4.         System.out.println("[MemoryDataCenter] 新消息添加成功! messageId = " + message.getMessageId());
  5.     }
  6.     // 根据 id 查询消息
  7.     public Message getMessageById(String messageId) {
  8.         return  messageMap.get(messageId);
  9.     }
  10.     // 根据 id 删除消息
  11.     public void removeMessageById(String messageId) {
  12.         messageMap.remove(messageId);
  13.         System.out.println("[MemoryDataCenter] 消息被移除! messageId = " + messageId);
  14.     }
  15.     // 发送消息到指定队列
  16.     public void sendMessageToQueue(MSGQueue queue, Message message) { // 把消息放到对应的队列数据结构中.
  17.         LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
  18.         // 再把数据加到 messages 里面
  19.         synchronized (message) {
  20.             messages.add(message);
  21.             // 把该消息也往消息中心中 存入一份
  22.             addMessage(message);
  23.         }
  24.         System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId = " + message.getMessageId());
  25.     }
  26.     // 从队列中取消息
  27.     public Message pollMessageFromQueue(String queueName) {
  28.         // 根据队列名, 查找一下, 对应的队列的消息链表.
  29.         LinkedList<Message> messages = queueMessageMap.get(queueName);
  30.         if (messages == null) {
  31.             return null;
  32.         }
  33.         synchronized (messages) {
  34.             if (messages.size() == 0) {
  35.                 // 如果没找到, 说明队列中没有任何消息.
  36.                 return null;
  37.             }
  38.             // 链表中有元素, 就进行头删
  39.             Message currentMessage = messages.remove();
  40.             System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId = " + currentMessage.getMessageId());
  41.             return currentMessage;
  42.         }
  43.     }
复制代码
待确认消息管理

  1. // 添加未确认的消息
  2.     public void addMessageWaitAck(String queueName, Message message) {
  3.         // 从通过 队列名来在哈希表中查询 未确认的消息哈希表,如果没有 则创建一个
  4.         ConcurrentHashMap<String, Message> waitAckMessages  = queueMessageWaitAckMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>());
  5.         // 添加未确认的消息
  6.         waitAckMessages.put(message.getMessageId(), message);
  7.         System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId = " + message.getMessageId());
  8.     }
  9.     // 删除未确认的消息(消息已经确认了)
  10.     public void removeMessageWaitAck(String queueName, String messageId) {
  11.         ConcurrentHashMap<String, Message> waitAckMessages = queueMessageWaitAckMap.get(queueName);
  12.         if (waitAckMessages == null){
  13.             return;
  14.         }
  15.         waitAckMessages.remove(messageId);
  16.         System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId = " + messageId);
  17.     }
  18.     // 获取指定的未确认的消息
  19.     public Message getMessageWaitAck(String queueName, String messageId) {
  20.         ConcurrentHashMap<String, Message> waitAckMessages = queueMessageWaitAckMap.get(queueName);
  21.         if (waitAckMessages == null) {
  22.             return null;
  23.         }
  24.         return waitAckMessages.get(messageId);
  25.     }
复制代码
消息转发

  1. // 发送消息到指定的交换机/队列中.
  2.     public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
  3.         // 转换交换机的名字
  4.         exchangeName = virtualHostName + exchangeName;
  5.         try {
  6.             // 检查 routingKey 是否合法.
  7.             if (!router.checkRoutingKey(routingKey)) {
  8.                 throw new MessageQueueException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);
  9.             }
  10.             // 查找交换机对象
  11.             Exchange exchange = memoryDataCenter.getExchange(exchangeName);
  12.             if (exchange == null) {
  13.                 throw new MessageQueueException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);
  14.             }
  15.             // 判定交换机的类型
  16.             if (exchange.getType() == ExchangeType.DIRECT) {
  17.                 // 按照直接交换机的方式来转发消息
  18.                 // 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
  19.                 // 此时, 可以无视绑定关系.
  20.                 String queueName = virtualHostName + routingKey;
  21.                 // 构造消息对象
  22.                 Message message = Message.createMessageWithId(routingKey, basicProperties, body);
  23.                 // 查找该队列名对应的对象
  24.                 MSGQueue queue = memoryDataCenter.getQueue(queueName);
  25.                 if (queue == null) {
  26.                     throw new MessageQueueException("[VirtualHost] 队列不存在! queueName=" + queueName);
  27.                 }
  28.                 // 队列存在, 直接给队列中写入消息
  29.                 sendMessage(queue, message);
  30.             } else {
  31.                 // 按照 fanout 和 topic 的方式来转发.
  32.                 // 找到该交换机关联的所有绑定, 并遍历这些绑定对象
  33.                 ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
  34.                 for (Map.Entry<String, Binding> entry : bindings.entrySet()) {
  35.                     // 获取到绑定对象, 判定对应的队列是否存在
  36.                     Binding binding = entry.getValue();
  37.                     MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
  38.                     if (queue == null) {
  39.                         // 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
  40.                         // 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
  41.                         System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
  42.                         continue;
  43.                     }
  44.                     // 构造消息对象
  45.                     Message message = Message.createMessageWithId(routingKey, basicProperties, body);
  46.                     // 判定这个消息是否能转发给该队列.
  47.                     // 如果是 fanout, 所有绑定的队列都要转发的.
  48.                     // 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
  49.                     if (!router.route(exchange.getType(), binding, message)) {
  50.                         continue;
  51.                     }
  52.                     // 真正转发消息给队列
  53.                     sendMessage(queue, message);
  54.                 }
  55.             }
  56.             return true;
  57.         } catch (Exception e) {
  58.             System.out.println("[VirtualHost] 消息发送失败!");
  59.             e.printStackTrace();
  60.             return false;
  61.         }
  62.     }
复制代码
服务器详细全部代码

客户端模块

生产者     

生产者的demo
消耗者

消耗者的demo
公共模块

序列化和反序列化

        主要是用来传输信息,因为我们进行网路通讯时是接纳的TCP 协议,以是在传输过程中,必要把数据转换成二进制代码
  1. public class BinaryTool {
  2.     // 用来验证代码序列化版本的 如果序列化后的文件想反序列化时,和文件中的不对 则不能反序列化,直接报错
  3.     private static final Long serialVersionUID = 1L; // 改动了序列化对象的代码 这个 序列化版本也需要改动
  4.     // 把一个对象序列化成一个字节数组
  5.     public static byte[] toBytes(Object object) throws IOException {
  6.         // ByteArrayOutputStream 这个流对象相当于一个变长的字节数组.
  7.         // 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
  8.         try (ByteArrayOutputStream  byteArrayOutputStream = new ByteArrayOutputStream()){
  9.             try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
  10.                 // 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
  11.                 // ObjectOutputStream 中.
  12.                 // 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
  13.                 objectOutputStream.writeObject(object);
  14.             }
  15.             return byteArrayOutputStream.toByteArray();
  16.         }
  17.     }
  18.     // 把一个字节数组, 反序列化成一个对象
  19.     public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
  20.         Object object = null;
  21.         try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
  22.             try (ObjectInputStream objectInputStream =new ObjectInputStream(byteArrayInputStream)){
  23.                 // 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
  24.                 object = objectInputStream.readObject();
  25.             }
  26.         }
  27.         return object;
  28.     }
  29. }
复制代码




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宝塔山

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表