告別Disruptor(一) 簡潔優雅的高性能并發隊列

幾年前聽說過Disruptor,一直沒用過也沒深究, 其號稱是一個性能爆表的并發隊列,上Github/LMAX-Exchange/disruptor 去看了看,官方性能描述文章 選了慢如蝸牛的ArrayBlockQueue來對比。在Nehalem 2.8Ghz – Windows 7 SP1 64-bit錄得性能見后(其中P,C分別代表 Producer和Consumer):

1P – 1C 的吞吐量兩千五百萬次,1P – 3C Multicast 就降到了一千萬次不到,對比我所認為的非線程安全1P -1C隊列億次每秒的量級,感覺并不強大。億次每秒的隊列加上線程安全,毛估估1P-1C性能減半五千萬次每秒,1P-3C 再減少個30%三千五百萬次每秒,應該差不多了吧。

繼續讀讀Disruptor的介紹,整體業務框架先不談,關于隊列的部分,我只想問可以說臟話不,太TMD的復雜了,實現方式一點都不優雅,講真,我想用100行代碼滅了它。

說干就干,先嘗試一下。

ArrayBlockingQueueDisruptor
Unicast: 1P – 1C5,339,25625,998,336
Pipeline: 1P – 3C2,128,91816,806,157
Sequencer: 3P – 1C5,539,53113,403,268
Multicast: 1P – 3C1,077,3849,377,871
Diamond: 1P – 3C2,113,94116,143,613

Comparative throughput (in ops per sec)

第一步 簡單粗暴的阻塞隊列

先簡單來個,這年頭用synchronized要被人瞧不起的(風聞現在性能好多了),大家言必談ReentrantLock,CAS,那么我也趕潮流CAS吧,主流程如下。

步驟主要工作失敗流程
1無論是put還是take,先對head/tail getAndIncrease 在Array中占位100%成功
2檢查是否能夠放/取失敗則循環檢查,條件允許了再操作(不能返回失敗,第1步的占位無法回退)
3執行放/取操作

第2步的操作,如果失敗,并非完全不能回退,只是需要牽扯到一套復雜的邏輯,在這個簡單粗暴的實現中,先不考慮非阻塞方案。

核心代碼如下:

private Object[] array;
private final int capacity;
private final int m;

private final AtomicLong tail;
private final AtomicLong head;
private final AtomicLong[] als = new AtomicLong[11];

public RiskBlockingQueue(int preferCapacity) {
	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
	//這是個取比preferCapacity大的,最接近2的整數冪的函數(限制必須在 MIN MAX 之間)
	array = new Object[this.capacity];
	this.m = this.capacity - 1;		

	for (int i = 0; i < als.length; i++) {     // 并不一定100%成功的偽共享padding
		als[i] = new AtomicLong(0);
	}
	head = als[3];
	tail = als[7];
}

