更多相干内容可查看
增加消费者数量
假设当前利用的是 RabbitMQ,且只有一个消费者在处置惩罚消息,该消费者处置惩罚一条消息平均耗时 100 毫秒(即每秒处置惩罚 10 条消息)。那么增加到 10 个消费者后,如果每个消费者的处置惩罚能力相同且不存在资源竞争等题目,理论上处置惩罚速度可提升到每秒 100 条消息。
案例:在一个电商订单处置惩罚体系中,订单创建成功后会发送一条消息到 MQ,本来单个消费者负责处置惩罚订单消息并更新库存、天生物流订单等操作。当消息大量堆积时,可启动多个消费者实例,每个实例都能独立地从 MQ 中获取订单消息并进行处置惩罚。但在实际情况中,若消费者必要对共享资源(如数据库连接池)进行操作,就必要公道配置资源,制止出现连接数过多导致数据库性能下降的题目。比如,利用连接池技术时,根据消费者数量和数据库服务器的负载能力,调整连接池的最大连接数和最小连接数,确保每个消费者都能获取到可用连接且不会过分消耗数据库资源。
示例代码,同时考虑了利用连接池技术来公道管理数据库连接:
- public class RabbitMQConsumer {
- private static final String QUEUE_NAME = "order_queue";
- private static final String RABBITMQ_HOST = "localhost";
- private static final int RABBITMQ_PORT = 5672;
- private static final String RABBITMQ_USERNAME = "guest";
- private static final String RABBITMQ_PASSWORD = "guest";
- // 数据库连接池配置
- private static final String JDBC_URL = "jdbc:mysql://localhost:3306/your_database";
- private static final String JDBC_USERNAME = "root";
- private static final String JDBC_PASSWORD = "password";
- private static final int MAX_POOL_SIZE = 20;
- private static final int MIN_POOL_SIZE = 5;
- private static final DataSource dataSource;
- static {
- BasicDataSource ds = new BasicDataSource();
- ds.setUrl(JDBC_URL);
- ds.setUsername(JDBC_USERNAME);
- ds.setPassword(JDBC_PASSWORD);
- ds.setInitialSize(MIN_POOL_SIZE);
- ds.setMaxTotal(MAX_POOL_SIZE);
- dataSource = ds;
- }
- public static void main(String[] args) {
- int numConsumers = 10; // 增加到 10 个消费者
- //使用 `ExecutorService` 来创建一个固定大小的线程池,大小为 `numConsumers`(这里是 10),用于启动多个消费者线程
- ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
- //ConnectionFactory用于创建 RabbitMQ 的连接。我们设置了 RabbitMQ 的主机、端口、用户名和密码
- try (ConnectionFactory factory = new ConnectionFactory()) {
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明了一个持久化的队列,确保队列在 RabbitMQ 服务器重启后仍然存在。
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- // 设置了每个消费者的预取数量为 10,这意味着每个消费者一次最多接收 10 条消息,防止消费者接收过多消息而无法及时处理
- channel.basicQos(10);
- for (int i = 0; i < numConsumers; i++) {
- executorService.execute(() -> {
- try {
- // 每个消费者创建一个新的 channel 来接收消息
- Channel consumerChannel = connection.createChannel();
- consumerChannel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("Received: " + message);
- // 从连接池获取数据库连接
- try (Connection dbConnection = dataSource.getConnection()) {
- // 处理订单消息,更新库存、生成物流订单等操作
- processOrderMessage(dbConnection, message);
- // 手动确认消息
- consumerChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- } catch (SQLException e) {
- e.printStackTrace();
- // 消息处理失败,不确认消息,将重新放回队列
- }
- }
- }, consumerTag -> {
- });
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- });
- }
- // 等待一段时间后关闭资源
- Thread.sleep(60000);
- executorService.shutdown();
- channel.close();
- connection.close();
- } catch (IOException | TimeoutException | InterruptedException | SQLException e) {
- e.printStackTrace();
- }
- }
- private static void processOrderMessage(Connection connection, String message) throws SQLException {
- String sql = "INSERT INTO order_processing_log (message) VALUES (?)";
- try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- preparedStatement.setString(1, message);
- preparedStatement.executeUpdate();
- }
- }
- }
复制代码 优化消费者处置惩罚逻辑
- 批量处置惩罚
例如在一个数据收集体系中,MQ 中堆积了大量的传感器数据消息,每个消息包罗单个传感器的一次数据收罗结果。本来消费者是收到一条消息就将数据插入数据库。优化后,可以每收集 100 条消息,将这 100 个传感器数据组合成一个批量插入语句,一次性插入数据库。如许可以大大减少数据库事件的开启和关闭次数,进步处置惩罚服从。比如,利用 MySQL 数据库时,本来逐条插入 100 万条数据可能必要数小时,而接纳批量插入(假设每次批量插入 100 条)可能将时间收缩到几非常钟。
示例:
- public class BatchProcessingConsumer {
- private static final String QUEUE_NAME = "sensor_data_queue";
- private static final String RABBITMQ_HOST = "localhost";
- private static final int RABBITMQ_PORT = 5672;
- private static final String RABBITMQ_USERNAME = "guest";
- private static final String RABBITMQ_PASSWORD = "guest";
- private static final String JDBC_URL = "jdbc:mysql://localhost:3306/your_database";
- private static final String JDBC_USERNAME = "root";
- private static final String JDBC_PASSWORD = "password";
- private static final int BATCH_SIZE = 100;
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- List<String> batch = new ArrayList<>();
- try {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- connection = DriverManager.getConnection(JDBC_URL, JDBC_USERNAME, JDBC_PASSWORD);
- channel = factory.newConnection().createChannel();
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- batch.add(message);
- if (batch.size() >= BATCH_SIZE) {
- insertBatch(connection, batch);
- batch.clear();
- }
- }
- }, consumerTag -> {
- });
- } catch (IOException | SQLException e) {
- e.printStackTrace();
- } finally {
- try {
- if (channel!= null) {
- channel.close();
- }
- if (connection!= null) {
- connection.close();
- }
- } catch (IOException | SQLException e) {
- e.printStackTrace();
- }
- }
- }
- private static void insertBatch(Connection connection, List<String> batch) {
- String sql = "INSERT INTO sensor_data (data) VALUES (?)";
- try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
- for (String message : batch) {
- preparedStatement.setString(1, message);
- preparedStatement.addBatch();
- }
- preparedStatement.executeBatch();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
复制代码
- 异步处置惩罚
以一个用户注册体系为例,当用户注册成功后,会向 MQ 发送一条注册成功消息,消费者必要记录注册日志、发送注册成功邮件以及更新用户统计信息。此中,记录注册日志相对不那么紧急,可以将其异步处置惩罚。比如,消费者在收到注册成功消息后,立即在主线程中发送注册成功邮件和更新用户统计信息,而将记录注册日志的操作交给一个异步线程池处置惩罚。如许,消费者就可以更快地处置惩罚下一条注册消息,进步团体处置惩罚速度。
- public class AsynchronousProcessingConsumer {
- private static final String QUEUE_NAME = "user_registration_queue";
- private static final String RABBITMQ_HOST = "localhost";
- private static final int RABBITMQ_PORT = 5672;
- private static final String RABBITMQ_USERNAME = "guest";
- private static final String RABBITMQ_PASSWORD = "guest";
- public static void main(String[] args) {
- ExecutorService executorService = Executors.newFixedThreadPool(2);
- try (ConnectionFactory factory = new ConnectionFactory()) {
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("Received: " + message);
- // 处理注册消息
- processRegistrationMessage(message, executorService);
- // 手动确认消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }, consumerTag -> {
- });
- // 让程序保持运行
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException | TimeoutException e) {
- e.printStackTrace();
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- executorService.shutdown();
- }
- }
- private static void processRegistrationMessage(String message, ExecutorService executorService) {
- // 发送注册成功邮件和更新用户统计信息在主线程
- sendRegistrationEmail(message);
- updateUserStats(message);
- // 异步处理记录注册日志
- executorService.execute(() -> logRegistration(message));
- }
- private static void sendRegistrationEmail(String message) {
- // 实际的发送邮件逻辑,这里仅作示例
- System.out.println("Sending registration email for: " + message);
- }
- private static void updateUserStats(String message) {
- // 实际的更新用户统计信息逻辑,这里仅作示例
- System.out.println("Updating user stats for: " + message);
- }
- private static void logRegistration(String message) {
- // 实际的日志记录逻辑,这里仅作示例
- System.out.println("Logging registration for: " + message);
- }
- }
复制代码 假设在一个商品推荐体系中,消费者从 MQ 中获取用户的欣赏历史消息,然后根据这些消息计算推荐商品。原始的计算逻辑可能是先查询数据库获取用户的全部历史欣赏商品类别,再对每个类别进行复杂的统计分析,最后根据分析结果查询商品库获取推荐商品。优化时,可以先对数据进行预处置惩罚,例如在用户欣赏商品时就将欣赏数据进行分类统计并存储在缓存中,消费者只需从缓存中获取统计结果并进行简单计算即可得到推荐商品,大大简化了业务逻辑,进步了处置惩罚速度。
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- public class OptimizedBusinessLogicConsumer {
- private static final String QUEUE_NAME = "browsing_history_queue";
- private static final String RABBITMQ_HOST = "localhost";
- private static final int RABBITMQ_PORT = 5672;
- private static final String RABBITMQ_USERNAME = "guest";
- private static final String RABBITMQ_PASSWORD = "guest";
- private static final Map<String, Map<String, Integer>> userBrowsingStatsCache = new HashMap<>();
- public static void main(String[] args) {
- try (ConnectionFactory factory = new ConnectionFactory()) {
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("Received: " + message);
- // 处理浏览历史消息
- processBrowsingHistory(message);
- // 计算推荐商品
- recommendProducts(message);
- }
- }, consumerTag -> {
- });
- // 让程序保持运行
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException | TimeoutException e) {
- e.printStackTrace();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private static void processBrowsingHistory(String message) {
- // 假设 message 是 JSON 格式,包含用户 ID 和浏览的商品类别
- // 这里仅作示例,实际可能需要解析 JSON
- String userId = "user123";
- String category = "electronics";
- // 先更新缓存中的浏览统计信息
- userBrowsingStatsCache.computeIfAbsent(userId, k -> new HashMap<>())
- .compute(category, (k, v) -> v == null? 1 : v + 1);
- // 实际中可以将缓存更新到持久存储,这里暂不展示
- }
- private static void recommendProducts(String message) {
- // 假设 message 是 JSON 格式,包含用户 ID
- // 这里仅作示例,实际可能需要解析 JSON
- String userId = "user123";
- // 从缓存中获取用户的浏览统计信息
- Map<String, Integer> userStats = userBrowsingStatsCache.get(userId);
- if (userStats!= null) {
- // 根据缓存中的统计信息进行简单推荐
- System.out.println("Recommending products based on user stats in cache for user: " + userId);
- } else {
- System.out.println("No browsing history found for user: " + userId);
- }
- }
- }
复制代码 调整消息队列参数
- 增加队列容量和消息预取数量
-对于 RabbitMQ,如果当前队列容量设置为只能容纳 10 万条消息,而消息堆积量达到 100 万条,就必要调整队列容量。例如,将队列的最大长度参数(x-max-length)设置为 200 万或更大,以防止消息因队列满而被扬弃。同时,调整消费者的消息预取数量(basicQos)。
代码示例:
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class RabbitMQConsumerAdjustments {
- private static final String QUEUE_NAME = "promotion_queue";
- private static final String RABBITMQ_HOST = "localhost";
- private static final int RABBITMQ_PORT = 5672;
- private static final String RABBITMQ_USERNAME = "guest";
- private static final String RABBITMQ_PASSWORD = "guest";
- public static void main(String[] args) {
- try (ConnectionFactory factory = new ConnectionFactory()) {
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // 声明队列,并调整队列的最大长度(x-max-length)为 200 万
- Map<String, Object> argsMap = new HashMap<>();
- argsMap.put("x-max-length", 2000000);
- channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
- // 调整消费者的消息预取数量为 100
- channel.basicQos(100);
- channel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("Received: " + message);
- // 这里添加处理消息的逻辑,例如打印消息内容
- try {
- Thread.sleep(100); // 模拟消息处理时间
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 手动确认消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- }, consumerTag -> {
- });
- // 等待一段时间,让消费者持续消费消息
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
复制代码
- 调整消息过期时间和死信队列设置
假设在一个限时促销活动的消息处置惩罚场景中,MQ 中有许多促销活动相干的消息,如优惠券发放消息等。本来优惠券发放消息的过期时间设置为 1 小时,由于消息堆积导致许多消息还未被处置惩罚就过期了。可以将过期时间调整为 2 小时,给消费者更多时间处置惩罚。同时,设置公道的死信队列。例如,当消费者尝试处置惩罚一条优惠券发放消息 5 次都失败(可能是因为接收用户不存在等原因),就将该消息转移到死信队列。如许可以制止这些无法处置惩罚的消息不停堆积在原队列中影响正常消息的处置惩罚,而且可以对死信队列中的消息进行后续分析,找出处置惩罚失败的原因并进行修复。
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class RabbitMQDeadLetterQueue {
- private static final String QUEUE_NAME = "promotion_queue";
- private static final String DEAD_LETTER_QUEUE_NAME = "dead_letter_queue";
- private static final String RABBITMQ_HOST = "localhost";
- private static final int RABBITMQ_PORT = 5672;
- private static final String RABBITMQ_USERNAME = "guest";
- private static final String RABBITMQ_PASSWORD = "guest";
- public static void main(String[] args) {
- try (ConnectionFactory factory = new ConnectionFactory()) {
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- // 声明死信队列
- channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);
- // 声明主队列,并设置消息过期时间和死信队列参数
- Map<String, Object> argsMap = new HashMap<>();
- argsMap.put("x-message-ttl", 7200000); // 消息过期时间为 2 小时(2 * 60 * 60 * 1000 毫秒)
- argsMap.put("x-dead-letter-exchange", "");
- argsMap.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_NAME);
- channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
- // 调整消费者的消息预取数量为 10
- channel.basicQos(10);
- channel.basicConsume(QUEUE_NAME, false, new DeliverCallback() {
- int retryCount = 0;
- @Override
- public void handle(String consumerTag, Delivery delivery) throws IOException {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("Received: " + message);
- try {
- // 这里添加处理消息的逻辑,例如打印消息内容
- if (retryCount < 5) {
- // 模拟消息处理失败,仅在重试次数小于 5 次时重试
- retryCount++;
- throw new RuntimeException("Message processing failed");
- } else {
- System.out.println("Message processed successfully");
- // 手动确认消息
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- } catch (RuntimeException e) {
- System.out.println("Message processing failed, retrying...");
- // 不确认消息,让消息重新排队或进入死信队列
- }
- }
- }, consumerTag -> {
- });
- // 等待一段时间,让消费者持续消费消息
- Thread.sleep(60000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } catch (IOException | TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
复制代码 数据分流与持久化计谋调整
- 数据分流
- 在一个大型企业的消息处置惩罚体系中,MQ 中有多种范例的消息,如业务订单消息、体系日志消息、员工通知消息等。当出现 100 万消息堆积时,可以根据消息范例进行分流。例如,将业务订单消息分配给一组高性能的消费者,这些消费者运行在性能较好的服务器上,而且接纳更优化的处置惩罚逻辑(如批量处置惩罚、异步处置惩罚等);将体系日志消息分配给另一组消费者,由于日志消息对处置惩罚实时性要求相对较低,可以接纳较低优先级的处置惩罚方式;员工通知消息则分配给专门的通知服务消费者进行处置惩罚。如许可以制止差别范例的消息相互干扰,进步团体处置惩罚服从。
- 持久化计谋调整
- 例如在一个数据备份相对美满的文件处置惩罚体系中,MQ 中有大量的文件处置惩罚使命消息。本来消息接纳磁盘持久化,以确保在任何情况下消息都不会丢失。但在消息大量堆积且处置惩罚速度成为关键题目时,可以考虑将部分可重新天生的文件处置惩罚使命消息的持久化计谋调整为内存持久化(如果消息队列支持)。比如,对于一些可以从源文件重新解析天生处置惩罚使命的消息,接纳内存持久化可以加快消费者获取和处置惩罚消息的速度。不外,必要在体系中增加额外的监控和规复机制,一旦出现体系故障导致内存中的消息丢失,可以从源文件重新天生使命消息并发送到 MQ 中进行处置惩罚,以降低消息丢失的风险。
监控与预警体系优化
- 实时监控消息堆积情况
- 利用 Prometheus 联合 RabbitMQ 的 Exporter 插件来监控 RabbitMQ 的各项指标。例如,通过配置 Exporter 可以获取 RabbitMQ 队列中的消息数量(rabbitmq_queue_messages指标)、消费者数量(rabbitmq_queue_consumers指标)、消息入队速率(rabbitmq_queue_messages_publish_rate指标)和消息出队速率(rabbitmq_queue_messages_deliver_rate指标)等。在 Grafana 中创建仪表盘,直观地展示这些指标的变化趋势。设置告警规则,当消息堆积数量(即rabbitmq_queue_messages指标)超过 50 万条时,通过邮件、短信或者即时通讯工具(如 Slack)向运维人员发送告警信息,以便及时采取步伐。
- 分析堆积原因
- 除了监控指标外,还必要对消息堆积的原因进行深入分析。可以通过查看消息队列的日志文件来获取更多信息。例如,在 RabbitMQ 中,日志文件可能记录了生产者发送消息的速率变化、消费者连接断开的时间和原因等。如果发现生产者发送消息的速率忽然大幅增加,可能是因为业务高峰期或者某个上游体系出现故障导致大量消息积存发送;如果是消费者连接断开频繁,可能是消费者所在的服务器资源不足(如内存溢出、网络故障等)导致消费者无法正常工作。通太过析这些原因,针对性地采取办理方案,如在业务高峰期增加资源或者优化上游体系的消息发送逻辑,修复消费者所在服务器的资源题目或网络故障等,从根本上办理消息堆积题目,防止再次发生。
简答总结:
1.增加消费者的数量,多线程异步去消费
2.如果有io操作,批量处置惩罚
3.优化业务逻辑,做一下预处置惩罚,放到缓存种,消费的时候直接去从缓存取
4.增加队列容积跟消息预取数量qos
5.增加消息过期时间,制止因为消费过慢产生消息丢失。
6.将消费失败的消息,进行重试,一定重试次数后还失败的扔到死信队列中
7.数据分流,如果时效性要求没那么高的比如日志的消息,可以交给异步线程池去处置惩罚,要求时效性高的消息,扔给高效的消费者组去消费
8.将磁盘持久化切换为内存持久化(如果队列支持),加快消费者的获取和处置惩罚消息的速度
9.上述文章末尾的监控本领及参数,并在达到一定堆积阈值,触发告警提示,比如邮件,短信等
10.根据消息队列的日志文件进行堆积原因分析
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |