Disruptor-源码解读

打印 上一主题 下一主题

主题 877|帖子 877|积分 2631

前言

Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出:

  • 锁和CAS
  • 伪共享和缓存行
  • volatile和内存屏障
原理

此节结合demo来看更容易理解:传送门
下图来自官方文档

官方原图有点乱,我翻译一下

在讲原理前,先了解 Disruptor 定义的术语

  • Event
    存放数据的单位,对应 demo 中的 LongEvent
  • Ring Buffer
    环形数据缓冲区:这是一个首尾相接的环,用于存放 Event ,用于生产者往其存入数据和消费者从其拉取数据
  • Sequence
    序列:用于跟踪进度(生产进度、消费进度)
  • Sequencer
    Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现。
  • Sequence Barrier
    序列屏障,消费者之间的依赖关系就靠序列屏障实现
  • Wait Strategy
  • 等待策略,消费者等待生产者将发布的策略
  • Event Processor
    事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler。
  • Event Handler
    事件处理程序,也就是消费者
  • Producer
    生产者
Ring Buffer

环形数据缓冲区(RingBuffer),逻辑上是首尾相接的环,在代码中用数组来表示Object[]。Disruptor生产者发布分两步

  • 步骤一:申请写入 n 个元素,如果可以写入,这返回最大序列号
  • 步骤二:根据序列号去 RingBuffer 中获取 Event,修改并发布
  1. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
  2. // 获取下一个可用位置的下标(步骤1)
  3. long sequence = ringBuffer.next();
  4. try {
  5.     // 返回可用位置的元素
  6.     LongEvent event = ringBuffer.get(sequence);
  7.     // 设置该位置元素的值
  8.     event.set(l);
  9. } finally {
  10.     // 发布
  11.     ringBuffer.publish(sequence);
  12. }
复制代码
这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现
Sequencer

单生产者

如果申请 2 个元素,则如下图所示(圆表示 RingBuffer)
  1. // 一般不会有以下写法,这里为了讲解源码才使用next(2)
  2. // 向RingBuffer申请两个元素
  3. long sequence = ringBuffer.next(2);
  4. for (long i =  sequence-1; i <= sequence; i++) {
  5.     try {
  6.         // 返回可用位置的元素
  7.         LongEvent event = ringBuffer.get(i);
  8.         // 设置该位置元素的值
  9.         event.set(1);
  10.     } finally {
  11.         ringBuffer.publish(i);
  12.     }
  13. }
复制代码
要解释的都在注释里了,gatingSequences 是消费者队列末尾的序列,对应着就是下图中的 ApplicationConsumer 的 Sequence

多生产者

看完单生产者版,接下来看多生产者的实现。因为是多生产者,需要考虑并发的情况。
如果有A、B两个消费者都来申请 2 个元素

cursor 申请成功的序列,HPS 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。HPS 是我自己编的缩写,表示 getHighestPublishedSequence 方法的返回值
如图所示,只要申请成功,就移动 cursor 的位置。RingBuffer 并没有记录发布情况(图中的红绿颜色),这个发布情况由 MultiProducerSequencer的availableBuffer 来维护。
下面看代码
  1. abstract class SingleProducerSequencerPad extends AbstractSequencer
  2. {
  3.     protected long p1, p2, p3, p4, p5, p6, p7;
  4.     SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
  5.     {
  6.         super(bufferSize, waitStrategy);
  7.     }
  8. }
  9. abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
  10. {
  11.     SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
  12.     {
  13.         super(bufferSize, waitStrategy);
  14.     }
  15.     long nextValue = Sequence.INITIAL_VALUE;
  16.     long cachedValue = Sequence.INITIAL_VALUE;
  17. }
  18. public final class SingleProducerSequencer extends SingleProducerSequencerFields
  19. {
  20.     protected long p1, p2, p3, p4, p5, p6, p7;
  21.     public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
  22.     {
  23.         super(bufferSize, waitStrategy);
  24.     }
  25. }
复制代码
MultiProducerSequencer和SingleProducerSequencer的 next()方法逻辑大致一样,只是多了CAS的步骤来保证并发的正确性。接着看发布方法
  1. // 调用路径
  2. // RingBuffer#next()
  3. // SingleProducerSequencer#next()
  4. public long next(int n)
  5. {
  6.     if (n < 1)
  7.     {
  8.         throw new IllegalArgumentException("n must be > 0");
  9.     }
  10.     long nextValue = this.nextValue;
  11.     //生产者当前序号值+期望获取的序号数量后达到的序号值
  12.     long nextSequence = nextValue + n;
  13.     //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
  14.     long wrapPoint = nextSequence - bufferSize;
  15.     //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
  16.     //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
  17.     long cachedGatingSequence = this.cachedValue;
  18.     //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
  19.     //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76
  20.     // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费
  21.     if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
  22.     {
  23.         cursor.setVolatile(nextValue);  // StoreLoad fence
  24.         //gatingSequences就是消费者队列末尾的序列,也就是消费者消费到哪里了
  25.         //实际上就是获得处理的队尾,如果队尾是current的话,说明所有的消费者都执行完成任务在等待新的事件了
  26.         long minSequence;
  27.         while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
  28.         {
  29.             // 等待1纳秒
  30.             LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
  31.         }
  32.         this.cachedValue = minSequence;
  33.     }
  34.     this.nextValue = nextSequence;
  35.     return nextSequence;
  36. }
  37. public void publish(long sequence)
  38. {
  39.     // 更新序列号
  40.     cursor.set(sequence);
  41.     // 等待策略的唤醒
  42.     waitStrategy.signalAllWhenBlocking();
  43. }
复制代码
记录发布情况,其实相当于 availableBuffer[sequence] = 圈数,前面说了,availableBuffer是用来标记元素是否可用的,如果消费者的圈数 ≠ availableBuffer中的圈数,则表示元素不可用
  1. public final class MultiProducerSequencer extends AbstractSequencer
  2. {
  3.     // 缓存的消费者中最小序号值,相当于SingleProducerSequencerFields的cachedValue
  4.     private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
  5.     // 标记元素是否可用
  6.     private final int[] availableBuffer;
  7.     public long next(int n)
  8.     {
  9.         if (n < 1)
  10.         {
  11.             throw new IllegalArgumentException("n must be > 0");
  12.         }
  13.         long current;
  14.         long next;
  15.         do
  16.         {
  17.             current = cursor.get();
  18.             next = current + n;
  19.             //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’
  20.             long wrapPoint = next - bufferSize;
  21.             //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’
  22.             //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。
  23.             long cachedGatingSequence = gatingSequenceCache.get();
  24.             //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’
  25.             //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76
  26.             // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费
  27.             if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
  28.             {
  29.                 long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
  30.                 if (wrapPoint > gatingSequence)
  31.                 {
  32.                     LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
  33.                     continue;
  34.                 }
  35.                 gatingSequenceCache.set(gatingSequence);
  36.             }
  37.             // 使用cas保证只有一个生产者能拿到next
  38.             else if (cursor.compareAndSet(current, next))
  39.             {
  40.                 break;
  41.             }
  42.         }
  43.         while (true);
  44.         return next;
  45.     }
  46. ......
  47. }
复制代码
isAvailable() 方法判断元素是否可用,此方法的调用堆栈看完消费者就清楚了。
消费者

本小节介绍两个方面,一是 Disruptor 的消费者如何实现依赖关系的,二是消费者如何拉取消息并消费
消费者的依赖关系实现


我们看回这张图,每个消费者前都有一个 SequenceBarrier ,这就是消费者之间能实现依赖的关键。每个消费者都有一个 Sequence,表示自身消费的进度,如图中,ApplicationConsumer 的 SequenceBarrier 就持有 ReplicaionConsumer  和 JournalConsumer 的 Sequence,这样就能控制 ApplicationConsumer 的消费进度不超过其依赖的消费者。
下面看源码,这是 disruptor 配置消费者的代码。
  1. public void publish(final long sequence)
  2. {
  3.     // 记录发布情况
  4.     setAvailable(sequence);
  5.     // 等待策略的唤醒
  6.     waitStrategy.signalAllWhenBlocking();
  7. }
  8. private void setAvailable(final long sequence)
  9. {
  10.     // calculateIndex(sequence):获取序号
  11.     // calculateAvailabilityFlag(sequence):RingBuffer的圈数
  12.     setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
  13. }
  14. private void setAvailableBufferValue(int index, int flag)
  15. {
  16.     long bufferAddress = (index * SCALE) + BASE;
  17.     UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
  18.     // 上面相当于 availableBuffer[index] = flag 的高性能版
  19. }
复制代码
先看ReplicaionConsumer  和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer)
[code]/** 代码都在Disruptor类 **/public final EventHandlerGroup handleEventsWith(final EventHandler

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

雁过留声

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表