Redis Sorted Set 跳表的实现原理和分析

打印 上一主题 下一主题

主题 1050|帖子 1050|积分 3150

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
跳表(Skip List)是一种随机化的数据布局,基于有序链表,通过在链表上增长多级索引来提高数据的查找服从。它是由 William Pugh 在 1990 年提出的。
  为什么 Redis 中的 Sorted Set 使用跳跃表

Redis 的有序聚集(Sorted Set)使用跳跃表(Skip List)作为底层实现,主要是因为跳跃表在性能、实现复杂度和灵活性等方面具有显著优势,可以替代平衡树的数据布局。
跳跃表的原理

跳跃表是一种扩展的有序链表,通过维护多个层级的索引来提高查找服从。每个节点包罗一个数据元素和一组指向其他节点的指针,这些指针分布在差别的层级。最底层的链表包罗全部元素,而每一层的节点数目逐渐淘汰。如许,查找操作可以从高层开始,快速跳过不需要的元素,淘汰遍历的节点数,从而提高查找服从。


  • 查找过程:从最高层的头节点开始,沿着索引节点向右查找,如果当前节点的下一个节点的值大于或等于查找的目标值,则向下移动到下一层继续查找;否则,向右移动到下一个索引节点。这个过程一直持续到找到目标节点或到达最底层链表。
  • 插入和删除操作:跳跃表支持高效的动态插入和删除,时间复杂度均为 O(log n)。插入时,首先找到插入位置,然后根据随机函数决定新节点的层数,最后在相应的层中插入节点。
跳跃表的优势


  • 简单性:跳跃表的实现相对简单,易于理解和维护,而平衡树(如红黑树)的实现较为复杂,轻易出错。
  • 高效的范围查询:跳跃表在进行范围查询时服从更高,因为它是有序的链表,可以直接遍历后继节点,而平衡树需要中序遍历,复杂度更高。
  • 灵活性:跳跃表的层数可以根据需要动态调解,顺应差别的查询需求。
  • 高并发性能:跳跃表的节点可以支持多个线程同时插入或删除节点,而平衡树和哈希表通常需要复杂的并发控制。
  • 空间服从:跳跃表的空间复杂度为 O(n),而且通过调解节点的抽取隔断,可以灵活平衡空间和时间复杂度。
