消息队列——数十万级消息的消费方案

打印 上一主题 下一主题

主题 870|帖子 870|积分 2620

配景:

​        卑鄙平台通过消息队列上报监控消息,但是消息量很大,在三分钟左右可以到达百万级别,而对于我的服务来说,我需要对这些消息进行一些业务处理,然后再存入es中。(为了简化场景,以下对于消息的处理只是单纯的存储到es中)

服务启动不到10s,es中写入的数据
青铜方案:

​        MQ只要收到消息,就直接调用es进行存储。
伪代码如下:
  1. // 伪代码版本
  2. public void processRequestMessage(MessageInfo info) {
  3.     // 将接收到的信息对象复制为一个新对象(例如,监控数据对象)
  4.      MonitorData monitorData = Util.copyProperties(info, MonitorData.class);
  5.     // 将新对象的 JSON 字符串索引到 Elasticsearch 中
  6.      elasticClient.index("monitor_index", info.getId(), convertToJson(monitorData), false);
  7. }
复制代码
存在的问题:

​        不难发现,如许的实现方式,会导致消息消费速度非常慢,甚至导致消息积压和服务挂掉,因为这里对es的调用次数=消息条数,通过在本地的测试中也可以发现,纵然在关闭掉消息生产者后,还是需要很长一段时间才能将消息消费完全消费掉。
白银方案:

​        通过瓶颈,可以很自然的想到使用es的批量增加,那么只需要实现一个缓冲池,将消息暂存到缓冲池中,在到达一定大小的时候再同一在es存储
伪代码如下:
  1. @Component
  2. @Slf4j
  3. public class ESOperationMonitorBuffer{
  4.     private static final int BUFFER_SIZE = 100;  // 缓冲池大小
  5.     private List<ElasticDoc> buffer;  // 用于存储消息的缓冲池
  6.     @Autowired
  7.     private ElasticClient elasticClient;
  8.     private String indexName = EsConstans.NODE_MONITOR_INDEX;  // Elasticsearch 索引名称
  9.     public ESOperationMonitorBuffer() {
  10.         this.buffer = new ArrayList<>();
  11.     }
  12.     // 添加消息到缓冲池
  13.     public void addMessage(WlwMessageShareInfo message)  {
  14.         ElasticDoc elasticDoc = new ElasticDoc();
  15.         elasticDoc.setIndex(indexName);
  16.         MonitorESData monitorESData = BeanUtil.copyProperties(message, MonitorESData.class);
  17.         elasticDoc.setDoc(JSONObject.toJSONString(monitorESData));
  18.         buffer.add(elasticDoc);
  19.         if(buffer.size() > BUFFER_SIZE){
  20.                 flush();
  21.         }
  22.     }
  23.     /**
  24.      * 执行 flush 操作
  25.      */
  26.     private void flush()  {
  27.        log.info("开始批量插入 Elasticsearch,共 {} 条数据", buffer.size());
  28.             if (buffer.isEmpty()) {
  29.                 return;  // 如果缓冲池为空,不执行操作
  30.             }
  31.             BulkResponse index = elasticClient.index(buffer, false);// 批量插入
  32.             if (index.hasFailures()) {
  33.                 log.error("批量插入 Elasticsearch 失败,失败原因:{}", index.buildFailureMessage());
  34.             }
  35.             // 清空缓冲池
  36.             buffer.clear();
  37.     }
  38.     // 如果程序关闭前有剩余数据,执行 flush 操作
  39.     public void close() throws IOException {
  40.     }
  41. }
复制代码
存在的问题:


  • 若消息数目一直没到达阈值,就一直不会保存到es
  • 存在并发问题,ConcurrentModificationException(并发修改非常),是基于java聚集中的 快速失败(fail-fast) 机制产生的,在使用迭代器遍历一个聚集对象时,假如遍历过程中对聚集对象的内容进行了增编削,就会抛出该非常。快速失败机制使得java的聚集类不能在多线程下并发修改,也不能在迭代过程中被修改。在上面场景中的体现就是在flush操作中时,又有消息进入到了buffer中。
