宝塔山 发表于 2024-12-24 10:05:48

Java实现 RabbitMQ 消息队列

       媒介:这是一篇利用 Java 来实现 RabbitMQ 消息队列 基本焦点功能的文章,比方:互换价,队列,绑定,消息的功能。
目录
一、RabbitMQ 是什么
二、需求分析
按模块划分:如下图
服务器模块(Broker)
互换机范例 (Exchange Type)
持久化
三、具体实现
焦点类:
创建 Exchange 类
互换机的范例
创建 Queue 类
创建 Binding 类
创建 Message 类
硬盘管理
数据库管理
文件管理
消息内容存储
消息统计信息存储
垃圾采取
内存管理
互换机管理
队列管理
绑定管理
消息管理
待确认消息管理
消息转发
服务器详细全部代码
客户端模块
生产者     
消耗者
公共模块
序列化和反序列化

一、RabbitMQ 是什么

        RabbitMQ是一个流行的开源消息队列体系,是AMQP(高级消息队列协议)尺度的实现,由以高性能、健壮、可伸缩性着名的Erlang语言开发,并继续了这些长处。
         RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。吸收端可以根据RabbitMQ设置的转发机制吸收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子体系间进行通讯,是分布式体系尺度的设置。
二、需求分析

按模块划分:如下图

https://i-blog.csdnimg.cn/direct/f6c8d0b38f61411a9feda441643a0347.png
服务器模块(Broker)

服务器模块是消息队列中最焦点的部分,负责消息的存储和转发,有以下概念:


[*]虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是⼀个逻辑上的聚集. ⼀个 BrokerServer 上可 以存在多个 VirtualHost.
[*]互换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发 给不同的 Queue.
[*]队列 (Queue): 真正⽤来存储消息的部分. 每个消耗者决定⾃⼰从哪个 Queue 上读取消息. 
[*]绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 "多对多" 关 系. 使⽤⼀个关联表就可以把这两个概念接洽起来.
[*]消息 (Message): 传递的内容.
   所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" ⼀样. 意思是: ⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息). ⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange).
https://i-blog.csdnimg.cn/direct/881c8b915c234eb39d4dd0b8ae8f85eb.png
互换机范例 (Exchange Type)

这里主要实现 RabbitMQ 的三种互换机范例:


[*]Direct: ⽣产者发送消息时, 直接指定被该互换机绑定的队列名.
[*]Fanout: ⽣产者发送的消息会被复制到该互换机的所有队列中.
[*]Topic: 绑定队列到互换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
持久化

        Exchange, Queue, Binding, Message都有在硬盘和数据库中实现持久化的需求,当步伐重启 / 主机重启, 保证上述内容不丢失。
三、具体实现

首先实现 Broker 模块的焦点类,也就是 Exchange, Queue, Binding, Message 等焦点
焦点类:

创建 Exchange 类



public class Exchange {
    // 此处使用 name 来作为交换机的身份标识(唯一的)
    private String name;
    // 交换机类型, DIRECT, FANOUT, TOPIC
    private ExchangeType type = ExchangeType.DIRECT;
    // 该交换机是否需要持久化存储, true 代表需要持久化, false 表示不需要持久化
    private boolean durable = false;


    // 如果当前交换机, 没人使用了, 就会自动被删除. 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)
    private boolean autoDelete = false;

    // arguments 表示的是创建交换机时指定的一些额外的参数选项.
    private Map<String, Object> arguments = new HashMap<>();// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
}
互换机的范例

public enum ExchangeType {
    DIRECT(0), // 直接交换机
    FANOUT(1), // 扇出交换机
    TOPIC(2); // 主体交换机

    private final int type;
    private ExchangeType(int type){
      this.type = type;
    }

    public int getType(){
      return type;
    }


} 创建 Queue 类

public class MSGQueue {
// 表示队列的身份标识
    private String name;
    // 表示队列是否持久化, true 表示需要持久化, false表示不需要持久化
    private boolean durable = false;


    // 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用
    private boolean exclusive = false;
    // 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.
    private boolean autoDelete = false;

    // 为了把这个 arguments 存到数据库中,就需要把 Map 转成 json 格式的字符串.
    private Map<String , Object> arguments = new HashMap<>();

    // 当前队列都有哪些消费者订阅了.
    private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
    // 记录当前取到了第几个消费者. 方便实现轮询策略.
    private AtomicInteger consumerSeq = new AtomicInteger(0);
}
创建 Binding 类