正是因为有这些优势,Redis 选择使用跳跃表来实现有序聚集,而不是红黑树或其他数据布局。这使得 Redis 在处置惩罚有序聚集时能够高效地支持插入、删除和查找操作,同时保持元素的有序性,非常适合实现如排行榜、范围查询等功能。
为了讲明白跳表的原理,现在我们来模拟一个简单的跳表实现。
在 Java 中模拟实现 Redis 的 Sorted Set 跳表,我们需要定义跳表的数据布局,包括节点和跳表本身。以下是一个简单的实现:
  1. import java.util.Random;
  2. public class SkipList {
  3.     private static final double P = 0.5; // 节点晋升的概率
  4.     private static final int MAX_LEVEL = 16; // 最大层数
  5.     private int levelCount; // 当前跳表的层数
  6.     private Node head; // 头节点
  7.     private int size; // 跳表中元素的数量
  8.     private Random random; // 随机数生成器
  9.     public SkipList() {
  10.         this.levelCount = 0;
  11.         this.size = 0;
  12.         this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
  13.         this.random = new Random();
  14.     }
  15.     // 节点定义
  16.     private class Node {
  17.         int value;
  18.         int key;
  19.         Node[] forward; // 向前指针数组
  20.         Node(int value, int key, int level) {
  21.             this.value = value;
  22.             this.key = key;
  23.             this.forward = new Node[level + 1];
  24.         }
  25.     }
  26.     // 随机生成节点的层数
  27.     private int randomLevel() {
  28.         int level = 0;
  29.         while (random.nextFloat() < P && level < MAX_LEVEL) {
  30.             level++;
  31.         }
  32.         return level;
  33.     }
  34.     // 搜索指定值的节点
  35.     public Node search(int key) {
  36.         Node current = head;
  37.         for (int i = levelCount - 1; i >= 0; i--) {
  38.             while (current.forward[i] != null && current.forward[i].key < key) {
  39.                 current = current.forward[i]; // 沿着当前层的指针移动
  40.             }
  41.         }
  42.         current = current.forward[0]; // 移动到第0层
  43.         return current;
  44.     }
  45.     // 插入节点
  46.     public void insert(int key, int value) {
  47.         Node[] update = new Node[MAX_LEVEL + 1];
  48.         Node current = head;
  49.         // 查找插入位置
  50.         for (int i = levelCount - 1; i >= 0; i--) {
  51.             while (current.forward[i] != null && current.forward[i].key < key) {
  52.                 current = current.forward[i];
  53.             }
  54.             update[i] = current;
  55.         }
  56.         int level = randomLevel(); // 随机生成层数
  57.         if (level > levelCount) {
  58.             levelCount = level;
  59.             for (int i = levelCount - 1; i >= 0; i--) {
  60.                 update[i] = head;
  61.             }
  62.         }
  63.         current = new Node(value, key, level);
  64.         for (int i = 0; i < level; i++) {
  65.             current.forward[i] = update[i].forward[i];
  66.             update[i].forward[i] = current;
  67.         }
  68.         size++;
  69.     }
  70.     // 删除节点
  71.     public void delete(int key) {
  72.         Node[] update = new Node[MAX_LEVEL + 1];
  73.         Node current = head;
  74.         // 查找要删除的节点
  75.         for (int i = levelCount - 1; i >= 0; i--) {
  76.             while (current.forward[i] != null && current.forward[i].key < key) {
  77.                 current = current.forward[i];
  78.             }
  79.             update[i] = current;
  80.         }
  81.         current = current.forward[0];
  82.         if (current != null && current.key == key) {
  83.             for (int i = 0; i < levelCount; i++) {
  84.                 if (update[i].forward[i] != current) {
  85.                     break;
  86.                 }
  87.                 update[i].forward[i] = current.forward[i];
  88.             }
  89.             size--;
  90.             while (levelCount > 0 && head.forward[levelCount - 1] == null) {
  91.                 levelCount--;
  92.             }
  93.         }
  94.     }
  95.     // 打印跳表
  96.     public void printList() {
  97.         Node current = head.forward[0];
  98.         while (current != null) {
  99.             System.out.println(current.key + ":" + current.value);
  100.             current = current.forward[0];
  101.         }
  102.     }
  103.     // 主函数测试跳表
  104.     public static void main(String[] args) {
  105.         SkipList list = new SkipList();
  106.         list.insert(3, 100);
  107.         list.insert(6, 300);
  108.         list.insert(4, 400);
  109.         list.insert(7, 700);
  110.         list.insert(5, 500);
  111.         list.printList();
  112.         list.delete(3);
  113.         list.printList();
  114.     }
  115. }
复制代码
下面,V 哥来表明一下:

  • 跳表节点:每个节点包罗一个值、一个键和一个向前指针数组。向前指针数组存储了指向同一层下一个节点的引用。
  • 随机层数:每个节点的层数是根据预设的概率 P 随机天生的,如许可以模拟出差别高度的索引层。
  • 搜索操作:从最高层开始,沿着当前层的指针移动,直到找到插入点或到达底层。
  • 插入操作:首先找到插入位置,然后根据随机天生的层数创建新节点,并更新每一层的指针。
  • 删除操作:找到要删除的节点,然后逐层更新指针,最后淘汰跳表的层数。
  • 打印跳表:从底层的头节点开始,遍历打印每个节点的键和值。
从这个简化版的跳表实现可以看到跳表的基本操作。可以帮助我们理解跳表实现的原理。
如安在 Java 中实现跳表的并发操作?

在 Java 中实现跳表的并发操作需要思量线程安全问题。可以通过以下方法来实现:

  • 使用同步块:在每个公共方法中使用 synchronized 关键字来确保同一时间只有一个线程可以执行方法。
  • 使用锁:使用 ReentrantLock 或其他并发锁来控制对跳表的并发访问。
  • 使用原子引用:使用 AtomicReference 或 AtomicReferenceArray 来确保节点的原子更新。
  • 使用并发聚集:使用 ConcurrentHashMap 等并发聚集作为辅助工具来实现线程安全的跳表。
