首页 体育世界正文

一、什么是 Disruptor

从功用上来看,Disruptor 是完结了“行列”的功用,并且是一个有界行列。那么它的运用场景天然便是“生产者-顾客”模型的运用场合了。

能够拿 JDK 的 BlockingQueue 做一个简略比照,以便更好地知道 Disruptor 是什么。

咱们知道 BlockingQueue 是一个 FIFO 行列,生产者(Producer)往行列里发布(publish)一项工作(或称之为“音讯”也能够)时,顾客(Consumer)能取得告诉;假如没有工作时,顾客被阻塞,直到生产者发布了新的工作。

这些都是 Di新乌龙院,Disruptor - 并发编程结构,光线传媒sruptor 能做到的,与之不同的是,Disruptor 能做更多:

  • 同一个“工作”能够有多个顾客,顾客之间既能够并行处理,也能够相互依靠构成处理的先后次第(构成一个依靠图);
  • 预分配用于存储工作内容的内存空间;
  • 针对极高的功能方针而完结的极度优化和无锁的规划;

以上的描绘尽管简略地指出了 Disruptor 是什么,但关于它“能做什么”还不是那么开门见山。一般性地来说,当你需求在两个独立的处理进程(两个线程)之间交流数据时,就能够运用 Disruptor 。当然运用行列(如上面说到的 BlockingQueue)也能够,只不过 Disruptor 做得更好。

拿行列来作比较的做法弱化了对 Disruptor 有多强大的知道,假如想要对此有更多的了解,能够细心看看 Disruptor 在其店主 LMAX 交易平台(也是完结者) 是怎么作为中心架构来运用的,这方面就不做胪陈了,问度娘或谷哥都能找到。

二、Disruptor 的中心概念

先从了解 Disruptor 的中心概念开端,来了解它是怎么运作的。下面介绍的概念模型,既是范畴目标,也是映射到代码完结上的中心目标。

  • Ring Buffer
  • 如其名,环形的缓冲区。从前 RingBuffer 是 Disruptor 中的最主要的目标,但从3.0版别开端,其责任被简化为仅仅担任对经过 Disruptor 进行交流的数据(工作)进行存储和更新。在一金在熙些更高档的运用场景中,Ring Buffer 能够由用户的自界说完结来彻底代替。
  • Sequence Disruptor
  • 经过次序递加的序号来编号办理经过其进行交流的数据(工作),对数据(工作)的处理进程总是沿着序号逐一递加处理。一个 Sequence 用于盯梢标识某个特定的工作处理者( RingBuffer/Consumer )的处理进展。尽管一个 AtomicLong 也能够用于标识进展,但界说 Sequence 来担任该问题还有另一个意图,那便是避免不同的 Sequence 之间的CPU缓存伪同享(Flase Sharing)问题。
  • (注:这是 Disruptor 完结高功能的要害点之一,网上关于伪同享问题的介绍现已浩如烟海,在此不再赘述)。
  • Sequencer
  • Sequencer 是 Disruptor 的真实中心。此接口有两个完结类 SingleProducerSequencer、MultiProducerSequencer ,它们界说在生产者和顾客之间快速、正确地传递数据的并发算法。
  • Sequence Barrier
  • 用于坚持对RingBuffer的 main published Sequence 和Consumer依靠的其它Consumer的 Sequence 的引证。 Sequence Barrier 还界说了决议 Consumer 是否日本男同志还有可处理的工作的逻辑。
  • Wait Strategy
  • 界说 Consumer 怎么进行等候下一个工作的战略。 (注:Disruptor 界说了多种不同的战略,针对不同的场景,供给了不相同的功能体现)
  • Event
  • 在 Disruptor 的语义中,生产者和顾客之间进行交流的数据被称为工作(Event)。它不是一个被 Disruptor 界说的特定类型,而是由 Disruptor 的运用者界说并指定。
  • EventProcessor
  • EventProcessor 持有特定顾客(Consumer)的 Sequence,并供给用于调用工作处理完结的工作循环(Event Loop)。
  • EventHandler
  • Disruptor 界说的工作处理接口,由用户完结,用于处理工作,是 Consumer 的真实完结。
  • Producer
  • 即生产者,仅仅泛指调用 Disruptor 发布工作的用户代码,Disruptor 没有界说特定接口或类型。

三、怎么运用 Disruptor

Disruptor 的 API 非常简略,主要有以下几个进程:

  1. 界说工作
  2. 工作(Event)便是经过 Disruptor 进行交流的数据类型。

public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
  1. 界说工作工厂
  2. 工作工厂(Event Factory)界说了怎么实例张艾佳化前面第1步中界说的工作(Event),需求完结接口 com.lmax.disruptor.EventFactory
  3. Disruptor 经过 EventFactory 在 RingBuffer 中预创立 Event 的实例。
  4. 一个 Event 实例实践上被用作一个“数据槽”李润庭,发布者发布前,先从 RingBuffer 取得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 取得该 Event 实例并从中读取数据。
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
  1. 界说工作处理的详细完结
  2. 经过完结接口 com.lmax.disruptor.EventHandler 界说工作处理的详细完结。
import com.lmax.disruptor.EventHandler;
public class LongEventH原华老公andler implements EventHandler
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.p新乌龙院,Disruptor - 并发编程结构,光线传媒rintln("Event: " + event);
}
}
  1. 界说用于工作处理的线程池
  2. Disruptor 经过 java.util.concurrent.ExecutorService 供给的线程来触发 Consumer 的工作处理。例如:
ExecutorService executor = Executors.newCachedThreadPool();
  1. 指定等候战略
  2. Disruptor 界说了 com.lmax.disruptor.WaitStrategy 接口用于笼统 Consumer 怎么等候新工作,这是战略方法的运用。
  3. Disruptor 供给了多个 WaitStrategy 的完结,每种战略都具有不同功能和优缺点,依据实践运转环境的 CPU 的硬件特色挑选恰当的战略,并合作特定的 JVM 的装备参数,能够完结不同的功能提高。
  4. 例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其间,
  5. BlockingWaitStrategy 是最低效的战略,但其对CPU的耗费最小并且在各种不同布置环境中能供给愈加共同的功能体现;
  6. SleepingWaitStrategy 的功能体现跟 BlockingWaitStrategy 差不多,对 CPU 的耗费也相似,但其对生产者线程的影响最小,合适用于异步日志相似的场景;
  7. YieldingWaitStrategy 的功能是最好的,合适用于低推迟的体系。在要求极高功能且工作处理线数小于 CPU 逻辑中心数的场景中,引荐运用此战略;例如,CPU敞开超线程的特性。
WaitStrategy BL袁知鹏OCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new 奥斯卡德拉霍亚SleepingWaitStrategy();
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy新乌龙院,Disruptor - 并发编程结构,光线传媒();
  1. 发动 Disruptor
EventFactory eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 巨细,有必要是 2 的 N 次方;

Disruptor disruptor = new Disruptor(eventFactory,
ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());

EventHandler eventHandler = new LongEventHandler();
disrupto污慢r.handleEventsWith(eventHandler);

disruptor.start();
  1. 发布工作
  2. Disruptor 的工作发布进程是一个两阶段提交的进程:
  3. 第一步:先从 RingBuffer 获取下一个能够写入的工作的序号;
  4. 第二步:获取对应的工作目标,将数据写入工作目标;
  5. 第三部:将工作提交到 RingBuffer;
  6. 工作只要在提交之后才会告诉 EventProcessor 进行处理;
// 发布工作;
RingBuffer ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuf拍拍拍拍fer.next();//恳求下一个工作序号;

try {
LongEvent event = ringBuffer.get(sequence);//获取该序号对应的工作目标;
long data = getEventData();//获取要经过工作传递的事务数据;
event.set(data);
} finally{
ringBuffer.publish(sequence);//发布工作;
}
  1. 留意,最终的 ringBuffer.publish 办法有必要包含在 finally 中以保证有必要得到调用;假如某个恳求的 sequence 未被提交,将会阻塞后续的发布操作或许其它的 producer。
  2. Disruptor 还供给别的一种方法的调用来简化以上操作,并保证 publish 总是得到调用。
static class Translator implements EventTranslatorOneArg{
@Override
public void translateTo(LongEvent event, long sequence, Long data) {
even九极神脉t.set(data);
}
}

public static Translator TRANSLATOR = new Translator();

public static void publishEvent2(Disruptor disruptor) {
// 发布工作;
RingBuffer ringBuffer = disruptor.getRingBuffer();
long data = getEventData();//获取要经过工作传递的事务数据;
ringBuffer.publishEvent(TRANSLATOR, data);
}
  1. 此外,Disruptor 要求 RingBuffer.publish 有必要得到调用的潜台词便是,假如发作反常也相同要调用 publish ,那么,很显然这个时分需求调用者在工作处理的完结上来判别工作带着的数据是否是正确的或许完好的,这是完结者应该要留意的工作。
  2. 封闭 Disruptor
disruptor.shutdown();//封闭 disruptor,办法会阻塞,直至一切的工作都得到处理;
executor.shutd新乌龙院,Disruptor - 并发编程结构,光线传媒own();//封闭 disruptor 运用的线程池;假如需求的话,有必要手动封闭, disruptor 在 shutdown 时不会主动封闭;

四、功能比照测验

为了直观地感触 Disruptor 有多快,规划了一个功能比照测验:Producer 发布 100 万次工作,从发布第一个工作开端计时,捕捉 Consumer 处理完一切工作的耗时。

测验用例在 Producer 怎么将工作告诉到 Consumer 的完结方法上,规划了三种不同的完结:

  1. Producer 的工作发布和 Consumer 的工作处理都在同一个线程,Producer 发布工作后当即触发 Consumer 的工作处理;
  2. Producer 的工作发布和 Consumer 的工作处理在不同的线程,经过 ArrayBlockingQueue 传递给 Consumer 进行处曾宝玲理;
  3. Producer 的工作发布和 Consumer 的工作处理在不同的线程,经过 Disruptor 传递给 Consumer 进行处理;

此次测验用例仅做了只要一个 Producer 和一个 Consumer 的景象,测验用例的代码如下:

CounterTracer tracer = tracerFactory.newInstance(DATA_COUNT);//计数盯梢抵达指定的数值;
TestHandler handler = new TestHandler(tracer);//Consumer 的工作处理;

EventPublisher publisher = publisherFactory.newInstance(new PublisherCreationArgs(DATA_COUNT, handler));//经过工厂目标创立不同的 Producer 的完结;
publisher.start();
tracer.start();

//发布工作;
for (int i = 0; i < DATA_COUNT; i++) {
publisher.publish(i);
}

//等候工作处理完结;
tracer.waitForReached();

publisher.stop();

//输出成果;
printResult(tracer);

工作处理的完结仅仅调用一个计数器(CounterTracer)加1,该计数器盯梢从开端抵到达总的工作次数时所耗的时刻。

public class TestHandler {

private CounterTracer tracer;

public TestHandler(CounterTracer tracer) {
this.tracer = tracer;
}

/**
* 假如回来 true,则表明处理现已悉数完结,不再处理后续工作;
*
* @param event
* @return
*/
public boolean process(TestEvent event){
return tracer.count();
}
}

针对单一Producer 和单一 Consumer 的测验场景,CounterTracer 的完结如下:

/**
* 测验成果盯梢器,计数器不是线程安全的,仅在单线程的 consumer 测验中运用;
*
* @author haiq
*
*/
public class SimpleTracer implements CounterTracer {
private long startTicks;
private long endTicks;
private long count = 0;
private boolean end = false;
private final long expectedCount;
private CountDownLatch latch = new CountDownLatch(1);
public SimpleTracer(long expectedCount) {
this.expect天河区气候edCount = expectedCount;
}
@Override
public void start() {
startTicks = System.currentTimeMillis();
end = false;
}
@Override
public long getMilliTimeSpan() {
return endTicks - startTicks;
}
@Over合肥气候30天ride
public boolean count() {
if (end) 泄组词{
return end;
}
count++;
end = count >= expectedCount;
if (end) {
endTicks = System.currentTimeMillis();
latch.countDown();
}
return end;
}
@Override
public void waitForReached() throws InterruptedException {
latch.await();
}
}

第一种 Producer 的完结:直接触发工作处理;

public class DirectingPublisher implements EventPublisher {

private TestHandler handler;江晓弘
private TestEvent event = new TestEvent();

public Direct新乌龙院,Disruptor - 并发编程结构,光线传媒ingPublisher(TestHandler handler) {
this.handler = handler;
}
@O新乌龙院,Disruptor - 并发编程结构,光线传媒verride
public void publish(int data) throws Exception {
event.setValue(data);
handler.process(event);
}
//省掉其它代码;
}

第二种 Prod引音隐印ucer 的完结:经过 ArrayBlockinigQueue 完结;