public void put(T obj) {
	ProgramError.when(obj == null, "Can't add null into this queue");
	int p = (int) (head.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while(array[p] != null) {
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
	array[p] = obj;
}

public T take(){
	Object r;
	int p = (int) (tail.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while((r = array[p]) == null) {
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
	array[p] = null;
	return (T)r;
}

代碼簡單的不要不要的,30來行代碼,一個線程安全的阻塞就基本完成。什么?你問構造函數為什么叫RiskBlockingQueue?,很簡單,有Risk,這并不是一個真正意義上的線程安全Queue,它有風險,那么風險在哪里呢?

各位看官先自己想想風險在哪里,我先來測個1P-1C 性能數據 (以下數據都在關閉了CPU超線程的環境下測試獲得,超線程時數據經??瓷先ズ苊?

1P – 1C

Producer 0 has completed. Cost Per Put 19ns.
Consumer 0 has completed. Cost Per Take 19ns.
Total 201M I/O, cost 3844ms, 52374243/s(52M/s)

??,還真的和預測差不多,性能減半。

接下來,揭曉這個隊列的風險,我們再來看看隊列中一個P線程(Producer 線程,Consumer稱為C線程,下同)put操作的工作流程

步驟主要工作
1getAndIncrease 占位
2檢查是否能夠put
3執行put操作

假設某線程P0,執行完了第1步,在執行第2或3步時被叫去喝茶了

這時P0線程本應填充位置array[x]但卻沒有填充(如果有對應的C線程,也take不到對象,被卡在array[x]上不斷pack)。

但線程P1 – Pn在歡快的繼續執行,不斷的put,沿著array往前跑,漸漸的,一圈過去了,P1 線程也來到了array[x]的位置。

這時,P0線程和P1線程對array[x]位置的訪問處于競爭狀態,array[x] 沒有任何鎖/同步/信號量/原子操作保護。這可能會造成對象丟失,并卡住一個C線程,并最終卡住整個隊列。C線程們也一樣,極端情況下,當一個C線程追上另一個C線程的時候,也會對數組的同一位置發生非線程安全的爭用。

哪里有問題,就解決哪里的問題

第二步 真正的并發阻塞Array隊列

對于數組,Java沒有提供直接的CAS操作方式(除非自己調Unsafe),不但沒有CAS,連volatile都沒有。不過,Java中有一個內置的類,叫AtomicReferenceArray,簡而言之,這個類提供了對 T[] 數組的CAS及類似操作。繼續上代碼,這次還少了2行。

private AtomicReferenceArray<T> array;  //原本是 Object[] array
private final int capacity;
private final int m;
private final AtomicLong tail;
private final AtomicLong head;

public ConcurrentBlockingQueue(int preferCapacity) {
	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
	array = new AtomicReferenceArray<T>(this.capacity);
	this.m = this.capacity - 1;		

	for (int i = 0; i < als.length; i++) {
		als[i] = new AtomicLong(0);
	}
	head = als[3];
	tail = als[7];
}

public void put(T obj) {
	ProgramError.when(obj == null, "Queue object can't be null");
	int p = (int) (head.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while(!array.compareAndSet(p, null, obj)) { //***
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
}

public T take(){
	T r;
	int p = (int) (tail.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while((r=array.getAndSet(p, null)) == null) {  //***
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
	return r;
}

這段代碼的核心修改是,每次讀寫array的位置p,都通過CAS和GAS的原子操作來完成,保證了array的線程安全,在高度爭用時,依然可以確保放一個取一個的交替。 接下來繼續測試性能。

1P – 1C

Consumer 0 has completed. Cost Per Take 29ns.
Producer 0 has completed. Cost Per Add 29ns.
Total 201M I/O, cost 5912ms, 34053889/s(34M/s)

1P – 3C

Consumer 0 has completed. Cost Per Take 143ns.
Consumer 2 has completed. Cost Per Take 143ns.
Producer 0 has completed. Cost Per Put 47ns.
Consumer 1 has completed. Cost Per Take 143ns.
Total 201M I/O, cost 9665ms, 20830480/s (20M/s)

3P – 1C

Producer 0 has completed. Cost Per Put 185ns.
Producer 1 has completed. Cost Per Put 185ns.
Producer 2 has completed. Cost Per Put 185ns.
Consumer 0 has completed. Cost Per Take 62ns.
Total 201M I/O, cost 12551ms, 16040681/s(16M/s)

2P – 2C

Producer 0 has completed. Cost Per Put 147ns.
Producer 1 has completed. Cost Per Put 148ns.
Consumer 0 has completed. Cost Per Take 148ns.
Consumer 1 has completed. Cost Per Take 148ns.
Total 201M I/O, cost 14986ms, 13434311/s(13M/s)

感覺有點沮喪,可能是因為增加的CAS操作,性能進一步下降。

考慮到我的測試環境里CPU比Disruptor當年的CPU快不少,實際可比性能,1P-1C估計只有Disruptor的1.1倍,1P – 3C的性能,只有其1.5倍???,但是快的及其有限,號稱要挑戰Disruptor,卻僅憑點數小勝,而非直接擊倒,不爽。

具體的性能對比如下:

Disruptor
Nehalem 2.8Ghz
ConcurrentBlockingQueue
Skylake-X 3.3Ghz
Unicast: 1P – 1C25,998,33634,053,889
Pipeline: 1P – 3C16,806,157
Sequencer: 3P – 1C13,403,268
Multicast: 3P – 1C16,040,681
Multicast: 1P – 3C9,377,87120,830,480
Diamond: 1P – 3C16,143,613
Multicast: 2P – 2C13,434,311

問題出在哪里呢?仔細想了一下,可能是過多的變量共享訪問及數組的共享訪問造成了大量的 L1/2 緩存失效,描述如下。

1P – 1C時,一旦head和tail靠近或套圈,那么對64字節的數組內存就直接形成了共享爭用。

在指針壓縮時,Java的每個引用占用4字節,64字節一條的Cache line 一共有16個引用,兩個線程一個放一個取,不斷的 L1/2 緩存失效,鎖定,修改。

1P – 3C的時候,還要再加上C與C之間的 tail 爭用和 L1/2 緩存爭用,然后,性能就下降到這么個結果了。

還是那句話,哪里有問題,就解決哪里的問題

但是怎么改呢,想了下:
1P1C時,一旦隊列空了或是滿了,很容易陷在Producer/Consumer一個放一個取的過程中,而多P或多C時,多個P/C線程,排著隊一個個自增head/tail然后修改共享數組,P線程們爭完head之后,集中在共享數組的相鄰位置嘗試放置對象;C線程們爭用tail,再修改共享數組的相鄰位置,幾乎每次訪問都是無效 L1/2 緩存。
也就是說,在密集爭用時,后一個線程正好爭得前一個線程取得的位置的后一個的位置(head/tail + 1),然后這兩個線程對位于同一個cache line上的相鄰的數組位置進行訪問,造成緩存失效,性能下降,這是流程上的硬傷,沒法改,只能推倒重來。

3 非阻塞線程安全Array隊列

上一個隊列,不但性能不盡人意,而且很難支持非阻塞操作,換個思路,重新設計一個支持非阻塞操作的隊列,并盡可能少共享訪問內存的情況。流程如下:

步驟主要工作失敗流程
1檢查可能能夠放/取的數組位置(讀取競爭變量head或tail)不會失敗
2檢查是否能夠放/取(對應位置是否有對象,反映了隊列是否空/滿)回步驟1/或返回失敗
3getAndIncrease競爭變量head或tail,競爭該位置的操作權(放/取)回步驟1/或返回失敗
4執行放/取操作失敗就循環重試

先以offer為例看看代碼:

private AtomicReferenceArray<T> array;

public boolean offer(T obj) {
	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
	long head = this.head.get(); //步驟1
	int p =(int) (head & this.m);
	if(array.get(p) != null) return false;   //步驟2
	if(this.head.compareAndSet(head, head + 1)) {  //步驟3
		int packTime = MIN_PACKTIME_NS;
		while(!array.compareAndSet(p, null, obj)) { //步驟4
			LockSupport.parkNanos(MIN_PACKTIME_NS);
			if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
		};
		return true;
	}
	return false;
}

這里我們要注意,在offer的第2步,就算if判斷時,array.get(p) == null 可以放置對象,但這并不說明,在第4步,位置p仍然為空,因為可能有上一圈甚至再上一圈,不知道為什么停在這里的P線程往array的p位置放置了對象(雖然概率很小)。此時要等到一個C線程將這個對象取出之后,該位置才能被繼續放置對象,如果沒有C線程來取,將一直等在這里。

由于此時head已經被getAndIncrease,這又是一個已經占位,無法輕易回退的地方。我們也不可能將array.compareAndSet和head.getAndIncrease兩個原子操作,合并成一個原子操作。

采用不可回退,反復循環嘗試的方式,代碼雖然工作正常,但存在block較長時間的可能,甚至,在極其極端的情況下(多隊列,多線程,復雜流水線且存在特定的處理循環),還有可能引起死鎖。

難道又改成一個不支持非阻塞操作的隊列?心有不甘!最好有一個不怎么影響整體性能的方案來實現這里的回退。

此類的并發沖突是個小概率事件,需要回退的比例很低,回退部分可以性能較差,但正常處理時的性能要盡量不受影響。

高性能 = 減少真共享 + 消除偽共享 + 降低爭用 + …. 但是,怎么寫呢? 干到這里時正好是中午,下樓吃飯時邊吃邊想,然后,在開心的喝著牛肉湯時更開心的把這個問題想通了。 代碼如下:

private final byte[] falseOffer;   //新增
private final byte[] falsePoll;  //新增
private final AtomicReferenceArray<T> array;
final int capacity;
final int m;
final AtomicLong tail;  
final AtomicLong head;

public ConcurrentQueue(int preferCapacity) {
	this.capacity = ComUtils.getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
	array = new AtomicReferenceArray<T>(this.capacity);
	falseAdd = new byte[this.capacity];
	falsePoll = new byte[this.capacity];
	this.m = this.capacity - 1;

	for (int i = 0; i < als.length; i++) {
		als[i] = new AtomicLong(0);
	}
	head = als[3];
	tail = als[7];
}

public boolean offer(T obj) {
	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
	while(true) {
		long head = this.head.get();
		int p =(int) (head & this.m);
		if(falsePoll[p] > 0) {
			synchronized(falsePoll) {  //運行比例很低,性能要求不高,直接同步處理
				if(falsePoll[p] > 0) {  //如果不滿足條件,說明失效計數已被其他線程處理,break; 回到最初重新嘗試offer
					if(this.head.compareAndSet(head, head + 1)){ //如果不滿足條件,說明位置P已經失效,回到最初重新嘗試offer
						falsePoll[p] --; //跳過一次存在poll失效計數的位置p, poll失效計數 - 1,回到最初重新嘗試offer
					}
				}
			}
			break;
		}
		if(array.get(p) != null) return false;
		if(this.head.compareAndSet(head, head + 1)) {
			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
				if(!array.compareAndSet(p, null, obj)) {
					LockSupport.parkNanos(2 << i);
				} else return true;
			}
			synchronized(falseOffer) {  //運行比例很低,性能要求不高,直接同步處理
				falseOffer[p] ++;  //位置p的add失效計數器
			}
		}
		return false;
	}
	return false;
}


public T poll(){
	while(true) {
		T r;
		long tail = this.tail.get();
		int p = (int) (tail & this.m);
		if(falseOffer[p] > 0) {
			synchronized(falseOffer) {
				if(this.tail.compareAndSet(tail, tail + 1)) {
					falseOffer[p]--;
				}
			}
			break;
		}
		r = array.get(p);
		if(r == null) return null;
		if(this.tail.compareAndSet(tail, tail + 1)) {
			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
				if((r = array.getAndSet(p, null)) == null){
					LockSupport.parkNanos(2 << i);
				} else return r;
			}
			synchronized(falsePoll) {
				falsePoll[p] ++;
			}
		}
		return null;
	}
	return null;
}

新增了兩個和array等長的byte數組falseOffer[] 和 falsePoll[]作為失敗回退計數器,如果在前文提到的offer/ poll 過程中,發生了array的p位置的 CAS或GAS失敗,并且無法通過重試少量次數迅速成功,那么將失敗回退計數器 falseOffer[p] 或是 falsePoll[p] +1 。

正常流程中,每次offer / poll 時,先讀取falsePoll[p] / falseOffer[p],如果poll 讀到的 falseOffer[p] 大于0,說明位置p發生過應該offer卻未能成功offer的回退,poll 操作應該忽略位置p一次,此時C線程同步鎖定,檢查,嘗試自增tail,將falseOffer[p] –,然后繼續嘗試下一個位置。這一連串的過程,是個小概率事件,簡單的同步鎖就好了,無需過多考慮性能。

同時,因為回退是小概率事件,所以falseOffer[] 和 falsePoll[]數組很少被修改,所有的對這兩個失敗回退計數數組的讀取,大部分時間都處于 L1/2 緩存有效狀態,平均訪問耗時應該在1ns左右,性能影響很小。

再看看這個解決方案的安全性,雖然失敗回退是個小概率事件,但數組byte會不會溢出?會不會同一個位置,累計超過127次失???測試下吧。

測試結果如下:

隊列長度線程數觀察到的byte計數最大值
2256P – 256C9
4512P-512C2
81024P-1024C1

在將隊列長度設置為最小值2,幾百個線程操作的時候,觀察到了byte數組中有最高9的計數,當隊列長度設置到8時,千余線程,經短時運行測試,沒有觀察到過大于1的計數。而在實際應用中,最小隊列長度會限制為1024或更大(高性能服務器弄個很小的隊列,沒啥意義),這個byte數組的溢出概率極極極極極極極小。如果想省點空間,這個byte數組應該還可以進一步優化,用4個bit來計數就夠了。

高性能 = 減少真共享 + 消除偽共享 + 降低爭用 + ….

還有降低爭用一招沒用,什么是關鍵競爭變量,head? tail? array? P線程競爭head,然后競爭array,C線程競爭tail 然后競爭array,當隊列長度居中時,array(連續16個引用)就比head/tail競爭更激烈,而當隊列滿/空時,array的爭用壓力還需要再相加一下,在大吞吐量,多線程競爭一個資源失敗時,如果大家都很激進的重復競爭,將導致這些爭用和共享資源反復處于緩存失效狀態,降低性能。因此,當某個offer/poll操作失敗時,失敗的線程需要等待的稍微久一點,再嘗試下一次,而不是簡單粗暴的packNano(1),當隊列一直空,或是滿的時候,相關線程更不應該反復循環,應該等久一點,然后重試。這部分就不專門貼代碼了,有興趣可以直接去github上拉源碼看。

在這個過程中,還有一些其他回退檢測流程上的小坑也被自然填平了,不再多說。核心要點上面已經全部列出來了。

老樣子,實踐是檢驗真理的唯一標準,繼續跑分:

1P – 1C

Producer 0 has completed. Cost Per Put 14ns.
Consumer 0 has completed. Cost Per Take 14ns.
Total 440M I/O, cost 6282ms, 70,105,367/s, 70.11M/s

1P – 3C

Consumer 2 has completed. Cost Per Take 19ns.
Consumer 1 has completed. Cost Per Take 36ns.
Producer 0 has completed. Cost Per Put 14ns.
Consumer 0 has completed. Cost Per Take 42ns.
Total 440M I/O, cost 6256ms, 70,396,726/s, 70.40M/s

3P – 1C

Producer 0 has completed. Cost Per Put 23ns.
Producer 1 has completed. Cost Per Put 38ns.
Producer 2 has completed. Cost Per Put 44ns.
Consumer 0 has completed. Cost Per Take 14ns.
Total 440M I/O, cost 6519ms, 67,556,668/s, 67.56M/s

2P – 2C

Producer 1 has completed. Cost Per Put 14ns.
Consumer 1 has completed. Cost Per Take 25ns.
Producer 0 has completed. Cost Per Put 28ns.
Consumer 0 has completed. Cost Per Take 28ns.
Total 440M I/O, cost 6315ms, 69,739,021/s, 69.74M/s

這下心滿意足了,下個新版的Distruptor, 比較了一下。 在當前的Distruptor版本中,所有1P的測試,均使用的createSingleProducer創建的非線程安全的Producer,所以*部分,使用了一個非線程安全的隊列進行性能比較。其余的1P-nC 的隊列,暫無對應的比較對象,將在后續代碼/文章中逐步添加。

Disruptor(Old Ver)
Nehalem 2.8Ghz
Disruptor(V3.3)
Skylake-X 3.3Ghz
ConcurrentQueue
Skylake-X 3.3Ghz
ConcurrentBlockingQueue
(本文第2節的隊列)
Skylake-X 3.3Ghz
1P – 1C*134,952,766
OneToOneSequencedThroughputTest
*310,360,761
SimpleBlockingQueue
Thread-Safe 1P – 1C10,373,443
RingBuffer.createMultiProducer
70,105,36734,053,889
Pipeline: 1P – 3C16,806,15722,128,789
OneToThreePipelineSequencedThroughputTest
Sequencer: 3P – 1C13,403,26811,344,299
ThreeToOneSequencedThroughputTest
67,556,66816,040,681
Multicast: 1P – 3C9,377,871168,350,168
OneToThreeSequencedThroughputTest
Diamond: 1P – 3C16,143,61322,899,015
OneToThreeDiamondSequencedThroughputTest
2P – 2C4,273,504
TwoToTwoWorkProcessorThroughputTest
69,739,02113,434,311

在與Disruptor的可比項之間的比較中,ConcurrentQueue線程安全隊列,取得了遠高于Disruptor的吞吐量,在多線程高并發爭用的條件下實現了超過六千萬次每秒的吞吐量。數倍于Disruptor

這次算是K.O.了。

這是終點嗎?并不是,整條服務器業務處理流水線的大部分地方,通常并不需要真正的線程安全隊列。而是更多的需要1P – 1C,或是n為確定數值的 nP – 1C這樣的隊列,后續將會參照1P – 1C的非線程安全隊列 SimpleBlockingQuere,繼續添加實現及介紹文章

本文所涉及到的代碼及測試代碼,可在https://github.com/Lofint/tachyon 查看/下載

原創文章,轉載請注明: 轉載自并發編程網 – www.okfdzs1908.com本文鏈接地址: 告別Disruptor(一) 簡潔優雅的高性能并發隊列

FavoriteLoading添加本文到我的收藏
  • Trackback 關閉
  • 評論 (10)
    • 海帶
    • 2019/05/21 10:42下午

    勘誤,第三部分頭幾段和表格中步驟3的 getAndIncrease 都應改為compareAndSet

    • whoyou223
    • 2019/06/13 4:30下午

    都是CAS,并沒有本質區別,只是disruptor功能強大

    • 海帶
    • 2019/06/13 7:44下午

    whoyou223 :
    都是CAS,并沒有本質區別,只是disruptor功能強大

    CAS 確實不是關鍵差異,也不是本文的重點

    1 關鍵差異一,少用一個共享變量,可參考 http://www.okfdzs1908.com/disruptor-writing-ringbuffer/ 里面有一句話 “Disruptor 由消費者負責通知(生產者Barrier)它們處理到了哪個序列號” 這一個共享變量的值變更,在多線程環境下會緩存行失效,需要通過 L3 Cache 讀取,通常需要耗費50個時鐘周期/12ns或更多的時間。這是造成性能差異的重要原因之一。

    2 我們通常提及的Disruptor的隊列,是阻塞的。在流水線中,阻塞隊列在應對單消費者線程處理多類和/或多生產者任務時,不得不面對附加的多線程和/或并發隊列、任務類別判斷等影響性能的問題。這一點可能需要些例子才方便講清楚。我空了會繼續寫。

      • xc19891112
      • 2019/06/14 10:38上午

      的確 disruptor做的很強大 緩存填充避免false sharing、CAS、GC、以及事件驅動模型,disruptor 生產和消費都可做到鎖無關 ,你是怎么使用disrupto進行測試的代碼列出來

    • 海帶
    • 2019/06/14 10:53上午

    xc19891112 :
    的確 disruptor做的很強大 緩存填充避免false sharing、CAS、GC、以及事件驅動模型,disruptor 生產和消費都可做到鎖無關 ,你是怎么使用disrupto進行測試的代碼列出來

    我做的Disruptor的測試都用其自帶的測試代碼,文中最后一個表格的第三列列出了所有測試代碼的類名。CPU型號也已列出,其他環境: 32G DDR4 2133 / Ubuntu 1804 / JDK 1.8.171

    Disruptor自己文章中用的啥測試代碼,我簡單找了一下沒找到明確說法。

    • 流逝的風
    • 2019/06/14 3:16下午

    SimpleBlockingQueue 和 disruptor的OneToOneSequencedThroughputTest 測試一下 沒有達到disruptor的吞吐量 相差甚遠并且disruptor雖然鎖無關 但是有數據一致性 相比SimpleBlockingQueue無法保證

      • 海帶
      • 2019/06/14 3:47下午

      這是我的測試結果

      disruptor: OneToOneSequencedThroughputTest

      Starting Disruptor tests
      Run 0, Disruptor=134,408,602 ops/sec BatchPercent=97.80% AverageBatchSize=45
      Run 1, Disruptor=53,590,568 ops/sec BatchPercent=85.72% AverageBatchSize=7
      Run 2, Disruptor=160,771,704 ops/sec BatchPercent=98.39% AverageBatchSize=61
      Run 3, Disruptor=134,770,889 ops/sec BatchPercent=97.18% AverageBatchSize=35
      Run 4, Disruptor=143,678,160 ops/sec BatchPercent=97.69% AverageBatchSize=43
      Run 5, Disruptor=179,533,213 ops/sec BatchPercent=99.07% AverageBatchSize=107
      Run 6, Disruptor=142,857,142 ops/sec BatchPercent=97.56% AverageBatchSize=41

      SimpleBlockingQueue: 1P – 1C
      Producer 0 has completed. Cost Per Put 2ns.
      Consumer 0 has completed. Cost Per Take 2ns.
      Total 440M I/O, cost 1304ms, 337,731,533/s, 337.73M/s

      這是個非線程安全隊列(要NUM_CONSUMER = 1;否則會跑失敗),但在OneToOneSequencedThroughputTest中Disruptor用的也是 ringBuffer = createSingleProducer(… … …) 而不是 createMultiProducer

      后面不帶*的是線程安全比較,

      API方面,當然和Disruptor不是一個量級。但從服務端流水線處理的角度,我認為Disruptor的api有點重。

        • 流逝的風
        • 2019/06/14 3:53下午

        多說無益 研究透了再說滅他吧 談不上告別

          • 海帶
          • 2019/06/14 4:08下午

          不急,這才第一篇。還真要多說。

    • 流逝的風
    • 2019/06/14 3:24下午

    你的代碼 我也過了一下 實話打敗disruptor還遠 disruptor針對并發的細節都做了優化 api設計的也很精妙了

您必須 登陸 后才能發表評論

return top

竞彩258网 5yg| yyi| u1m| u1w| eug| 1qy| gw1| gic| y9u| isy| 9as| uks| 0sc| qq0| kmu| g0s| c0e| uka| 0qu| cg8| gia| q9g| kas| 9ko| eu9| wke| k9k| kwc| 9uw| 9ik| ka0| wyg| w8m| cqg| 8wo| ce8| gkg| y8e| yae| 8ci| eio| 9yy| 9gk| gs7| you| c7m| uys| 7sw| sw8| yae| g8w| iyq| 8aw| sq8| oq8| ego| q6a| siw| 7km| iy7| sog| k7s| aqs| 7eu| yc7| wic| m7o| iks| wmi| m6w| wyo| 6ac| oc6| wks| m6m| yso| 6ci| yw7| wow| o5q| mcg| ugm| 5cg| ka5| oou| s5w| cei| 6cq| oe6| qek|