以下是一个使用 synchronized 关键字实现线程安全的跳表的示例:
  1. import java.util.Random;
  2. public class ConcurrentSkipList {
  3.     private static final double P = 0.5; // 节点晋升的概率
  4.     private static final int MAX_LEVEL = 16; // 最大层数
  5.     private int levelCount; // 当前跳表的层数
  6.     private Node head; // 头节点
  7.     private int size; // 跳表中元素的数量
  8.     private Random random; // 随机数生成器
  9.     public ConcurrentSkipList() {
  10.         this.levelCount = 0;
  11.         this.size = 0;
  12.         this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
  13.         this.random = new Random();
  14.     }
  15.     private class Node {
  16.         int value;
  17.         int key;
  18.         Node[] forward; // 向前指针数组
  19.         Node(int value, int key, int level) {
  20.             this.value = value;
  21.             this.key = key;
  22.             this.forward = new Node[level + 1];
  23.         }
  24.     }
  25.     private int randomLevel() {
  26.         int level = 0;
  27.         while (random.nextFloat() < P && level < MAX_LEVEL) {
  28.             level++;
  29.         }
  30.         return level;
  31.     }
  32.     public synchronized Node search(int key) {
  33.         Node current = head;
  34.         for (int i = levelCount - 1; i >= 0; i--) {
  35.             while (current.forward[i] != null && current.forward[i].key < key) {
  36.                 current = current.forward[i];
  37.             }
  38.         }
  39.         current = current.forward[0];
  40.         return current;
  41.     }
  42.     public synchronized void insert(int key, int value) {
  43.         Node[] update = new Node[MAX_LEVEL + 1];
  44.         Node current = head;
  45.         for (int i = levelCount - 1; i >= 0; i--) {
  46.             while (current.forward[i] != null && current.forward[i].key < key) {
  47.                 current = current.forward[i];
  48.             }
  49.             update[i] = current;
  50.         }
  51.         int level = randomLevel();
  52.         if (level > levelCount) {
  53.             for (int i = levelCount; i < level; i++) {
  54.                 update[i] = head;
  55.             }
  56.             levelCount = level;
  57.         }
  58.         current = new Node(value, key, level);
  59.         for (int i = 0; i < level; i++) {
  60.             current.forward[i] = update[i].forward[i];
  61.             update[i].forward[i] = current;
  62.         }
  63.         size++;
  64.     }
  65.     public synchronized void delete(int key) {
  66.         Node[] update = new Node[MAX_LEVEL + 1];
  67.         Node current = head;
  68.         for (int i = levelCount - 1; i >= 0; i--) {
  69.             while (current.forward[i] != null && current.forward[i].key < key) {
  70.                 current = current.forward[i];
  71.             }
  72.             update[i] = current;
  73.         }
  74.         current = current.forward[0];
  75.         if (current != null && current.key == key) {
  76.             for (int i = 0; i < levelCount; i++) {
  77.                 if (update[i].forward[i] != current) {
  78.                     break;
  79.                 }
  80.                 update[i].forward[i] = current.forward[i];
  81.             }
  82.             size--;
  83.             while (levelCount > 0 && head.forward[levelCount - 1] == null) {
  84.                 levelCount--;
  85.             }
  86.         }
  87.     }
  88.     public synchronized void printList() {
  89.         Node current = head.forward[0];
  90.         while (current != null) {
  91.             System.out.println(current.key + ":" + current.value);
  92.             current = current.forward[0];
  93.         }
  94.     }
  95.     public static void main(String[] args) {
  96.         ConcurrentSkipList list = new ConcurrentSkipList();
  97.         list.insert(3, 100);
  98.         list.insert(6, 300);
  99.         list.insert(4, 400);
  100.         list.insert(7, 700);
  101.         list.insert(5, 500);
  102.         list.printList();
  103.         list.delete(3);
  104.         list.printList();
  105.     }
  106. }
复制代码
在这个示例中,我们使用了 synchronized 关键字来确保 search、insert 和 delete 方法是线程安全的。这会锁定当前对象,确保同一时间只有一个线程可以执行这些方法。
请注意,虽然 synchronized 可以提供线程安全,但它也可能导致性能瓶颈,特殊是在高并发环境下。在现实应用中,可以思量使用更细粒度的锁,如 ReentrantLock,大概使用原子引用和其他并发工具来提高性能。
使用 ReentrantLock 的实现方式