黄金方案:

​        解决问题1:可以开启一个定时任务去执行flush方法
​        解决问题2:大概大家第一时间会想到对buffer加锁,但是如许又会导致在存入buffer的时候速度慢,所以不难想到可以对 flush() 方法加锁, 但是如许一来还是无法解决buffer存在的并发问题,怎么办呢?其实很简单,我们可以用两个buffer来分别给add()方法和flush()方法使用,如许一来,就可以制止并发问题,并且继续对flush()方法加锁,制止和定时任务同时执行,导致数据重复。
伪代码如下:
  1. @Component
  2. @Slf4j
  3. public class ESOperationMonitorBuffer implements CommandLineRunner {
  4.     private static final int BUFFER_SIZE = 100;  // 缓冲池大小
  5.     private List<ElasticDoc> buffer;  // 用于存储消息的缓冲池
  6.     private List<ElasticDoc> temBuffer;  // 用于存储临时消息的缓冲池
  7.     @Autowired
  8.     private ElasticClient elasticClient;
  9.     private String indexName = EsConstans.NODE_MONITOR_INDEX;  // Elasticsearch 索引名称
  10.     private Lock lock = new ReentrantLock();
  11.    
  12.     public ESOperationMonitorBuffer() {
  13.         this.buffer = new ArrayList<>();
  14.         this.temBuffer = new ArrayList<>();
  15.     }
  16.     // 添加消息到缓冲池
  17.     public void addMessage(WlwMessageShareInfo message)  {
  18.         ElasticDoc elasticDoc = new ElasticDoc();
  19.         elasticDoc.setIndex(indexName);
  20.         MonitorESData monitorESData = BeanUtil.copyProperties(message, MonitorESData.class);
  21.         elasticDoc.setDoc(JSONObject.toJSONString(monitorESData));
  22.         temBuffer.add(elasticDoc);
  23.         // 当缓冲池达到设定大小时,批量插入到 Elasticsearch
  24.         if (temBuffer.size() >= BUFFER_SIZE) {
  25.             lock.lock();
  26.             try{
  27.                 buffer.addAll(temBuffer);
  28.                 temBuffer.clear();
  29.             }catch(Exception e){
  30.                 log.error("添加消息到缓冲池失败",e);
  31.             }finally {
  32.                 lock.unlock();
  33.             }
  34.             flush();
  35.         }
  36.     }
  37.     /**
  38.      * 执行 flush 操作
  39.      */
  40.     private void flush()  {
  41.         lock.lock();
  42.         try{
  43.             log.info("开始批量插入 Elasticsearch,共 {} 条数据", buffer.size());
  44.             if (buffer.isEmpty()) {
  45.                 return;  // 如果缓冲池为空,不执行操作
  46.             }
  47.             BulkResponse index = elasticClient.index(buffer, false);// 批量插入
  48.             if (index.hasFailures()) {
  49.                 log.error("批量插入 Elasticsearch 失败,失败原因:{}", index.buildFailureMessage());
  50.             }
  51.             // 清空缓冲池
  52.             buffer.clear();
  53.         }catch (Exception e){
  54.             log.info("批量插入 Elasticsearch 失败",e);
  55.         }finally {
  56.             lock.unlock();
  57.         }
  58.     }
  59.     // 如果程序关闭前有剩余数据,执行 flush 操作
  60.     public void close() throws IOException {
  61.     }
  62.     @Override
  63.     public void run(String... args) throws Exception {
  64.         log.info("启动 ESOperationMonitorBuffer 缓冲池,开启线程池定时执行flush操作");
  65.         // 定时执行 flush 操作
  66.         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
  67.         scheduledExecutorService.scheduleAtFixedRate(()->{
  68.             try{
  69.                 flush();
  70.             }catch (Exception e){
  71.                 log.error("定时执行 flush 操作失败",e);
  72.             }
  73.         },1,5, TimeUnit.SECONDS);
  74.     }
  75. }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

泉缘泉

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表