IT评测·应用市场-qidao123.com技术社区

标题: 【Redisson使用手册】分布式对象 [打印本页]

作者: 罪恶克星    时间: 2024-8-15 12:43
标题: 【Redisson使用手册】分布式对象
每个 Redisson 对象实例都会有一个与之对应的 Redis 数据实例,可以通过调用 getName 方法来取得 Redis 数据实例的名称(key)。
一、通用对象桶(Object Bucket)

Redisson 的分布式 RBucket ,Java 对象是一种通用对象桶可以用来存放任范例的对象。 
  1. RBucket<Object> bucket = redisson.getBucket("test:bucket:key:001");
  2. bucket.set(new Object());
  3. Object obj = bucket.get();
  4. bucket.trySet(new Object());
  5. bucket.compareAndSet(new Object(), new OObject());
  6. bucket.getAndSet(new Object());
复制代码
还可以通过 RBuckets 接口实现批量操作多个 RBucket 对象
  1. RBuckets buckets = redisson.getBuckets();
  2. List<RBucket<V>> foundBuckets = buckets.find("test:bucket:key*");
  3. Map<String, V> loadedBuckets = buckets.get("test:bucket:key:001", "test:bucket:key:002", "test:bucket:key:003");
  4. Map<String, Object> map = new HashMap<>();
  5. map.put("myBucket1", new MyObject());
  6. map.put("myBucket2", new MyObject());
  7. // 利用Redis的事务特性,同时保存所有的通用对象桶,如果任意一个通用对象桶已经存在则放弃保存其他所有数据。
  8. buckets.trySet(map);
  9. // 同时保存全部通用对象桶。
  10. buckets.set(map);
复制代码
单位测试
  1.     /**
  2.      * 获取缓存
  3.      *
  4.      * @param key   key
  5.      * @param clazz 类型
  6.      * @param <T>   类型
  7.      */
  8.     public <T> T get(String key, Class<T> clazz) {
  9.         RBucket<T> rBucket = redissonClient().getBucket(key);
  10.         return clazz.cast(rBucket.get());
  11.     }
复制代码
  1. @Test
  2.     public void test_deleteRBucket_multi() {
  3.         String redisKey1 = "test:deleteRBucket:multi:1";
  4.         String redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);
  5.         Assertions.assertNull(redisValue1);
  6.         primaryRedissonHelper.set(redisKey1, "test_deleteRBucket_single_multi_1", 30);
  7.         redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);
  8.         Assertions.assertNotNull(redisValue1);
  9.         Assertions.assertEquals("test_deleteRBucket_single_multi_1", redisValue1);
  10.         String redisKey2 = "test_deleteRBucket_multi_2";
  11.         String redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);
  12.         Assertions.assertNull(redisValue2);
  13.         primaryRedissonHelper.set(redisKey2, "test_deleteRBucket_single_multi_2", 30);
  14.         redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);
  15.         Assertions.assertNotNull(redisValue2);
  16.         Assertions.assertEquals("test_deleteRBucket_single_multi_2", redisValue2);
  17.         primaryRedissonHelper.deleteBucket(redisKey1, redisKey2);
  18.         redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);
  19.         Assertions.assertNull(redisValue1);
  20.         redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);
  21.         Assertions.assertNull(redisValue2);
  22.     }
复制代码

二、二进制流(Binary Stream)

Redisson 的分布式 RBinaryStream ,Java 对象同时提供了 InputStream 接口和 OutputStream 接口的实现。流的最大容量受 Redis 主节点的内存大小限制。
  1. RBinaryStream stream = redisson.getBinaryStream("test:stream:001");
  2. byte[] content = ...
  3. stream.set(content);
  4. InputStream is = stream.getInputStream();
  5. byte[] readBuffer = new byte[512];
  6. is.read(readBuffer);
  7. OutputStream os = stream.getOuputStream();
  8. byte[] contentToWrite = ...
  9. os.write(contentToWrite);
复制代码
三、地理空间对象桶(Geospatial Bucket)

Redisson 的分布式RGeo,Java 对象是一种专门用来储存与地理位置有关的对象桶。
  1. RGeo<String> geo = redisson.getGeo("test:geo:001");
  2. geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),
  3.         new GeoEntry(15.087269, 37.502669, "Catania"));
  4. geo.addAsync(37.618423, 55.751244, "Moscow");
  5. Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
  6. geo.hashAsync("Palermo", "Catania");
  7. Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
  8. List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
  9. Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);
复制代码
四、原子长整型(AtomicLong)

Redisson 的分布式整长形 RAtomicLong 对象和 Java 中的 java.util.concurrent.atomic.AtomicLong  对象类似。
  1. RAtomicLong atomicLong = redisson.getAtomicLong("test:long:001");
  2. atomicLong.set(3);
  3. atomicLong.incrementAndGet();
  4. atomicLong.get();