使用 ReentrantLock 实现跳表的并发操作可以提供比 synchronized 更细粒度的锁定,从而提高并发性能。ReentrantLock 允许您在差别的方法中锁定和解锁,乃至可以在差别的类中使用同一个锁对象。
以下是使用 ReentrantLock 实现线程安全的跳表的示例:
  1. import java.util.Random;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class ConcurrentSkipList {
  5.     private static final double P = 0.5; // 节点晋升的概率
  6.     private static final int MAX_LEVEL = 16; // 最大层数
  7.     private final Lock lock = new ReentrantLock(); // 创建一个 ReentrantLock 对象
  8.     private int levelCount; // 当前跳表的层数
  9.     private Node head; // 头节点
  10.     private int size; // 跳表中元素的数量
  11.     private Random random; // 随机数生成器
  12.     public ConcurrentSkipList() {
  13.         this.levelCount = 0;
  14.         this.size = 0;
  15.         this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
  16.         this.random = new Random();
  17.     }
  18.     private class Node {
  19.         int value;
  20.         int key;
  21.         Node[] forward; // 向前指针数组
  22.         Node(int key, int level) {
  23.             this.key = key;
  24.             this.forward = new Node[level + 1];
  25.         }
  26.     }
  27.     private int randomLevel() {
  28.         int level = 0;
  29.         while (random.nextFloat() < P && level < MAX_LEVEL) {
  30.             level++;
  31.         }
  32.         return level;
  33.     }
  34.     public void search(int key) {
  35.         lock.lock(); // 加锁
  36.         try {
  37.             Node current = head;
  38.             for (int i = levelCount - 1; i >= 0; i--) {
  39.                 while (current.forward[i] != null && current.forward[i].key < key) {
  40.                     current = current.forward[i];
  41.                 }
  42.             }
  43.             current = current.forward[0];
  44.             // ... 处理找到的节点
  45.         } finally {
  46.             lock.unlock(); // 释放锁
  47.         }
  48.     }
  49.     public void insert(int key, int value) {
  50.         lock.lock(); // 加锁
  51.         try {
  52.             Node[] update = new Node[MAX_LEVEL + 1];
  53.             Node current = head;
  54.             for (int i = levelCount - 1; i >= 0; i--) {
  55.                 while (current.forward[i] != null && current.forward[i].key < key) {
  56.                     current = current.forward[i];
  57.                 }
  58.                 update[i] = current;
  59.             }
  60.             int level = randomLevel();
  61.             if (level > levelCount) {
  62.                 levelCount = level;
  63.             }
  64.             current = new Node(value, key, level);
  65.             for (int i = 0; i < level; i++) {
  66.                 current.forward[i] = update[i].forward[i];
  67.                 update[i].forward[i] = current;
  68.             }
  69.             size++;
  70.         } finally {
  71.             lock.unlock(); // 释放锁
  72.         }
  73.     }
  74.     public void delete(int key) {
  75.         lock.lock(); // 加锁
  76.         try {
  77.             Node[] update = new Node[MAX_LEVEL + 1];
  78.             Node current = head;
  79.             for (int i = levelCount - 1; i >= 1; i--) {
  80.                 while (current.forward[i] != null && current.forward[i].key < key) {
  81.                     current = current.forward[i];
  82.                 }
  83.                 update[i] = current;
  84.             }
  85.             current = current.forward[0];
  86.             if (current != null && current.key == key) {
  87.                 for (int i = 0; i < levelCount; i++) {
  88.                     if (update[i].forward[i] != current) {
  89.                         break;
  90.                     }
  91.                     update[i].forward[i] = current.forward[i];
  92.                 }
  93.                 size--;
  94.                 while (levelCount > 0 && head.forward[levelCount - 1] == null) {
  95.                     levelCount--;
  96.                 }
  97.             }
  98.         } finally {
  99.             lock.unlock(); // 释放锁
  100.         }
  101.     }
  102.     public void printList() {
  103.         lock.lock(); // 加锁
  104.         try {
  105.             Node current = head.forward[0];
  106.             while (current != null) {
  107.                 System.out.println(current.key + ":" + current.value);
  108.                 current = current.forward[0];
  109.             }
  110.         } finally {
  111.             lock.unlock(); // 释放锁
  112.         }
  113.     }
  114.     public static void main(String[] args) {
  115.         ConcurrentSkipList list = new ConcurrentSkipList();
  116.         list.insert(3, 100);
  117.         list.insert(6, 300);
  118.         list.insert(4, 400);
  119.         list.insert(7, 700);
  120.         list.insert(5, 500);
  121.         list.printList();
  122.         list.delete(3);
  123.         list.printList();
  124.     }
  125. }