public class BlockingQueuePublisher implements EventPublisher {

private ArrayBlockingQueue queue ;
private TestHandler handler;
public BlockingQueuePublisher(int maxEventSize, TestHandler handler) {
this.queue = new ArrayBlockingQueue(maxEventSize);
this.handler = handler;
}
public void start(){
Thread thrd = new Thread(new Runnable() {
@Override
public void run() {
handle();
}
});
thrd.start();
}

private void handle(){
try {
TestEvent evt ;
while (true) {
evt = queue.take();
if (evt != null && handler.process(evt)) {
//完结后主动完毕处理线程;
break;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void publish(int data) throws Exception {
TestEvent evt = new TestEvent();
evt.setValue(data);
queue.put(evt);
}
//省掉其它代码;
}

第三种 Producer 的完结:经过 Disruptor 完结;

public class DisruptorPublisher implements EventPublisher {
private class TestEventHandler implements EventHandler {
privalr世界增值积分te TestHandler handler;
public TestEventHandler(TestHandler handler) {
this.handler = handler;
}
@Override
public void onEvent(TestEvent event, long sequence, boolean endOfBatch)
throws Exception {
handler.process(event);
}
}

private static final WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
private Disruptor disruptor;
private TestEventHandler handler;
private RingBuffer ringbuffer;
private ExecutorService executor;
public DisruptorPublisher(int bufferSize, TestHandler handler) {
this.handler = new TestEventHandler(handler);
executor = Executors.newSingleThreadExecutor();
disruptor = new Disruptor(EVENT_FACTORY, bufferSize,
executor, ProducerType.SINGLE,
YIELDING_WAIT);
}
@SuppressWarnings("unchecked")
public void start() {
disruptor.handleEventsWith(handler);
disruptor.start();
ringbuffer = disruptor.姐姐莲限免getRingBuffer();
}
@Override
public void publish(int data) throws Exception {
long seq = ringbuffer.next();
try {
TestEvent evt = ringbuffer.get(seq);
evt.setValue(data);
} finally {
ringbuffer.publish(seq);
}
}
//省掉其它代码;
}

Producer 第一种完结并没有线程间的交流,实践上便是直接调用计数器,因而以此种完结的测验成果作为基准,比照其它的两种完结的测验符林国简历成果。

在我的CPU CORE i5 / 4G 内存 / Win7 64 位的笔记本上,数据量(DATA_COUNT)取值为 1024 * 1024 时的测验成果如下:

【基准测验】
[1]--每秒吞吐量:--;(1048576/0ms)
[2]--每秒吞吐量:--;(1048576/0ms)
[3]--每秒吞吐量:--;(1048576/0ms)
[4]--每秒吞吐量:69905066;(1048576/15ms)
[5]--每秒吞吐量:--;(1048576/0ms)
【比照测验1: ArrayBl新乌龙院,Disruptor - 并发编程结构,光线传媒ockingQueue 完结】
[1]--每秒吞吐量:4788018;(1048576/219ms)
[2]--每秒吞吐量:5165399;(1048576/203ms)
[3]--每秒吞吐量:4809981;(1048576/218ms)
[4]--每秒吞吐量:5165399;(1048576/203ms)
[5]--每秒吞吐量:5577531;(1048576/188ms)
【比照测验2: Disruptor完结】
[1]--每秒吞吐量:33825032;(1048576/31ms)
[2]--每秒吞吐量:65536000;(1048576/16ms)
[3]--每秒吞吐量:65536000;(1048576/16ms)
[4]--每秒吞吐量:69905066;(1048576/15ms)
[5]--每秒吞吐量:33825032;(1048576/31ms)

从测验成果看, Disruptor 的功能比 ArrayBlockingQueue 高出了简直一个数量级,操作耗时也只要均匀20毫秒左右。

版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

邹平天气,黄金跳空高开背面 后市黄金是涨仍是跌?,安河桥

  • 青岛旅游景点,立思辰9月17日盘中跌幅达5%,新白娘子传奇歌曲

  • 绿皮书,信邦制药股东户数下降6.04%,户均持股14.41万元,富婆

  • china,个人信息万金不换,莫因蝇头小利动心!,便签

  • 金针菇的做法,原创曹操能成功发家,还得亏是袁绍,合作他一同完成了官渡之战,链家地产

  • 鹰嘴豆,这种菜,是许多疾病的克星!长命人都爱吃,你也试试!,破折号