复制代码

五、原子双精度浮点(AtomicDouble)

Redisson 还提供了分布式原子双精度浮点 RAtomicDouble,弥补了 Java 自身的不足。
  1. RAtomicDouble atomicDouble = redisson.getAtomicDouble("test:atomicDouble:001");
  2. atomicDouble.set(2.81);
  3. atomicDouble.addAndGet(4.11);
  4. atomicDouble.get();
复制代码

六、话题(订阅分发)(RTopic)

Redisson 的分布式话题 (RTopic)、反射式(Reactive)和 RxJava2 标准的接口。
  1. RTopic topic = redisson.getTopic("test:topic:001");
  2. topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {
  3.     @Override
  4.     public void onMessage(String channel, SomeObject message) {
  5.         //...
  6.     }
  7. });
  8. // 在其他线程或JVM节点
  9. RTopic topic = redisson.getTopic("test:topic:001");
  10. long clientsReceivedMessage = topic.publish(new SomeObject());
复制代码
在 Redis 节点故障转移(主从切换)或断线重连以后,全部的话题监听器将自动完成话题的重新订阅。

七、整长型累加器(LongAdder)

基于 Redis 的 Redisson 分布式整长型累加器(LongAdder)采用了与 java.util.concurrent.atomic.LongAdder 类似的接口。通过利用客户端内置的 LongAdder 对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式 AtomicLong 对象快 12000 倍。完美适用于分布式统计计量场景。
  1. RLongAdder atomicLong = redisson.getLongAdder("test:atomicLong:001");
  2. atomicLong.add(12);
  3. atomicLong.increment();
  4. atomicLong.decrement();
  5. atomicLong.sum();
复制代码
当不再使用整长型累加器对象的时候应该自行手动销毁,假如 Redisson 对象被关闭(shutdown)了,则不消手动销毁。
  1. RLongAdder atomicLong = ...
  2. atomicLong.destroy();
复制代码
八、分布式锁(RLock)

  1.     /**
  2.      * 一直尝试获取锁,直到获取到为止
  3.      *
  4.      * @param lockName    锁名称
  5.      * @param leaseTime   使用时间
  6.      * @param maxAttempts 最大尝试次数
  7.      */
  8.     public void tryLock(String lockName, long leaseTime, int maxAttempts) {
  9.         RLock lock = getLock(lockName);
  10.         int attempt = 0;
  11.         try {
  12.             while (attempt < maxAttempts) {
  13.                 boolean isLocked = lock.tryLock(1, leaseTime, TimeUnit.SECONDS);
  14.                 if (isLocked) {
  15.                     log.info("tryLock, isLocked=true, lockName={}, leaseTime={}", lockName, leaseTime);
  16.                     return;
  17.                 }
  18.                 Thread.sleep(20);
  19.                 attempt++;
  20.             }
  21.             log.warn("tryLock, failed to acquire lock after {} attempts, lockName={}, leaseTime={}", maxAttempts, lockName, leaseTime);
  22.         } catch (InterruptedException e) {
  23.             Thread.currentThread().interrupt(); // 重新设置中断状态
  24.             log.warn("tryLock interrupted, lockName={}, leaseTime={}", lockName, leaseTime);
  25.         }
  26.     }
复制代码
测试
  1.     @Test
  2.     public void test_tryLock() throws InterruptedException {
  3.         ExecutorService executorService = Executors.newFixedThreadPool(10);
  4.         // 用于等待所有任务完成
  5.         int count = 10;
  6.         final CountDownLatch latch = new CountDownLatch(count);
  7.         for (int i = 0; i < count; i++) {
  8.             executorService.execute(() -> {
  9.                 String lockName = "test:lock:002";
  10.                 try {
  11.                     // 添加最大尝试次数
  12.                     primaryRedissonHelper.tryLock(lockName, 10, 10000);
  13.                     RLock rLock = primaryRedissonHelper.getLock(lockName);
  14.                     Assertions.assertTrue(rLock != null && rLock.isLocked());
  15.                     Thread.sleep(3000L);
  16.                 } catch (InterruptedException e) {
  17.                     log.warn("test_tryLock {}", ExceptionUtils.getStackTrace(e));
  18.                 } finally {
  19.                     // 无论是否成功,都减少计数
  20.                     latch.countDown();
  21.                     primaryRedissonHelper.unLock(lockName);
  22.                 }
  23.             });
  24.         }
  25.         // 等待所有任务完成
  26.         latch.await();
  27.         executorService.shutdown();
  28.         // 等待线程池完全关闭
  29.         log.info("executorService.awaitTermination {}", executorService.awaitTermination(1, TimeUnit.MINUTES));
  30.     }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4