复制代码
在这个示例中,我们使用了 ReentrantLock 对象来控制对跳表的并发访问。每个公共方法在执行之前都会调用 lock.lock() 加锁,并在执行完毕后(包括正常执行和异常退出)调用 lock.unlock() 释放锁。
使用 ReentrantLock 的利益是它提供了比 synchronized 更灵活的锁定机制,例如:

  • 可制止的锁定:ReentrantLock 允许线程在尝试获取锁时被制止。
  • 尝试非阻塞锁定:ReentrantLock 提供了 tryLock() 方法,允许线程尝试获取锁而不无穷等待。
  • 超时获取锁:ReentrantLock 还提供了 tryLock(long timeout, TimeUnit unit) 方法,允许线程在指定时间内等待锁。
  • 公平锁:ReentrantLock 可以选择是否使用公平锁,公平锁会按照线程哀求锁的顺序来分配锁。
  • 多个条件变量:ReentrantLock 可以与多个 Condition 对象配合使用,而 synchronized 只能与一个条件变量配合使用。
理解了以上代码实现原理后,我们再来理解 Redis Sorted Set 就不难了。
Redis 的 Sorted Set 是一种包罗元素和关联分数的数据布局,它能够根据分数对元素进行排序。Sorted Set 在 Redis 中的内部实现可以是跳跃表(Skip List)和字典(Hash Table)的组合,大概是压缩列表(Zip List),具体使用哪种实现取决于 Sorted Set 的巨细和元素的长度。
跳跃表 + 字典实现

当 Sorted Set 的元素数目较多大概元素长度较长时,Redis 使用跳跃表和字典来实现 Sorted Set。跳跃表是一种概率平衡的数据布局,它通过多级索引来提高搜索服从,类似于二分查找。字典则用于快速查找和更新操作。
跳跃表的每个节点包罗以下信息:


  • 元素(member)
  • 分数(score)
  • 后退指针(backward)
  • 多层前进指针(forward),每一层都是一个有序链表
字典则存储了元素到分数的映射,以便快速访问。
压缩列表实现

当 Sorted Set 的元素数目较少(默认小于128个),而且全部元素的长度都小于64字节时,Redis 使用压缩列表来存储 Sorted Set。压缩列表是一种连续内存块的顺序存储布局,它将全部的元素和分数紧凑地存储在一起,以节省内存空间。
应用场景

Sorted Set 常用于以下场景:

  • 排行榜:例如游戏中的玩家分数排名。
  • 范围查询:例如获取一定分数范围内的用户。
  • 任务调度:例如按照任务的优先级执行。
  • 实时排名:例如股票价格的实时排名。
代码分析

在 Redis 源码中,Sorted Set 的实现主要在 t_zset.c 文件中。插入操作(zaddCommand)会根据 Sorted Set 的编码类型(跳跃表或压缩列表)来执行差别的逻辑。如果是跳跃表编码,那么插入操作会涉及到字典的更新和跳跃表节点的添加。如果是压缩列表编码,则会检查是否需要转换为跳跃表编码。
总结

Sorted Set 是 Redis 提供的一种强盛的有序数据布局,它结合了跳跃表和字典的优点,提供了高效的插入、删除、更新和范围查询操作。通过合理的使用 Sorted Set,可以有用地办理许多现实问题。如何以上内容对你有帮助,恳请点赞转发,V 哥在此感谢各位兄弟的支持。88,洗洗睡了。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

北冰洋以北

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表