本文共 16887 字,大约阅读时间需要 56 分钟。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。与Kafka、RabbitMQ用于服务间的消息队列不同,disruptor一般用于线程间消息的传递。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。其他关于disruptor的背景就不在此多言,可以自己google。
disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用disruptor作为ArrayBlockingQueue的替代者。
官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,本文列出其中一组数据,数据中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue,目测性能只有有5~10倍左右的提升。完整的官方性能测试数据在可以看到,性能测试的代码已经包含在disruptor的代码中,有兴趣的可以直接过去看看。
Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。
RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。
SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。
SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。
当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:
首先是CPU false sharing的解决,Disruptor通过将基本对象填充冗余基本类型变量来填充满整个缓存行,减少false sharing的概率,这部分没怎么看懂,Disruptor通过填充失效这个效果。
无锁队列的实现,对于传统并发队列,至少要维护两个指针,一个头指针和一个尾指针。在并发访问修改时,头指针和尾指针的维护不可避免的应用了锁。Disruptor由于是环状队列,对于Producer而言只有头指针而且锁是乐观锁,在标准Disruptor应用中,只有一个生产者,避免了头指针锁的争用。所以我们可以理解Disruptor为无锁队列。
每个RingBuffer是一个环状队列,队列中每个元素可以理解为一个槽。在初始化时,RingBuffer规定了总大小,就是这个环最多可以容纳多少槽。这里Disruptor规定了,RingBuffer大小必须是2的n次方。这里用了一个小技巧,就是将取模转变为取与运算。在内存管理中,我们常用的就是取余定位操作。如果我们想在Ringbuffer定位,一般会用到某个数字对Ringbuffer的大小取余。如果是对2的n次方取余,则可以简化成m % 2^n = m & ( 2^n - 1 )
Producer会向这个RingBuffer中填充元素,填充元素的流程是首先从RingBuffer读取下一个Sequence,之后在这个Sequence位置的槽填充数据,之后发布。
Consumer消费RingBuffer中的数据,通过SequenceBarrier来协调不同的Consumer的消费先后顺序,以及获取下一个消费位置Sequence。
Producer在RingBuffer写满时,会从头开始继续写替换掉以前的数据。但是如果有SequenceBarrier指向下一个位置,则不会覆盖这个位置,阻塞到这个位置被消费完成。Consumer同理,在所有Barrier被消费完之后,会阻塞到有新的数据进来。
如何使用 Disruptor ,Disruptor 的 API 十分简单,主要有以下几个步骤:
1、定义事件:事件(Event)就是通过 Disruptor 进行交换的数据类型。
2、定义事件工厂:事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory<T>。Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。
3、定义事件处理的具体实现:通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。
4、定义用于事件处理的线程池:Disruptor 通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理。
5、指定等待策略:Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。
Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。6、启动 Disruptor。
7、发布事件:Disruptor 的事件发布过程是一个两阶段提交的过程:
第一步:先从 RingBuffer 获取下一个可以写入的事件的序号; 第二步:获取对应的事件对象,将数据写入事件对象; 第三部:将事件提交到 RingBuffer; 事件只有在提交之后才会通知 EventProcessor 进行处理8、关闭 Disruptor。
// step_1 定义事件public class LongEvent{ private long value; public void set(long value) { this.value = value; }}// step_2 定义事件工厂import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory{ public LongEvent newInstance() { return new LongEvent(); }}// step_3 定义事件处理的具体实现import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event); }}// step_4 定义用于事件处理的线程池ExecutorService executor = Executors.newCachedThreadPool();// 指定等待策略WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();// step_5 启动 DisruptorEventFactory eventFactory = new LongEventFactory();ExecutorService executor = Executors.newSingleThreadExecutor();int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必须是 2 的 N 次方; // step_6 发布事件Disruptor disruptor = new Disruptor (eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());EventHandler eventHandler = new LongEventHandler();disruptor.handleEventsWith(eventHandler);disruptor.start();// step_7 发布事件;RingBuffer ringBuffer = disruptor.getRingBuffer();long sequence = ringBuffer.next();//请求下一个事件序号; try { LongEvent event = ringBuffer.get(sequence);//获取该序号对应的事件对象; long data = getEventData();//获取要通过事件传递的业务数据; event.set(data);} finally{ ringBuffer.publish(sequence);//发布事件;}// step_8 Disruptor 还提供另外一种形式的调用来简化以上操作,并确保 publish 总是得到调用。static class Translator implements EventTranslatorOneArg { @Override public void translateTo(LongEvent event, long sequence, Long data) { event.set(data); } } public static Translator TRANSLATOR = new Translator(); public static void publishEvent2(Disruptor disruptor) { // 发布事件; RingBuffer ringBuffer = disruptor.getRingBuffer(); long data = getEventData();//获取要通过事件传递的业务数据; ringBuffer.publishEvent(TRANSLATOR, data);}// step_9 关闭 Disruptordisruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
public class Disruptor{ //Disruptor核心变量RingBuffer用于存储变量 private final RingBuffer ringBuffer; // Disruptor用于运行consumer的ExecutorService对象 private final Executor executor; // Disruptor保存的所有消费者信息 private final ConsumerRepository consumerRepository = new ConsumerRepository (); // 标记是否已经开始 private final AtomicBoolean started = new AtomicBoolean(false); // 标记异常处理的handler private ExceptionHandler exceptionHandler = new ExceptionHandlerWrapper (); @Deprecated public Disruptor(final EventFactory eventFactory, final int ringBufferSize, final Executor executor) { this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor); } @Deprecated public Disruptor( final EventFactory eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) { this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor); } public Disruptor(final EventFactory eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) { this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory)); } public Disruptor( final EventFactory eventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this( RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); } private Disruptor(final RingBuffer ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; }}
abstract class RingBufferPad{ protected long p1, p2, p3, p4, p5, p6, p7;}abstract class RingBufferFieldsextends RingBufferPad{ //用于填充的对象引用,为什么填充不知道? private static final int BUFFER_PAD; //entry存储位置相对与array起始位置的偏移量,用于UNSAFE内存操作时进行寻址,注意这个偏移量加上了用于填充的BUFFER_PAD大小 private static final long REF_ARRAY_BASE; //对应对象引用占用内存大小,计算出来的相对位移数,比如对象引用大小是4byte,那么REF_ELEMENT_SHIFT=2,因为2的2次方=4; private static final int REF_ELEMENT_SHIFT; private static final Unsafe UNSAFE = Util.getUnsafe(); static { final int scale = UNSAFE.arrayIndexScale(Object[].class); if (4 == scale) { REF_ELEMENT_SHIFT = 2; } else if (8 == scale) { REF_ELEMENT_SHIFT = 3; } else { throw new IllegalStateException("Unknown pointer size"); } BUFFER_PAD = 128 / scale; REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT); } private final long indexMask; // 用于保存Entry的对象数组,也就是RingBuffer当中保存数据的数据结构 private final Object[] entries; // 环形数据库大小 protected final int bufferSize; // RingBuffer当中的sequencer,分为SingleProducerSequencer和MultiProducerSequencer两类 protected final Sequencer sequencer; RingBufferFields( EventFactory eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; // 2 * BUFFER_PAD代表头尾都需要增加BUFFER_PAD的空间,所以访问空间需要以数组的起始位置+BUFFER_PAD,当然需要转化为字节 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory); } private void fill(EventFactory eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } @SuppressWarnings("unchecked") protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); }}public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer , EventSink { public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7; RingBuffer( EventFactory eventFactory, Sequencer sequencer) { super(eventFactory, sequencer); }}
单生产者的Sequenecer对象
public abstract class AbstractSequencer implements Sequencer{ private static final AtomicReferenceFieldUpdaterSEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); // RingBuffer的大小 protected final int bufferSize; // 等待策略 protected final WaitStrategy waitStrategy; // 当前RingBuffer对应的油表位置 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //各个消费者持有的取数sequence数组 protected volatile Sequence[] gatingSequences = new Sequence[0]; public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; }}abstract class SingleProducerSequencerPad extends AbstractSequencer{ protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }}abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad{ public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } protected long nextValue = Sequence.INITIAL_VALUE; protected long cachedValue = Sequence.INITIAL_VALUE;}// 适用于单生产者的场景,由于没有实现任何栅栏,使用多线程的生产者进行操作并不安全。public final class SingleProducerSequencer extends SingleProducerSequencerFields{ protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }}
多生产者对象MultiProducerSequencer
public abstract class AbstractSequencer implements Sequencer{ private static final AtomicReferenceFieldUpdaterSEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); // RingBuffer的大小 protected final int bufferSize; // 等待策略 protected final WaitStrategy waitStrategy; // 当前RingBuffer对应的油表位置 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //各个消费者持有的取数sequence数组 protected volatile Sequence[] gatingSequences = new Sequence[0]; public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; }}public final class MultiProducerSequencer extends AbstractSequencer{ private static final Unsafe UNSAFE = Util.getUnsafe(); private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final int[] availableBuffer; private final int indexMask; private final int indexShift; public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); availableBuffer = new int[bufferSize]; indexMask = bufferSize - 1; indexShift = Util.log2(bufferSize); initialiseAvailableBuffer(); }
public class Sequence extends RhsPadding{ static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET; static { UNSAFE = Util.getUnsafe(); try { VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); } catch (final Exception e) { throw new RuntimeException(e); } } public Sequence() { this(INITIAL_VALUE); } public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); } public long get() { return value; } public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); } public void setVolatile(final long value) { UNSAFE.putLongVolatile(this, VALUE_OFFSET, value); } public boolean compareAndSet(final long expectedValue, final long newValue) { return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue); } public long incrementAndGet() { return addAndGet(1L); } public long addAndGet(final long increment) { long currentValue; long newValue; do { currentValue = get(); newValue = currentValue + increment; } while (!compareAndSet(currentValue, newValue)); return newValue; } @Override public String toString() { return Long.toString(get()); }}
转载地址:http://xrzdo.baihongyu.com/