public class Binding {

    private String exchangeName;
    private String queueName;
    // 主题交换机的 key
    private String bindingKey;
} 创建 Message 类

public class Message implements Serializable {
    // 核心
    private BasicProperties basicProperties = new BasicProperties(); // 消息的属性 集合成对象
    private byte[] body; // 消息要传递的 数据

    // 辅助用的属性
    private transient long offsetBeg = 0; // 消息数据的 开头 距离文件开头的位置偏移(字节) / transient : 不可被序列化
    private transient long offsetEnd = 0; // 消息数据的 结尾 距离文件开头的位置偏移(字节) / transient : 不可被序列化
    // 该属性表示该消息在文件中是否有效。(逻辑删除)
    private byte isValid   = 0x1; // 0x1 表示有效, 0x0 表示无效


    // 使用工厂方法 封装创建 Message 对象的过程,
    public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){
      Message message = new Message();
      if (basicProperties != null){
            message.setBasicProperties(basicProperties);
      }
      // 此处生成的 MessageId 以 M- 作为前缀.
      message.setMessageId("M-" + UUID.randomUUID()); // 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
      message.setRoutingKey(routingKey); //routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
      message.setBody(body);
      // 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
      // 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
      // 此处只是在内存中创建一个 Message 对象.

      return message;

    } 硬盘管理

硬盘管理主要是为了实现持久化,保证在服务器关机,重启之后重要的消息不会丢失
数据库管理

数据库主要是用来管理,互换机,队列,绑定的持久化操作的,因为这些是体系的焦点功能必要频繁的增删查操作,利用 SQLite 数据库 + MyBatis 框架可以更好管理这些数据。
互换机存储
private void createDefaultData() {
      Exchange exchange = new Exchange();
      exchange.setName("");
      exchange.setType(ExchangeType.DIRECT);
      exchange.setDurable(true);
      exchange.setAutoDelete(false);
      metaMapper.insertExchange(exchange);
      System.out.println(" 创建初始数据完成!");

    }

    // exchangeTable 的查找、插入和删除
   publicList<Exchange> selectAllexchanges(){
      return metaMapper.selectAllexchanges();
   }
   public void insertExchange(Exchange exchange){
         metaMapper.insertExchange(exchange);
   }
    public void deleteExchange(String exchangeName){
      metaMapper.deleteExchange(exchangeName);
    }
队列存储
// QueueTable 的查找、插入和删除
    publicvoid insertQueue(MSGQueue queue){
      metaMapper.insertQueue(queue);
    }
    public void deleteQueue(String queueName){
      metaMapper.deleteQueue(queueName);
    }
   public List<MSGQueue> selectAllQueues(){
      return metaMapper.selectAllQueues();
   } 绑定存储
// BindingTable 的查找、插入和删除
    public void insertBinding(Binding binding){
      metaMapper.insertBinding(binding);
    }
    public void deleteBinding(Binding binding){
      metaMapper.deleteBinding(binding);
    }
    public List<Binding> selectAllBindings(){
      return metaMapper.selectAllBindings();
    } 文件管理

   消息必要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储。
缘故原由如下:

[*]对于消息的操作并不必要复杂的 增编削查 .
[*]对于⽂件的操作效率⽐数据库会⾼很多
给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件


[*]queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
[*] queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
queue_data.txt ⽂件格式: 使⽤⼆进制⽅式存储. 每个消息分成两个部分:


[*] 前四个字节, 表⽰ Message 对象的⻓度(字节数)
[*]后⾯若⼲字节, 表⽰ Message 内容.
[*]消息和消息之间⾸尾相连.
每个 Message 基于 Java 尺度库的 ObjectInputStream / ObjectOutputStream 序列化
   Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置.
https://i-blog.csdnimg.cn/direct/97d109f304144add907801de18658395.png
queue_stat.txt ⽂件格式: 使⽤⽂本⽅式存储. ⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割. 第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬. 比方:
   2000\t1500
消息内容存储

