目次
一、引言
二、设计
三、代码
1.Request
2.Response
3.BasicArguments
4.BasicReturns
四、方法类
1.创建交换机
2.删除交换机
3.创建队列
4.删除队列
5.创建绑定
6.删除绑定
7.消息发布
8.消费消息
9.会合返回
五、实现Broker Server类
六、实现连接
1.connectionFactory类
2.connection类
3.channel类
七、总结
一、引言
本篇文章就介绍一下本次项目的末了一个大的部分,网络通讯协议的设计。
二、设计
生产者和消费者都是客户端,都需要通过网络和Broker Server举行通讯。
此处使用TCP协议,来作为通讯的底层协议,同时在这个基础上自定义应用层协议,完成客户端对服务器这边功能的远程调用。
三、代码
1.Request
- public class Request {
- private int type;
- private int length;
- private byte[] payload;
- public int getType() {
- return type;
- }
- public void setType(int type) {
- this.type = type;
- }
- public int getLength() {
- return length;
- }
- public void setLength(int length) {
- this.length = length;
- }
- public byte[] getPayload() {
- return payload;
- }
- public void setPayload(byte[] payload) {
- this.payload = payload;
- }
- }
复制代码 2.Response
- public class Response {
- private int type;
- private int length;
- private byte[] payload;
- public int getType() {
- return type;
- }
- public void setType(int type) {
- this.type = type;
- }
- public int getLength() {
- return length;
- }
- public void setLength(int length) {
- this.length = length;
- }
- public byte[] getPayload() {
- return payload;
- }
- public void setPayload(byte[] payload) {
- this.payload = payload;
- }
- }
复制代码 3.BasicArguments
- /*
- Request的payload
- */
- public class BasicArguments implements Serializable {
- protected String rid;
- protected String channelId;
- public String getRid() {
- return rid;
- }
- public void setRid(String rid) {
- this.rid = rid;
- }
- public String getChannelId() {
- return channelId;
- }
- public void setChannelId(String channelId) {
- this.channelId = channelId;
- }
- }
复制代码 4.BasicReturns
- /*
- Response的payload
- */
- public class BasicReturns implements Serializable {
- protected String rid;
- protected String channelId;
- protected boolean ok;
- public String getRid() {
- return rid;
- }
- public void setRid(String rid) {
- this.rid = rid;
- }
- public String getChannelId() {
- return channelId;
- }
- public void setChannelId(String channelId) {
- this.channelId = channelId;
- }
- public boolean isOk() {
- return ok;
- }
- public void setOk(boolean ok) {
- this.ok = ok;
- }
- }
复制代码 四、方法类
对于每个VirtualHost提供的方法都要有一个类表现对应的参数
1.创建交换机
- public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
- private String exchangeName;
- private ExchangeType exchangeType;
- private boolean durable;
- private boolean autoDelete;
- private Map<String,Object> arguments;
-
- }
复制代码 2.删除交换机
- public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
- private String exchangeName;
- }
复制代码 3.创建队列
- public class QueueDeclareArguments extends BasicArguments implements Serializable {
- private String queueName;
- private boolean durable;
- private boolean exclusive;
- private boolean autoDelete;
- private Map<String,Object> arguments;
- }
复制代码 4.删除队列
- public class QueueDeleteArguments extends BasicArguments implements Serializable {
- private String queueName;
- }
复制代码 5.创建绑定
- public class QueueBindArguments extends BasicArguments implements Serializable {
- private String exchangeName;
- private String queueName;
- private String bindingKey;
- }
复制代码 6.删除绑定
- public class QueueUnBindArguments extends BasicArguments implements Serializable {
- private String exchangeName;
- private String queueName;
- }
复制代码 7.消息发布
- public class BasicPublishArguments extends BasicArguments implements Serializable {
- private String exchangeName;
- private String routingKey;
- private BasicProperties basicProperties;
- private byte[] body;
- }
复制代码 8.消费消息
- public class BasicConsumeArguments {
- private String consumerTag;
- private String queueName;
- private boolean autoAck;
- }
复制代码 9.会合返回
- public class SubScribeReturns extends BasicArguments implements Serializable {
- private String consumerTag;
- private BasicProperties basicProperties;
- private byte[] body;
- }
复制代码 五、实现Broker Server类
- public class BrokerServer {
- private ServerSocket serverSocket = null;
- private VirtualHost virtualHost = new VirtualHost("default");
- private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String,Socket>();
- private ExecutorService executorService = null;
- private volatile boolean runnbale = true;
- public BrokerServer(int port) throws IOException {
- serverSocket = new ServerSocket(port);
- }
- public void start() throws IOException {
- System.out.println("[BrokerServer] 启动!");
- executorService = Executors.newCachedThreadPool();
- try{
- while (runnbale){
- Socket clientSocket = serverSocket.accept();
- executorService.submit(() ->{
- processConnection(clientSocket);
- });
- }
- }catch (SocketException e){
- System.out.println("[BrokerServer]服务器停止运行!");
- }
- }
- public void stop() throws IOException {
- runnbale = false;
- executorService.shutdownNow();
- serverSocket.close();
- }
- private void processConnection(Socket clientSocket){
- try(InputStream inputStream = clientSocket.getInputStream(); OutputStream outputStream = clientSocket.getOutputStream()){
- try(DataInputStream dataInputStream = new DataInputStream(inputStream)
- ; DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){
- while (true){
- Request request = readRequest(dataInputStream);
- Response response = process(request,clientSocket);
- writeResponse(dataOutputStream,response);
- }
- }catch (EOFException | SocketException e){
- System.out.println("[BrokerServer] connection关闭!客户端地址:"+clientSocket.getInetAddress().toString()
- +"端口号:"+clientSocket.getPort());
- }
- }catch (Exception e){
- System.out.println("[BrokerServer] connection连接出现异常!");
- e.printStackTrace();
- }finally {
- try{
- clientSocket.close();
- clearClosedSession(clientSocket);
- }catch (IOException e){
- e.printStackTrace();
- }
- }
- }
- private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
- // 1.把request中的payload做初步解析
- BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
- System.out.println("[Request] rid="+basicArguments.getRid()+",channelId=" +basicArguments.getChannelId()+",type="
- +request.getType()+",length="+request.getLength());
- // 2.根据type的值区分要做什么操作
- boolean ok = true;
- if(request.getType()==0x1){
- // 创建channel
- sessions.put(basicArguments.getChannelId(),clientSocket);
- System.out.println("[BrokerServer]创建channel完成!channelId="+basicArguments.getChannelId());
- } else if (request.getType()==0x2) {
- // 销毁channel
- sessions.remove(basicArguments.getChannelId());
- System.out.println("[BrokerServer]销毁channel完成!channelId="+basicArguments.getChannelId());
- } else if (request.getType()==0x3) {
- // 创建交换机
- ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
- ok = virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType()
- ,arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());
- } else if (request.getType()==0x4) {
- ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
- ok = virtualHost.exchangeDelete(arguments.getExchangeName());
- } else if (request.getType()==0x5) {
- QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
- ok = virtualHost.queueDeclare(arguments.getQueueName(),arguments.isDurable(),arguments.isExclusive()
- ,arguments.isAutoDelete(),arguments.getArguments());
- } else if (request.getType()==0x6) {
- QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
- ok = virtualHost.queueDelete(arguments.getQueueName());
- } else if (request.getType()==0x7) {
- QueueBindArguments arguments = (QueueBindArguments) basicArguments;
- ok = virtualHost.queueBind(arguments.getExchangeName(),arguments.getQueueName(),arguments.getBindingKey());
- } else if (request.getType()==0x8) {
- QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;
- ok = virtualHost.queueUnBind(arguments.getExchangeName(),arguments.getQueueName());
- } else if (request.getType()==0x9) {
- BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
- ok = virtualHost.basicPublish(arguments.getExchangeName(),arguments.getRoutingKey()
- ,arguments.getBasicProperties(),arguments.getBody());
- }else if (request.getType()==0xa){
- BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
- ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
- @Override
- // 回调函数:把服务器收到的消息直接推送回对应的消费者客户端
- public void handleDelivery(String consumeTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
- // 根据consumeTag 其实是channelId 去sessions中查询,找到对应的socket对象
- // 1.根据channelId找到socket对象
- Socket clientSocket = sessions.get(consumeTag);
- if(clientSocket==null || clientSocket.isClosed()){
- throw new MqException("[BrokerServer]订阅消息的客户端已经关闭!");
- }
- // 2.构造响应数据
- SubScribeReturns subScribeReturns = new SubScribeReturns();
- subScribeReturns.setChannelId(consumeTag);
- subScribeReturns.setRid(""); // 此处只有响应,没有请求,不需要去对应
- subScribeReturns.setOk(true);
- subScribeReturns.setConsumerTag(consumeTag);
- subScribeReturns.setBasicProperties(basicProperties);
- subScribeReturns.setBody(body);
- byte[] payload = BinaryTool.toBytes(subScribeReturns);
- Response response = new Response();
- // 0xc 表示服务器给消费者客户端推送的消息数据
- response.setType(0xc);
- response.setLength(payload.length);
- response.setPayload(payload);
- // 3.把数据写回客户端
- DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());
- writeResponse(dataOutputStream,response);
- }
- });
- } else if (request.getType()==0xb) {
- // 调用basicAck来确认消息
- BasicAckArguments arguments = (BasicAckArguments) basicArguments;
- ok = virtualHost.basicAck(arguments.getQueueName(),arguments.getMessageId());
- }else {
- // 当前的type是非法的
- throw new MqException("[BrokerServer] 未知的type!type="+request.getType());
- }
- // 3.构造响应
- BasicReturns basicReturns = new BasicReturns();
- basicReturns.setChannelId(basicArguments.getChannelId());
- basicReturns.setRid(basicArguments.getRid());
- basicReturns.setOk(ok);
- byte[] payload = BinaryTool.toBytes(basicReturns);
- Response response = new Response();
- response.setType(request.getType());
- response.setLength(payload.length);
- response.setPayload(payload);
- System.out.println("[Response] rid="+basicReturns.getRid()+",channelId="+basicReturns.getChannelId()
- +",type="+response.getType()+",length="+response.getLength());
- return response;
- }
- private void writeResponse(DataOutputStream dataOutputStream,Response response) throws IOException {
- dataOutputStream.writeInt(response.getType());
- dataOutputStream.writeInt(response.getLength());
- dataOutputStream.write(response.getPayload());
- dataOutputStream.flush();
- }
- private Request readRequest(DataInputStream dataInputStream) throws IOException {
- Request request = new Request();
- request.setType(dataInputStream.readInt());
- request.setLength(dataInputStream.readInt());
- byte[] payload = new byte[request.getLength()];
- int n = request.getLength();
- if(n!=request.getLength()){
- throw new IOException("读取请求格式出错!");
- }
- request.setPayload(payload);
- return request;
- }
- private void clearClosedSession(Socket clientSocket){
- List<String> toDeleteChannelId = new ArrayList<>();
- for(Map.Entry<String,Socket> entry:sessions.entrySet()){
- if(entry.getValue()==clientSocket){
- toDeleteChannelId.add(entry.getKey());
- }
- }
- for(String channelId:toDeleteChannelId){
- sessions.remove(channelId);
- }
- System.out.println("[BrokerServer] 清理session完成!被清理的channelId="+toDeleteChannelId);
- }
- }
复制代码 六、实现连接
1.connectionFactory类
- public class ConnectionFactory {
- private String host;
- private int port;
- public Connection newConnection(){
- Connection connection = new Connection(host,port);
- return connection;
- }
- }
复制代码 2.connection类
- public class Connection {
- private Socket socket =null;
- private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();
- private InputStream inputStream;
- private OutputStream outputStream;
- private DataInputStream dataInputStream;
- private DataOutputStream dataOutputStream;
- private ExecutorService callbackPool = null;
- public Connection(String host,int port) throws IOException {
- socket = new Socket(host,port);
- inputStream = socket.getInputStream();
- outputStream = socket.getOutputStream();
- dataInputStream = new DataInputStream(inputStream);
- dataOutputStream = new DataOutputStream(outputStream);
- callbackPool = Executors.newFixedThreadPool(4);
- Thread t = new Thread(() ->{
- try {
- while(!socket.isClosed()){
- Response response = readResponse();
- dispatchResponse(response);
- }
- }catch (SocketException e){
- System.out.println("[Connection] 连接正常断开!");
- }catch (IOException | ClassNotFoundException | MqException e){
- System.out.println("[Connection] 连接异常断开!");
- e.printStackTrace();
- }
- });
- }
- public void close(){
- try {
- callbackPool.shutdownNow();
- channelMap.clear();
- dataOutputStream.close();
- dataInputStream.close();
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
- if(response.getType()==0xc){
- SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
- Channel channel = channelMap.get(subScribeReturns.getChannelId());
- if(channel==null){
- throw new MqException("[Connection] 该消息对应的channel在客户端中不存在!channelId="+channel.getChannelId());
- }
- callbackPool.submit(() ->{
- try {
- channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag()
- , subScribeReturns.getBasicProperties(), subScribeReturns.getBody());
- }catch (MqException | IOException e){
- e.printStackTrace();
- }
- });
- }else {
- BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
- Channel channel = channelMap.get(basicReturns.getChannelId());
- if(channel==null){
- throw new MqException("[Connection] 该消息对应的channel在客户端中不存在!channelId="+channel.getChannelId());
- }
- channel.putReturns(basicReturns);
- }
- }
- public void writeRequest(Request request) throws IOException {
- dataOutputStream.writeInt(request.getType());
- dataOutputStream.writeInt(request.getLength());
- dataOutputStream.write(request.getPayload());
- dataOutputStream.flush();
- System.out.println("[Connection]发送请求!type="+request.getType()+",length="+request.getLength());
- }
- public Response readResponse() throws IOException {
- Response response = new Response();
- response.setType(dataInputStream.readInt());
- response.setLength(dataInputStream.readInt());
- byte[] payload = new byte[response.getLength()];
- int n = dataInputStream.read(payload);
- if(n!=response.getLength()){
- throw new IOException("响应的数据不完整!");
- }
- response.setPayload(payload);
- System.out.println("[Connection] 收到响应!type="+response.getType()+",length="+response.getType());
- return response;
- }
- public Channel createChannel(){
- String channelId = "C-"+ UUID.randomUUID().toString();
- Channel channel = new Channel(channelId,this);
- System.out.println(channelId);
- channelMap.put(channelId,channel);
- boolean ok = channel.createChannel();
- if(!ok){
- channelMap.remove(channelId);
- return null;
- }
- return channel;
- }
- }
复制代码 3.channel类
- public class Channel {
- private String channelId;
- private Connection connection;
- private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
- private Consumer consumer = null;
- public Channel(String channelId,Connection connection){
- this.channelId = channelId;
- this.connection = connection;
- }
- public boolean createChannel() throws IOException {
- BasicArguments basicArguments = new BasicArguments();
- basicArguments.setChannelId(channelId);
- basicArguments.setRid(generateRid());
- System.out.println(basicArguments.getChannelId());
- System.out.println(basicArguments.getRid());
- byte[] payload = BinaryTool.toBytes(basicArguments);
- System.out.println(payload);
- Request request = new Request();
- request.setType(0x1);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns =waitResult(basicArguments.getRid());
- return basicReturns.isOk();
- }
- private String generateRid(){
- return "R-"+ UUID.randomUUID().toString();
- }
- private BasicReturns waitResult(String rid){
- BasicReturns basicReturns =null;
- while((basicReturns = basicReturnsMap.get(rid))==null){
- synchronized (this){
- try {
- wait();
- }catch (InterruptedException e){
- e.printStackTrace();
- }
- }
- }
- basicReturnsMap.remove(rid);
- return basicReturns;
- }
- public void putReturns(BasicReturns basicReturns){
- basicReturnsMap.put(basicReturns.getRid(),basicReturns);
- synchronized (this){
- notifyAll();
- }
- }
- public boolean close() throws IOException {
- BasicArguments basicArguments = new BasicArguments();
- basicArguments.setRid(generateRid());
- basicArguments.setChannelId(channelId);
- byte[] payload = BinaryTool.toBytes(basicArguments);
- Request request = new Request();
- request.setType(0x2);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(basicArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType
- , boolean durable, boolean autoDelete, Map<String,Object> arguments) throws IOException {
- ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
- exchangeDeclareArguments.setRid(generateRid());
- exchangeDeclareArguments.setChannelId(channelId);
- exchangeDeclareArguments.setExchangeName(exchangeName);
- exchangeDeclareArguments.setExchangeType(exchangeType);
- exchangeDeclareArguments.setDurable(durable);
- exchangeDeclareArguments.setAutoDelete(autoDelete);
- exchangeDeclareArguments.setArguments(arguments);
- byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);
- Request request = new Request();
- request.setType(0x3);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean exchangeDelete(String exchangeName) throws IOException {
- ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
- exchangeDeleteArguments.setRid(generateRid());
- exchangeDeleteArguments.setChannelId(channelId);
- exchangeDeleteArguments.setExchangeName(exchangeName);
- byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);
- Request request = new Request();
- request.setType(0x4);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns =waitResult(exchangeDeleteArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete
- ,Map<String,Object> arguments) throws IOException {
- QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
- queueDeclareArguments.setRid(generateRid());
- queueDeclareArguments.setChannelId(channelId);
- queueDeclareArguments.setQueueName(queueName);
- queueDeclareArguments.setDurable(durable);
- queueDeclareArguments.setExclusive(exclusive);
- queueDeclareArguments.setExclusive(autoDelete);
- queueDeclareArguments.setArguments(arguments);
- byte[] payload= BinaryTool.toBytes(queueDeclareArguments);
- Request request = new Request();
- request.setType(0x5);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean queueDelete(String queueName) throws IOException {
- QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
- queueDeleteArguments.setRid(generateRid());
- queueDeleteArguments.setChannelId(channelId);
- queueDeleteArguments.setQueueName(queueName);
- byte[] payload = BinaryTool.toBytes(queueDeleteArguments);
- Request request = new Request();
- request.setType(0x6);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean queueBind(String exchangeName,String queueName,String bindingKey) throws IOException {
- QueueBindArguments queueBindArguments = new QueueBindArguments();
- queueBindArguments.setRid(generateRid());
- queueBindArguments.setChannelId(channelId);
- queueBindArguments.setExchangeName(exchangeName);
- queueBindArguments.setQueueName(queueName);
- queueBindArguments.setBindingKey(bindingKey);
- byte[] payload = BinaryTool.toBytes(queueBindArguments);
- Request request = new Request();
- request.setType(0x7);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns =waitResult(queueBindArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean queueUnBind(String exchangeName,String queueName) throws IOException {
- QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();
- queueUnBindArguments.setRid(generateRid());
- queueUnBindArguments.setChannelId(channelId);
- queueUnBindArguments.setExchangeName(exchangeName);
- queueUnBindArguments.setQueueName(queueName);
- byte[] payload = BinaryTool.toBytes(queueUnBindArguments);
- Request request = new Request();
- request.setType(0x8);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(queueUnBindArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {
- BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
- basicPublishArguments.setRid(generateRid());
- basicPublishArguments.setChannelId(channelId);
- basicPublishArguments.setRoutingKey(routingKey);
- basicPublishArguments.setBasicProperties(basicProperties);
- basicPublishArguments.setBody(body);
- byte[] payload = BinaryTool.toBytes(basicPublishArguments);
- Request request = new Request();
- request.setType(0x9);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {
- if(this.consumer!=null){
- throw new MqException("[Channel] 已经设置过回调函数了!不能重复设置!");
- }
- this.consumer = consumer;
- BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
- basicConsumeArguments.setRid(generateRid());
- basicConsumeArguments.setChannelId(channelId);
- basicConsumeArguments.setConsumerTag(channelId);
- basicConsumeArguments.setQueueName(queueName);
- basicConsumeArguments.setAutoAck(autoAck);
- byte[] payload = BinaryTool.toBytes(basicConsumeArguments);
- Request request = new Request();
- request.setType(0xa);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
- return basicReturns.isOk();
- }
- public boolean basicAck(String queueName,String messageId) throws IOException {
- BasicAckArguments basicAckArguments = new BasicAckArguments();
- basicAckArguments.setRid(generateRid());
- basicAckArguments.setChannelId(channelId);
- basicAckArguments.setQueueName(queueName);
- basicAckArguments.setMessageId(messageId);
- byte[] payload = BinaryTool.toBytes(basicAckArguments);
- Request request = new Request();
- request.setType(0xb);
- request.setLength(payload.length);
- request.setPayload(payload);
- connection.writeRequest(request);
- BasicReturns basicReturns = waitResult(basicAckArguments.getRid());
- return basicReturns.isOk();
- }
- }
复制代码 七、总结
本篇文章就是本次Java项目“模拟消息队列”的末了一个大的部分了,下一篇文章就是对所写的这个项目举行一个案例编写,然后对此项目举行扩展。感谢观看!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |