快速入门:实现 RabbitMQ 简单队列:
- 在 RabbitMQ 平台创建 Virtual Hosts 和一个队列
- /boyaVirtualHosts
- 导入依赖:
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>3.6.5 </version>
- </dependency>
复制代码 - 编写毗连类:
- public class RabbitMQConnection {
- /**
- * 获取连接
- */
- public static Connection getConnection() throws IOException, TimeoutException {
- // 1.创建连接
- ConnectionFactory connectionFactory = new ConnectionFactory();
- // 2.设置连接地址
- connectionFactory.setHost("127.0.0.1");
- // 3.设置端口号
- connectionFactory.setPort(5672);
- // 4.设置账号和密码
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- // 5.设置 VirtualHost
- connectionFactory.setVirtualHost("/boyaVirtualHostsR");
- return connectionFactory.newConnection();
- }
- }
复制代码 - 编写生产者代码:
- public class Producer {
- private static final String QUEUE_NAME = "BoyatopMamber";
- /**
- * 获取连接
- */
- public static void main(String[] args) throws IOException, TimeoutException {
- while (true){
- // 1.创建连接
- Connection connection = RabbitMQConnection.getConnection();
- // 2.设置通道
- Channel channel = connection.createChannel();
- // 3.设置消息
- String msg = "Hello World";
- System.out.println("msg:" + msg);
- channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
- channel.close();
- connection.close();
- }
- }
- }
复制代码 - 编写消费者代码:
- public class Comsumer {
- private static final String QUEUE_NAME = "BoyatopMamber";
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.创建链接
- Connection connection = RabbitMQConnection.getConnection();
- // 2.设置通道
- Channel channel = connection.createChannel();
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body,"UTF-8");
- System.out.println("消费者获取消息:" + msg);
- }
- };
- // 3.监听队列
- channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
- }
- }
复制代码 - RabbitMQ 怎样包管消息不丢失:
- 生产者角色:
- 确保生产者角色投递到 MQ 服务器端乐成
- Ack 消息确认机制
- 同步或异步的形式:
- 消费者角色:
- 在 RabbitMQ 情况下:
- 必须要将消息消费乐成之后,才会将消息从 MQ 服务器端移除
- 在 kafka 中的情况下:
- 不管是消费乐成照旧消费失败,该消息都不会立即从 MQ 服务器移除
- MQ 服务器端:
- 在默认的情况下,都会对队列中的消息长期化,长期化硬盘
- 利用消息确认机制 + 长期化技术实现:
- A 消费者确认收到消息机制
- channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
复制代码
- 第二个参数值为 false,代表关闭 RabbitMQ 的主动应答机制,改为手动应答
- 在处置惩罚完消息时,返回应答状态,true 表示为主动应答模式
- channel.basicAck(envelope.getDeliveryTag(),false);
复制代码
- B 生产者确认投递消息乐成,利用 Confirm 机制或者事件消息
- Confirm 机制,同步或异步的形式
- RabbitMQ 默认创建是长期化的形式:
- 将代码中的 durable 设为 true
- 参数详解:
- Durability:是否长期化
- durable:长期化
- Transient:不长期化
- Auto delete:是否主动删除
- 当最后一个消费者断开毗连之后队列是否主动被删除
- 可以通过 RabbitMQ Management 查看某个队列的消费者数量
- 当 consumers = 0 时,队列就会主动删除
- 利用 RabbitMQ 事件:
- //设置事务
- channel.txSelect();
- channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
- channel.txCommit();
复制代码
- 生产者:
- public class producer {
- private static final String QUEUE_NAME = "BoyatopMamber";
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- // 1.创建新的连接
- Connection connection = RabbitMQConnection.getConnection();
- // 2.设置 channel
- Channel channel = connection.createChannel();
- // 3.发送消息
- String msg = "Hello my Bro";
- channel.confirmSelect();
- channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
- boolean result = channel.waitForConfirms();
- if(result){
- System.out.println("消息投递成功");
- }else {
- System.out.println("消息投递失败");
- }
- // 4.关闭资源
- channel.close();
- connection.close();
- }
- }
复制代码 - 消费者:
- public class Consumer {
- private static final String QUEUE_ANME = "BoyatopMamber";
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.创建连接
- Connection connection = RabbitMQConnection.getConnection();
- //2.设置通道
- Channel channel = connection.createChannel();
- DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body,"UTF-8");
- System.out.println("消费者获取消息:" + msg);
- //消费者完成 消费该消息
- channel.basicAck(envelope.getDeliveryTag(),false);
- }
- };
- // 3.监听队列
- channel.basicConsume(QUEUE_ANME,false,defaultConsumer);
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |