redis开门之批量插入pipeLine

打印 上一主题 下一主题

主题 895|帖子 895|积分 2685

Redis开门之批量插入Pipeline
下发数据同步到Redis中,数据少的话几千条,多则达百万级。此中一个场景是把下发的数据同步到Redis中,数据同步完成后,把数据写入到文件中,下发给客户,客户调用。某天......
产物司理:小A,我发现我们这个数据整体下发的流程耗时有点长啊...从拉取数据随处理下发快要一个小时的时间,每天处理的XX数量也不是许多,看下是哪块耗时比较久,优化下
小A:嗯...我们如今的处理逻辑是所有的使命完成后 才会统一进行同步Redis的操纵,而且现在的代码并没有批量处理数据到Redis,而是直接一条一条的处理,就很慢....
产物司理:一条一条?百万的数据 那大概处理多长时间啊?优化下吧...
小A:嗯,我看下怎么处理吧
看了之前同步Redis代码的逻辑(之前不是小A写的),异步处理、线程池、分批次、应有尽有但就是很慢。给出小A 优化前的大概框架。
  1. try {
  2.             redisTemplate.executePipelined(new SessionCallback<Object>() {
  3.                 @Override
  4.                 public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
  5.                     for (JSONObject redisSynchro : json) {
  6.                         // 同步数据到redis
  7.                         redisTemplate.opsForList().rightPush("XXX")
  8.                         // 设置过期时间
  9.                         redisTemplate.expire()
  10.                     }
  11.                     return null;
  12.                 }
  13.             });
  14.         } catch (Exception e) {
  15.             log.info("同步redis发生意外异常 :" + e.getMessage());
  16.         } finally {
  17.             countDownLatch.countDown();
  18.         }
复制代码
小A查了下Redis批量处理数据的方法 网上给出的博客就是pipeline操纵啊,代码里用的就是executePipelined 实行管道操纵。嗯... 不对再查查。executePipelined的实行原理是什么?

  • 管道(Pipeline) 是 Redis 提供的一种优化手段,允许客户端将多个命令一次性发送到 Redis 服务器,而不是每次发送一个命令并等待相应。这样可以淘汰网络延迟,提高操纵效率。
  • executePipelined() 方法的本意是将多个 Redis 操纵封装为一个管道批量实行,返回所有操纵的结果。
  • 客户端批量发送命令 → 服务器批量实行命令 → 客户端一次性接收所有结果。


为什么第一次的实行没有通过管道实行呢?
封装条理过高: executePipelined 中使用了 redisTemplate 的操纵方法,而不是直接通过 RedisConnection 来实行 Redis 的底层命令。
分散的命令: 每个 rightPush和 expire 方法都各自独立调用,即使这些操纵在同一个 executePipelined 方法中,也无法通过 Redis 的原生管道机制优化。
以是如果要想使用Redis的管道操纵,应该制止使用高条理封装的redisTemplat。于是乎,小A改变了上面的实行方式
  1. redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
  2.     StringRedisSerializer serializer = new StringRedisSerializer();
  3.     for (ResultInfo result : tmps) {
  4.         // 组装数据 key/value
  5.         // 检查元素是否已存在
  6.         if (检查条件) {
  7.             // 如果不存在,则插入
  8.             connection.rPush(serializer.serialize(key), serializer.serialize(value));
  9.         }
  10.         // 设置过期时间
  11.         connection.expire(serializer.serialize(key), 30 * 24 * 3600L);   
  12.     }
  13.     return null;
  14. });
复制代码
使用了 StringRedisSerializer 对键和值进行序列化。序列化方式直接处理 Redis 底层连接的操纵,通常适用于高性能的场景。
经过实测13W级数据插入耗时3秒,9W数据耗时1秒。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表