Java实现 RabbitMQ 消息队列
媒介:这是一篇利用 Java 来实现 RabbitMQ 消息队列 基本焦点功能的文章,比方:互换价,队列,绑定,消息的功能。目录
一、RabbitMQ 是什么
二、需求分析
按模块划分:如下图
服务器模块(Broker)
互换机范例 (Exchange Type)
持久化
三、具体实现
焦点类:
创建 Exchange 类
互换机的范例
创建 Queue 类
创建 Binding 类
创建 Message 类
硬盘管理
数据库管理
文件管理
消息内容存储
消息统计信息存储
垃圾采取
内存管理
互换机管理
队列管理
绑定管理
消息管理
待确认消息管理
消息转发
服务器详细全部代码
客户端模块
生产者
消耗者
公共模块
序列化和反序列化
一、RabbitMQ 是什么
RabbitMQ是一个流行的开源消息队列体系,是AMQP(高级消息队列协议)尺度的实现,由以高性能、健壮、可伸缩性着名的Erlang语言开发,并继续了这些长处。
RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。吸收端可以根据RabbitMQ设置的转发机制吸收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子体系间进行通讯,是分布式体系尺度的设置。
二、需求分析
按模块划分:如下图
https://i-blog.csdnimg.cn/direct/f6c8d0b38f61411a9feda441643a0347.png
服务器模块(Broker)
服务器模块是消息队列中最焦点的部分,负责消息的存储和转发,有以下概念:
[*]虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是⼀个逻辑上的聚集. ⼀个 BrokerServer 上可 以存在多个 VirtualHost.
[*]互换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发 给不同的 Queue.
[*]队列 (Queue): 真正⽤来存储消息的部分. 每个消耗者决定⾃⼰从哪个 Queue 上读取消息.
[*]绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 "多对多" 关 系. 使⽤⼀个关联表就可以把这两个概念接洽起来.
[*]消息 (Message): 传递的内容.
所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" ⼀样. 意思是: ⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息). ⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange).
https://i-blog.csdnimg.cn/direct/881c8b915c234eb39d4dd0b8ae8f85eb.png
互换机范例 (Exchange Type)
这里主要实现 RabbitMQ 的三种互换机范例:
[*]Direct: ⽣产者发送消息时, 直接指定被该互换机绑定的队列名.
[*]Fanout: ⽣产者发送的消息会被复制到该互换机的所有队列中.
[*]Topic: 绑定队列到互换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.
持久化
Exchange, Queue, Binding, Message都有在硬盘和数据库中实现持久化的需求,当步伐重启 / 主机重启, 保证上述内容不丢失。
三、具体实现
首先实现 Broker 模块的焦点类,也就是 Exchange, Queue, Binding, Message 等焦点
焦点类:
创建 Exchange 类
public class Exchange {
// 此处使用 name 来作为交换机的身份标识(唯一的)
private String name;
// 交换机类型, DIRECT, FANOUT, TOPIC
private ExchangeType type = ExchangeType.DIRECT;
// 该交换机是否需要持久化存储, true 代表需要持久化, false 表示不需要持久化
private boolean durable = false;
// 如果当前交换机, 没人使用了, 就会自动被删除. 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)
private boolean autoDelete = false;
// arguments 表示的是创建交换机时指定的一些额外的参数选项.
private Map<String, Object> arguments = new HashMap<>();// 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
}
互换机的范例
public enum ExchangeType {
DIRECT(0), // 直接交换机
FANOUT(1), // 扇出交换机
TOPIC(2); // 主体交换机
private final int type;
private ExchangeType(int type){
this.type = type;
}
public int getType(){
return type;
}
} 创建 Queue 类
public class MSGQueue {
// 表示队列的身份标识
private String name;
// 表示队列是否持久化, true 表示需要持久化, false表示不需要持久化
private boolean durable = false;
// 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用
private boolean exclusive = false;
// 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.
private boolean autoDelete = false;
// 为了把这个 arguments 存到数据库中,就需要把 Map 转成 json 格式的字符串.
private Map<String , Object> arguments = new HashMap<>();
// 当前队列都有哪些消费者订阅了.
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
// 记录当前取到了第几个消费者. 方便实现轮询策略.
private AtomicInteger consumerSeq = new AtomicInteger(0);
}
创建 Binding 类
public class Binding {
private String exchangeName;
private String queueName;
// 主题交换机的 key
private String bindingKey;
} 创建 Message 类
public class Message implements Serializable {
// 核心
private BasicProperties basicProperties = new BasicProperties(); // 消息的属性 集合成对象
private byte[] body; // 消息要传递的 数据
// 辅助用的属性
private transient long offsetBeg = 0; // 消息数据的 开头 距离文件开头的位置偏移(字节) / transient : 不可被序列化
private transient long offsetEnd = 0; // 消息数据的 结尾 距离文件开头的位置偏移(字节) / transient : 不可被序列化
// 该属性表示该消息在文件中是否有效。(逻辑删除)
private byte isValid = 0x1; // 0x1 表示有效, 0x0 表示无效
// 使用工厂方法 封装创建 Message 对象的过程,
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){
Message message = new Message();
if (basicProperties != null){
message.setBasicProperties(basicProperties);
}
// 此处生成的 MessageId 以 M- 作为前缀.
message.setMessageId("M-" + UUID.randomUUID()); // 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
message.setRoutingKey(routingKey); //routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
message.setBody(body);
// 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
// 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
// 此处只是在内存中创建一个 Message 对象.
return message;
} 硬盘管理
硬盘管理主要是为了实现持久化,保证在服务器关机,重启之后重要的消息不会丢失
数据库管理
数据库主要是用来管理,互换机,队列,绑定的持久化操作的,因为这些是体系的焦点功能必要频繁的增删查操作,利用 SQLite 数据库 + MyBatis 框架可以更好管理这些数据。
互换机存储
private void createDefaultData() {
Exchange exchange = new Exchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchange(exchange);
System.out.println(" 创建初始数据完成!");
}
// exchangeTable 的查找、插入和删除
publicList<Exchange> selectAllexchanges(){
return metaMapper.selectAllexchanges();
}
public void insertExchange(Exchange exchange){
metaMapper.insertExchange(exchange);
}
public void deleteExchange(String exchangeName){
metaMapper.deleteExchange(exchangeName);
}
队列存储
// QueueTable 的查找、插入和删除
publicvoid insertQueue(MSGQueue queue){
metaMapper.insertQueue(queue);
}
public void deleteQueue(String queueName){
metaMapper.deleteQueue(queueName);
}
public List<MSGQueue> selectAllQueues(){
return metaMapper.selectAllQueues();
} 绑定存储
// BindingTable 的查找、插入和删除
public void insertBinding(Binding binding){
metaMapper.insertBinding(binding);
}
public void deleteBinding(Binding binding){
metaMapper.deleteBinding(binding);
}
public List<Binding> selectAllBindings(){
return metaMapper.selectAllBindings();
} 文件管理
消息必要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储。
缘故原由如下:
[*]对于消息的操作并不必要复杂的 增编削查 .
[*]对于⽂件的操作效率⽐数据库会⾼很多
给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件
[*]queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
[*] queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
queue_data.txt ⽂件格式: 使⽤⼆进制⽅式存储. 每个消息分成两个部分:
[*] 前四个字节, 表⽰ Message 对象的⻓度(字节数)
[*]后⾯若⼲字节, 表⽰ Message 内容.
[*]消息和消息之间⾸尾相连.
每个 Message 基于 Java 尺度库的 ObjectInputStream / ObjectOutputStream 序列化
Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置.
https://i-blog.csdnimg.cn/direct/97d109f304144add907801de18658395.png
queue_stat.txt ⽂件格式: 使⽤⽂本⽅式存储. ⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割. 第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬. 比方:
2000\t1500
消息内容存储
// 这个方法用来把一个新 消息(Message)放到对应的队列中去
public void sendMessage(@NotNull MSGQueue queue, Message message) throws MessageQueueException, IOException {
// 检查当前要写入的队列对应的文件是否存在
if (!(checkQueueAndFileExits(queue.getName()))){
throw new MessageQueueException(" 队列对应的文件不存在! queueName = " + queue.getName());
}
// 把 Message 对象,进行序列化,转成二进制的字节数组
byte[] messageBinary = BinaryTool.toBytes(message);
synchronized (queue) { // 通过对 同一个队列加锁, 保证在读写文件多线程操作时,数据的一致性和准确性
// 获取到当前的队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd
// 通过方法 queueDataFile.length() 就能获取到文件的长度. 单位字节.
File queueDataFile = new File(getQueueDataPath(queue.getName())); // 获取到 队列的数据文件
message.setOffsetBeg(queueDataFile.length() + 4); // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
message.setOffsetEnd(queueDataFile.length() + messageBinary.length + 4); // offsetEnd 就是当前文件长度 + 4 + message 自身长度.
// 写入消息到数据文件中,是追加到数据文件末尾
try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
try (DataOutputStream dataOutputStream =new DataOutputStream(outputStream)){
// 接下来要先写当前消息的长度, 占据 4 个字节的~~
dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体
dataOutputStream.write(messageBinary);
}
}
// 更新消息统计文件
Stat stat = readStat(queue.getName());
stat.totalCount += 1;
stat.validCount += 1;
writeStat(queue.getName(), stat);
}
} 消息统计信息存储
private Stat readStat(String queueName){
Stat stat = new Stat();
try(InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
Scanner scanner = new Scanner(inputStream); // 因当前的消息统计文件是文本文件,所以使用 Scanner 来读取文件内容
stat.totalCount = scanner.nextInt();
stat.validCount = scanner.nextInt();
return stat;
}catch (IOException e) {
e.printStackTrace();
}
return null;
}
private void writeStat(String queueName, Stat stat){
//OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
// 使用 PrintWriter 写文件
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount + "\t" + stat.validCount);
printWriter.flush(); // 确保消息及时刷新到内存中去
} catch (IOException e) {
e.printStackTrace();
}
} 垃圾采取
因为信息的删除只是把消息在⽂件上标志成了⽆效. 并没有腾出硬盘空间,终极⽂件⼤⼩大概会越积越 多. 因此必要定期的进⾏批量扫除.
此处使⽤类似于复制算法. 当总消息数凌驾 2000, 并且有效消息数⽬少于 50% 的时候, 就触发 GC. GC 的时候会把所有有效消息加载出来, 写⼊到⼀个新的消息⽂件中, 使⽤新⽂件, 取代旧⽂件即可.
// 这个方法,使用复制算法来完成. 真正执行消息数据文件的垃圾回收操作.
public void gc(MSGQueue queue) throws MessageQueueException, IOException, ClassNotFoundException {
// 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.
synchronized (queue){
// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间. gc 执行开始的时间
long gcBeg = System.currentTimeMillis();
// 创建一个新的文件, 名字就是 queue_data_new.txt
File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
if (queueDataNewFile.exists()){
// 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
throw new MessageQueueException(" gc 时发现该队列的 queue_data_new 已经存在! queueName = " + queue.getName());
}
boolean creatOK = queueDataNewFile.createNewFile();
if (!creatOK){
thrownew MessageQueueException(" 创建文件失败! queueDataNewFile = " + queueDataNewFile.getAbsolutePath());
}
// 从旧的文件中, 读取出所有的有效消息对象了
LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
// 把有效消息, 写入到新的文件中.
try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)){
try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
for (Message message : messages){
byte[] buffer = BinaryTool.toBytes(message);
// 先写四个字节消息的长度
dataOutputStream.writeInt(buffer.length);
dataOutputStream.write(buffer);
}
}
}
// 删除旧的数据文件, 并且把新的文件进行重命名
File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
boolean deleteOK = queueDataOldFile.delete();
if (!deleteOK) {
thrownew MessageQueueException(" gc删除数据旧文件失败! queueDataOldFile = " +queueDataOldFile.getAbsolutePath());
}
// 把 queue_data_new.txt => queue_data.txt
boolean renameOK = queueDataNewFile.renameTo(queueDataOldFile);
if (!renameOK){
thrownew MessageQueueException(" gc重命名信数据文件失败! queueDataNewFile = " +queueDataNewFile.getAbsolutePath() + "queueDataOldFile = " + queueDataOldFile.getAbsolutePath());
}
// 更新统计文件
Stat stat = readStat(queue.getName());
stat.totalCount = messages.size();
stat.validCount = messages.size();
writeStat(queue.getName(), stat);
// gc 执行结束的时间
long gcEnd = System.currentTimeMillis();
// gc 耗时多久
System.out.println(" gc 执行完毕! queueName = " + queue.getName() + ", time = "
+ (gcEnd - gcBeg) + "ms");
}
} 内存管理
硬盘上存储数据, 只是为了实现 "持久化" 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结 构. 对于 消息队列来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.
互换机管理
// 交换机 操作 Exchange
public void insertExchange(Exchange exchange){
exchangeMap.put(exchange.getName(), exchange);
System.out.println(" 新交换机添加成功! exchangeName = " + exchange.getName());
}
public Exchange getExchange(String exchangeName){
return exchangeMap.get(exchangeName);
}
public void deleteExchange(String exchangeName){
exchangeMap.remove(exchangeName);
System.out.println(" 交换机删除成功! exchangeName = " + exchangeName);
}
队列管理
// 队列 操作 MSGQueue
public void insertQueue(MSGQueue queue){
queueMap.put(queue.getName(), queue);
System.out.println(" 新队列添加成功! queueName = " + queue.getName());
}
public MSGQueue getQueue(String queueName){
return queueMap.get(queueName);
}
public void deleteQueue(String queueName){
queueMap.remove(queueName);
System.out.println(" 队列删除成功! queueName = " + queueName);
} 绑定管理
// 交换机和队列的绑定关系 操作, Binding
public void insertBinding(Binding binding) throws MessageQueueException {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());
synchronized (bindingMap) {
// 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入.
if (bindingMap.get(binding.getQueueName()) != null) {
throw new MessageQueueException(" 绑定已经存在! exchangeName = " + binding.getExchangeName() +
", queueName = " + binding.getQueueName());
}
bindingMap.put(binding.getQueueName(), binding);
System.out.println(" 新绑定添加成功! exchangeName = " + binding.getExchangeName()
+ ", queueName = " + binding.getQueueName());
}
}
// 获取绑定
// 根据 exchangeName 和 queueName 确定唯一一个 Binding
public Binding getBinding(String exchangeName, String queueName){
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);
if (bindingMap == null){
return null;
}
return bindingMap.get(queueName);
}
// 根据 exchangeName 获取到所有的 Binding
public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {
return bindingsMap.get(exchangeName);
}
public void deleteBinding(Binding binding) throws MessageQueueException {
ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
if (bindingMap == null){
throw new MessageQueueException(" 绑定不存在! exchangeName = " + binding.getExchangeName()
+ ", queueName = " + binding.getQueueName());
}
bindingMap.remove(binding.getQueueName());
System.out.println(" 绑定删除成功! exchangeName = " + binding.getExchangeName()
+ ", queueName = " + binding.getQueueName());
} 消息管理
// 添加消息
public void addMessage(Message message) {
messageMap.put(message.getMessageId(), message);
System.out.println(" 新消息添加成功! messageId = " + message.getMessageId());
}
// 根据 id 查询消息
public Message getMessageById(String messageId) {
returnmessageMap.get(messageId);
}
// 根据 id 删除消息
public void removeMessageById(String messageId) {
messageMap.remove(messageId);
System.out.println(" 消息被移除! messageId = " + messageId);
}
// 发送消息到指定队列
public void sendMessageToQueue(MSGQueue queue, Message message) { // 把消息放到对应的队列数据结构中.
LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());
// 再把数据加到 messages 里面
synchronized (message) {
messages.add(message);
// 把该消息也往消息中心中 存入一份
addMessage(message);
}
System.out.println(" 消息被投递到队列中! messageId = " + message.getMessageId());
}
// 从队列中取消息
public Message pollMessageFromQueue(String queueName) {
// 根据队列名, 查找一下, 对应的队列的消息链表.
LinkedList<Message> messages = queueMessageMap.get(queueName);
if (messages == null) {
return null;
}
synchronized (messages) {
if (messages.size() == 0) {
// 如果没找到, 说明队列中没有任何消息.
return null;
}
// 链表中有元素, 就进行头删
Message currentMessage = messages.remove();
System.out.println(" 消息从队列中取出! messageId = " + currentMessage.getMessageId());
return currentMessage;
}
} 待确认消息管理
// 添加未确认的消息
public void addMessageWaitAck(String queueName, Message message) {
// 从通过 队列名来在哈希表中查询 未确认的消息哈希表,如果没有 则创建一个
ConcurrentHashMap<String, Message> waitAckMessages= queueMessageWaitAckMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>());
// 添加未确认的消息
waitAckMessages.put(message.getMessageId(), message);
System.out.println(" 消息进入待确认队列! messageId = " + message.getMessageId());
}
// 删除未确认的消息(消息已经确认了)
public void removeMessageWaitAck(String queueName, String messageId) {
ConcurrentHashMap<String, Message> waitAckMessages = queueMessageWaitAckMap.get(queueName);
if (waitAckMessages == null){
return;
}
waitAckMessages.remove(messageId);
System.out.println(" 消息从待确认队列删除! messageId = " + messageId);
}
// 获取指定的未确认的消息
public Message getMessageWaitAck(String queueName, String messageId) {
ConcurrentHashMap<String, Message> waitAckMessages = queueMessageWaitAckMap.get(queueName);
if (waitAckMessages == null) {
return null;
}
return waitAckMessages.get(messageId);
} 消息转发
// 发送消息到指定的交换机/队列中.
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {
// 转换交换机的名字
exchangeName = virtualHostName + exchangeName;
try {
// 检查 routingKey 是否合法.
if (!router.checkRoutingKey(routingKey)) {
throw new MessageQueueException(" routingKey 非法! routingKey=" + routingKey);
}
// 查找交换机对象
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if (exchange == null) {
throw new MessageQueueException(" 交换机不存在! exchangeName=" + exchangeName);
}
// 判定交换机的类型
if (exchange.getType() == ExchangeType.DIRECT) {
// 按照直接交换机的方式来转发消息
// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.
// 此时, 可以无视绑定关系.
String queueName = virtualHostName + routingKey;
// 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 查找该队列名对应的对象
MSGQueue queue = memoryDataCenter.getQueue(queueName);
if (queue == null) {
throw new MessageQueueException(" 队列不存在! queueName=" + queueName);
}
// 队列存在, 直接给队列中写入消息
sendMessage(queue, message);
} else {
// 按照 fanout 和 topic 的方式来转发.
// 找到该交换机关联的所有绑定, 并遍历这些绑定对象
ConcurrentHashMap<String, Binding> bindings = memoryDataCenter.getBindings(exchangeName);
for (Map.Entry<String, Binding> entry : bindings.entrySet()) {
// 获取到绑定对象, 判定对应的队列是否存在
Binding binding = entry.getValue();
MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if (queue == null) {
// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.
// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.
System.out.println(" basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());
continue;
}
// 构造消息对象
Message message = Message.createMessageWithId(routingKey, basicProperties, body);
// 判定这个消息是否能转发给该队列.
// 如果是 fanout, 所有绑定的队列都要转发的.
// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配.
if (!router.route(exchange.getType(), binding, message)) {
continue;
}
// 真正转发消息给队列
sendMessage(queue, message);
}
}
return true;
} catch (Exception e) {
System.out.println(" 消息发送失败!");
e.printStackTrace();
return false;
}
} 服务器详细全部代码
客户端模块
生产者
生产者的demo
消耗者
消耗者的demo
公共模块
序列化和反序列化
主要是用来传输信息,因为我们进行网路通讯时是接纳的TCP 协议,以是在传输过程中,必要把数据转换成二进制代码
public class BinaryTool {
// 用来验证代码序列化版本的 如果序列化后的文件想反序列化时,和文件中的不对 则不能反序列化,直接报错
private static final Long serialVersionUID = 1L; // 改动了序列化对象的代码 这个 序列化版本也需要改动
// 把一个对象序列化成一个字节数组
public static byte[] toBytes(Object object) throws IOException {
// ByteArrayOutputStream 这个流对象相当于一个变长的字节数组.
// 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
try (ByteArrayOutputStreambyteArrayOutputStream = new ByteArrayOutputStream()){
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
// ObjectOutputStream 中.
// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
objectOutputStream.writeObject(object);
}
return byteArrayOutputStream.toByteArray();
}
}
// 把一个字节数组, 反序列化成一个对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
Object object = null;
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {
try (ObjectInputStream objectInputStream =new ObjectInputStream(byteArrayInputStream)){
// 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
object = objectInputStream.readObject();
}
}
return object;
}
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]