// 这个方法用来把一个新 消息(Message)放到对应的队列中去
    public void sendMessage(@NotNull MSGQueue queue, Message message) throws MessageQueueException, IOException {
            // 检查当前要写入的队列对应的文件是否存在
            if (!(checkQueueAndFileExits(queue.getName()))){
                throw new MessageQueueException(" 队列对应的文件不存在! queueName = " + queue.getName());
            }

            // 把 Message 对象,进行序列化,转成二进制的字节数组
            byte[] messageBinary = BinaryTool.toBytes(message);
            synchronized (queue) { // 通过对 同一个队列加锁, 保证在读写文件多线程操作时,数据的一致性和准确性
                // 获取到当前的队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd
                // 通过方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
                File queueDataFile = new File(getQueueDataPath(queue.getName())); // 获取到 队列的数据文件
                message.setOffsetBeg(queueDataFile.length() + 4); // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
                message.setOffsetEnd(queueDataFile.length() + messageBinary.length + 4); // offsetEnd 就是当前文件长度 + 4 + message 自身长度.

                // 写入消息到数据文件中,是追加到数据文件末尾
                try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
                  try (DataOutputStream dataOutputStream =new DataOutputStream(outputStream)){
                        // 接下来要先写当前消息的长度, 占据 4 个字节的~~
                        dataOutputStream.writeInt(messageBinary.length);
                        // 写入消息本体
                        dataOutputStream.write(messageBinary);

                  }
                }

                // 更新消息统计文件
                Stat stat = readStat(queue.getName());
                stat.totalCount += 1;
                stat.validCount += 1;
                writeStat(queue.getName(), stat);
            }


    } 消息统计信息存储

private Stat readStat(String queueName){
      Stat stat = new Stat();
      try(InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
            Scanner scanner = new Scanner(inputStream); // 因当前的消息统计文件是文本文件,所以使用 Scanner 来读取文件内容
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;
      }catch (IOException e) {
            e.printStackTrace();
      }
      return null;
    }

    private void writeStat(String queueName, Stat stat){
      //OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
      try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
            // 使用 PrintWriter 写文件
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount + "\t" + stat.validCount);
            printWriter.flush(); // 确保消息及时刷新到内存中去
      } catch (IOException e) {
            e.printStackTrace();
      }
    } 垃圾采取

           因为信息的删除只是把消息在⽂件上标志成了⽆效. 并没有腾出硬盘空间,终极⽂件⼤⼩大概会越积越 多. 因此必要定期的进⾏批量扫除.
        此处使⽤类似于复制算法. 当总消息数凌驾 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC. GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 取代旧⽂件即可.
// 这个方法,使用复制算法来完成. 真正执行消息数据文件的垃圾回收操作.
    public void gc(MSGQueue queue) throws MessageQueueException, IOException, ClassNotFoundException {
      // 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.
      synchronized (queue){
            // 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间. gc 执行开始的时间
            long gcBeg = System.currentTimeMillis();
            // 创建一个新的文件, 名字就是 queue_data_new.txt
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()){
                // 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
                throw new MessageQueueException(" gc 时发现该队列的 queue_data_new 已经存在! queueName = " + queue.getName());
            }
            boolean creatOK = queueDataNewFile.createNewFile();
            if (!creatOK){
                thrownew MessageQueueException(" 创建文件失败! queueDataNewFile = " + queueDataNewFile.getAbsolutePath());
            }

            // 从旧的文件中, 读取出所有的有效消息对象了
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());

            // 把有效消息, 写入到新的文件中.
            try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                  for (Message message : messages){
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先写四个字节消息的长度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                  }
                }
            }

            // 删除旧的数据文件, 并且把新的文件进行重命名
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            boolean deleteOK = queueDataOldFile.delete();
            if (!deleteOK) {
                thrownew MessageQueueException(" gc删除数据旧文件失败! queueDataOldFile = " +queueDataOldFile.getAbsolutePath());
            }

            // 把 queue_data_new.txt => queue_data.txt
            boolean renameOK = queueDataNewFile.renameTo(queueDataOldFile);
            if (!renameOK){
                thrownew MessageQueueException(" gc重命名信数据文件失败! queueDataNewFile = " +queueDataNewFile.getAbsolutePath() + "queueDataOldFile = " + queueDataOldFile.getAbsolutePath());
            }

            // 更新统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);

            // gc 执行结束的时间
            long gcEnd = System.currentTimeMillis();
            // gc 耗时多久
            System.out.println(" gc 执行完毕! queueName = " + queue.getName() + ", time = "
                  + (gcEnd - gcBeg) + "ms");
      }
    } 内存管理

        硬盘上存储数据, 只是为了实现 "持久化" 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结 构. 对于 消息队列来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.
