java嵌入式持久化消息队列SMQ,改造自FQueue

打印 上一主题 下一主题

主题 507|帖子 507|积分 1521

一、说明

  之前项目中一直使用ConcurrentLinkedQueue做为缓冲队列(主要是单个项目内,单条改批量的场景,多个项目间使用的是rocketmq),虽然用着方便但是是纯内存的,
如果项目发生异常崩溃内存队列中的数据就会全部丢失(只能从日志中恢复)。所以一直想找一个简单高效支持持久化的嵌入式消息队列。中间用过activemq的嵌入模式,
虽然是支持持久化了,但是配置起来很繁琐,用起来也不简单,性能相比来说也不太行。
  后来偶然发现了FQueue,项目地址:https://github.com/tietang/fqueue 
看了看项目源码,纯java编写,总共没几个类。完全可以改造成我想要的 简单高效支持持久化的嵌入式消息队列。
二、改造

  1、因为是要做成嵌入式的,所以memcached协议相关的代码都删除了。
  2、预创建文件删除了,还有一些零零碎碎的改动。(好几年了,记不清了)。
  3、相较于原代码,改动最大的就是锁的部分,FQueue 读和写使用的是同一把锁,
我改成了读和写使用不同的锁,只在文件切换的时候使用同一把锁。性能大概提示了百分之20左右(本来就很快,锦上添花)。
  4、添加了内存队列,这个主要解决同一个机器创建了大量队列(上千)时,队列消息消费较快,因为使用了内存映射磁盘(每隔10ms就会调用force()同步磁盘),
频繁操作磁盘导致磁盘io过高的问题。默认情况下队列大小超过50时才会写入持久化队列。可以在项目启动时调用SMQ.setting(String dbPath, int logSize, int memoryQueueSize)
进行设置。
  1. [/code][size=6]三、使用[/size]
  2. [size=5]1、说明[/size]
  3.   目前是集成在我个人的工具类项目中的,已发布到中央仓库。项目地址:[url=https://github.com/shenbururen/sun-utils]https://github.com/shenbururen/sun-utils[/url]
  4. 该项目强依赖hutool,算是个人对hutool的个性化的扩展。如果不想依赖该项目,只想单纯的使用SMQ,可以将源码中 cn.sanenen.queue包复制出来,单独使用。
  5. [size=5]2、maven引用[/size]
  6. [code]<dependency>
  7.     <groupId>cn.sanenen</groupId>
  8.     <artifactId>sun-utils</artifactId>
  9.     <version>2.3.0</version>
  10. </dependency>
复制代码
3、调用

  SMQ使用时只有三个方法,向队列放入数据、从队列取出数据、获取队列大小(一般只在监控队列是否积压时使用,判断队列是否有数据,使用获取队列数据是否为null进行判断)。
  我一般是写一个单独的类,通过静态方法调用。
 
  1. /**
  2. * 本地持久化内存队列
  3. */
  4. public class MsgQueue {
  5.     private static final String testDataTopic = "testData";
  6.     /**
  7.      * 向队列放入数据,支持多线程。
  8.      */
  9.     public static void putTestData(TestData msg) {
  10.         SMQ.push(testDataTopic, JSON.toJSONString(msg));
  11.     }
  12.     /**
  13.      * 从队列取出数据,支持多线程。
  14.      */
  15.     public static TestData getTestData() {
  16.         String poll = SMQ.pop(testDataTopic);
  17.         if (StrUtil.isNotBlank(poll)) {
  18.             return JSON.parseObject(poll, TestData.class);
  19.         }
  20.         return null;
  21.     }
  22.     /**
  23.      * 获取队列大小
  24.      */
  25.     public static long getTestDataSize(){
  26.         return SMQ.size(testDataTopic);
  27.     }
  28. }
复制代码
四、注意事项

  1、默认会在项目目录下生成一个smq的文件夹用来存放队列数据。同一个smq的文件夹同时只可被一个项目使用。
  2、SMQ.setting(String dbPath, int logSize, int memoryQueueSize) 
    dbPath文件存储目录,默认是smq,会在项目目录下创建一个smq的目录。(还没测试过绝对路径)。
 
    logSize属性只可以在项目最开始时设置,之后不可以再设置不同的值。(也可以将生成的smq文件夹删除后重新启动进行设置)。
    memoryQueueSize是内存队列大小,默认是50,队列数据积压超过memoryQueueSize后才会写入持久化队列。(目前memoryQueueSize为0时还是会创建内存队列,这里之后会优化,不影响使用。)
项目使用了addShutdownHook,会在项目关闭时将内存队列消息写入持久化队列。结束项目时使用kill -15  不要用 -9。否则可能造成消息丢失。
    建议都使用默认的,也就是不要调用这个方法。避免调用出现问题。
  3、最好使用在不是要求百分百消息不丢失的场景。(在项目异常停止、服务器停电关机时,有概率丢失消息。)
  4、目前已经使用两年多。
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

万万哇

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表