互换机管理

// 交换机 操作 Exchange
    public void insertExchange(Exchange exchange){
      exchangeMap.put(exchange.getName(), exchange);
      System.out.println(" 新交换机添加成功! exchangeName = " + exchange.getName());
    }

    public Exchange getExchange(String exchangeName){
       return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName){
      exchangeMap.remove(exchangeName);
      System.out.println(" 交换机删除成功! exchangeName = " + exchangeName);
    }
队列管理

// 队列 操作 MSGQueue
    public void insertQueue(MSGQueue queue){
      queueMap.put(queue.getName(), queue);
      System.out.println(" 新队列添加成功! queueName = " + queue.getName());
    }

    public MSGQueue getQueue(String queueName){
      return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName){
      queueMap.remove(queueName);
      System.out.println(" 队列删除成功! queueName = " + queueName);
    } 绑定管理

// 交换机和队列的绑定关系 操作, Binding
    public void insertBinding(Binding binding) throws MessageQueueException {
      ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());

      synchronized (bindingMap) {
            // 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.
            if (bindingMap.get(binding.getQueueName()) != null) {
                throw new MessageQueueException(" 绑定已经存在! exchangeName = " + binding.getExchangeName() +
                        ", queueName = " + binding.getQueueName());
            }

            bindingMap.put(binding.getQueueName(), binding);
            System.out.println(" 新绑定添加成功! exchangeName = " + binding.getExchangeName()
                  + ", queueName = " + binding.getQueueName());
      }
    }

    // 获取绑定
    // 根据 exchangeName 和 queueName 确定唯一一个 Binding
    public Binding getBinding(String exchangeName, String queueName){
      ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
      if (bindingMap == null){
            return null;
      }
      return bindingMap.get(queueName);
    }

    // 根据 exchangeName 获取到所有的 Binding
    public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
      return bindingsMap.get(exchangeName);
    }

    public void deleteBinding(Binding binding) throws MessageQueueException {
      ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
      if (bindingMap == null){
            throw new MessageQueueException(" 绑定不存在! exchangeName = " + binding.getExchangeName()
                  + ", queueName = " + binding.getQueueName());
      }

      bindingMap.remove(binding.getQueueName());
      System.out.println(" 绑定删除成功! exchangeName = " + binding.getExchangeName()
                + ", queueName = " + binding.getQueueName());
    } 消息管理

// 添加消息
    public void addMessage(Message message) {
      messageMap.put(message.getMessageId(), message);
      System.out.println(" 新消息添加成功! messageId = " + message.getMessageId());
    }
    // 根据 id 查询消息
    public Message getMessageById(String messageId) {
      returnmessageMap.get(messageId);
    }
    // 根据 id 删除消息
    public void removeMessageById(String messageId) {
      messageMap.remove(messageId);
      System.out.println(" 消息被移除! messageId = " + messageId);
    }
    // 发送消息到指定队列
    public void sendMessageToQueue(MSGQueue queue, Message message) { // 把消息放到对应的队列数据结构中.
      LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
      // 再把数据加到 messages 里面
      synchronized (message) {
            messages.add(message);
            // 把该消息也往消息中心中 存入一份
            addMessage(message);
      }
      System.out.println(" 消息被投递到队列中! messageId = " + message.getMessageId());
    }

    // 从队列中取消息
    public Message pollMessageFromQueue(String queueName) {
      // 根据队列名, 查找一下, 对应的队列的消息链表.
      LinkedList<Message> messages = queueMessageMap.get(queueName);
      if (messages == null) {
            return null;
      }
      synchronized (messages) {
            if (messages.size() == 0) {
                // 如果没找到, 说明队列中没有任何消息.
                return null;
            }
            // 链表中有元素, 就进行头删
            Message currentMessage = messages.remove();
            System.out.println(" 消息从队列中取出! messageId = " + currentMessage.getMessageId());
            return currentMessage;
      }
    } 待确认消息管理

// 添加未确认的消息
    public void addMessageWaitAck(String queueName, Message message) {
      // 从通过 队列名来在哈希表中查询 未确认的消息哈希表,如果没有 则创建一个
      ConcurrentHashMap<String, Message> waitAckMessages= queueMessageWaitAckMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>());
      // 添加未确认的消息
      waitAckMessages.put(message.getMessageId(), message);
      System.out.println(" 消息进入待确认队列! messageId = " + message.getMessageId());
    }

    // 删除未确认的消息(消息已经确认了)
    public void removeMessageWaitAck(String queueName, String messageId) {
      ConcurrentHashMap<String, Message> waitAckMessages = queueMessageWaitAckMap.get(queueName);
      if (waitAckMessages == null){
            return;
      }
      waitAckMessages.remove(messageId);
      System.out.println(" 消息从待确认队列删除! messageId = " + messageId);
    }

    // 获取指定的未确认的消息
    public Message getMessageWaitAck(String queueName, String messageId) {
      ConcurrentHashMap<String, Message> waitAckMessages = queueMessageWaitAckMap.get(queueName);
      if (waitAckMessages == null) {
            return null;
      }
      return waitAckMessages.get(messageId);
    } 消息转发

// 发送消息到指定的交换机/队列中.
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
      // 转换交换机的名字
      exchangeName = virtualHostName + exchangeName;
      try {
            // 检查 routingKey 是否合法.
            if (!router.checkRoutingKey(routingKey)) {
                throw new MessageQueueException(" routingKey 非法! routingKey=" + routingKey);
            }
            // 查找交换机对象
            Exchange exchange = memoryDataCenter.getExchange(exchangeName);
            if (exchange == null) {
                throw new MessageQueueException(" 交换机不存在! exchangeName=" + exchangeName);
            }

            // 判定交换机的类型
            if (exchange.getType() == ExchangeType.DIRECT) {
                // 按照直接交换机的方式来转发消息
                // 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
                // 此时, 可以无视绑定关系.
                String queueName = virtualHostName + routingKey;
                // 构造消息对象
                Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                // 查找该队列名对应的对象
                MSGQueue queue = memoryDataCenter.getQueue(queueName);
                if (queue == null) {
                  throw new MessageQueueException(" 队列不存在! queueName=" + queueName);
                }
                // 队列存在, 直接给队列中写入消息
                sendMessage(queue, message);
            } else {
                // 按照 fanout 和 topic 的方式来转发.
                // 找到该交换机关联的所有绑定, 并遍历这些绑定对象
                ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
                for (Map.Entry<String, Binding> entry : bindings.entrySet()) {
                  // 获取到绑定对象, 判定对应的队列是否存在
                  Binding binding = entry.getValue();
                  MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
                  if (queue == null) {
                        // 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
                        // 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
                        System.out.println(" basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
                        continue;
                  }
                  // 构造消息对象
                  Message message = Message.createMessageWithId(routingKey, basicProperties, body);
                  // 判定这个消息是否能转发给该队列.
                  // 如果是 fanout, 所有绑定的队列都要转发的.
                  // 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
                  if (!router.route(exchange.getType(), binding, message)) {
                        continue;
                  }
                  // 真正转发消息给队列
                  sendMessage(queue, message);
                }
            }
            return true;
      } catch (Exception e) {
            System.out.println(" 消息发送失败!");
            e.printStackTrace();
            return false;
      }

    } 服务器详细全部代码

客户端模块

生产者     

生产者的demo
消耗者

消耗者的demo
公共模块

序列化和反序列化

        主要是用来传输信息,因为我们进行网路通讯时是接纳的TCP 协议,以是在传输过程中,必要把数据转换成二进制代码
public class BinaryTool {
    // 用来验证代码序列化版本的 如果序列化后的文件想反序列化时,和文件中的不对 则不能反序列化,直接报错
    private static final Long serialVersionUID = 1L; // 改动了序列化对象的代码 这个 序列化版本也需要改动


    // 把一个对象序列化成一个字节数组
    public static byte[] toBytes(Object object) throws IOException {
      // ByteArrayOutputStream 这个流对象相当于一个变长的字节数组.
      // 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
      try (ByteArrayOutputStreambyteArrayOutputStream = new ByteArrayOutputStream()){
            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
                // 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
                // ObjectOutputStream 中.
                // 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
                objectOutputStream.writeObject(object);
            }
            return byteArrayOutputStream.toByteArray();
      }
    }

    // 把一个字节数组, 反序列化成一个对象
    public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
      Object object = null;
      try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
            try (ObjectInputStream objectInputStream =new ObjectInputStream(byteArrayInputStream)){
                // 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
                object = objectInputStream.readObject();
            }

      }
      return object;
    }
}



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Java实现 RabbitMQ 消息队列