1. Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,通過協(xié)調(diào)各個線程,保證合理的使用公共資源。 Semaphore維護(hù)了一個許可集,其實(shí)就是一定數(shù)量的“許可證”。當(dāng)有線程想要訪問共享資源時,需要先獲取(acquire)的許可;如果許可不夠了,線程需要一直等待,直到許可可用。當(dāng)線程使用完共享資源后,可以歸還(release)許可,以供其它需要的線程使用。
和ReentrantLock類似,Semaphore支持公平/非公平策略。
Semaphore的主要方法摘要: void acquire():從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。 void release():釋放一個許可,將其返回給信號量。 int availablePermits():返回此信號量中當(dāng)前可用的許可數(shù)。 boolean hasQueuedThreads():查詢是否有線程正在等待獲取。
使用
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
//創(chuàng)建一個會實(shí)現(xiàn)print queue的類名為 PrintQueue。
class PrintQueue {
// 聲明一個對象為Semaphore,稱它為semaphore。
private final Semaphore semaphore;
// 實(shí)現(xiàn)類的構(gòu)造函數(shù)并初始能保護(hù)print quere的訪問的semaphore對象的值。
public PrintQueue() {
semaphore = new Semaphore(1);
}
//實(shí)現(xiàn)Implement the printJob()方法,此方法可以模擬打印文檔,并接收document對象作為參數(shù)。
public void printJob(Object document) {
//在這方法內(nèi),首先,你必須調(diào)用acquire()方法獲得demaphore。這個方法會拋出 InterruptedException異常,使用必須包含處理這個異常的代碼。
try {
semaphore.acquire();
//然后,實(shí)現(xiàn)能隨機(jī)等待一段時間的模擬打印文檔的行。
long duration = (long) (Math.random() * 10);
System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), duration);
Thread.sleep(duration);
//最后,釋放semaphore通過調(diào)用semaphore的relaser()方法。
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
//創(chuàng)建一個名為Job的類并一定實(shí)現(xiàn)Runnable 接口。這個類實(shí)現(xiàn)把文檔傳送到打印機(jī)的任務(wù)。
class Job implements Runnable {
//聲明一個對象為PrintQueue,名為printQueue。
private PrintQueue printQueue;
//實(shí)現(xiàn)類的構(gòu)造函數(shù),初始化這個類里的PrintQueue對象。
public Job(PrintQueue printQueue) {
this.printQueue = printQueue;
}
//實(shí)現(xiàn)方法run()。
@Override
public void run() {
//首先, 此方法寫信息到操控臺表明任務(wù)已經(jīng)開始執(zhí)行了。
System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
// 然后,調(diào)用PrintQueue 對象的printJob()方法。
printQueue.printJob(new Object());
//最后, 此方法寫信息到操控臺表明它已經(jīng)結(jié)束運(yùn)行了。
System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
}
}
public class SemaphoreTest {
public static void main(String args[]) {
// 創(chuàng)建PrintQueue對象名為printQueue。
PrintQueue printQueue = new PrintQueue();
//創(chuàng)建10個threads。每個線程會執(zhí)行一個發(fā)送文檔到print queue的Job對象。
Thread thread[] = new Thread[10];
for (int i = 0; i < 10; i++) {
thread[i] = new Thread(new Job(printQueue), "Thread" + i);
}
for (int i = 0; i < 10; i++) {
thread[i].start();
}
}
}
2. CountDownLatch
在多線程協(xié)作完成業(yè)務(wù)功能時,有時候需要等待其他多個線程完成任務(wù)之后,主線程才能繼續(xù)往下執(zhí)行業(yè)務(wù)功能,在這種的業(yè)務(wù)場景下,通常可以使用Thread類的join方法,讓主線程等待被join的線程執(zhí)行完之后,主線程才能繼續(xù)往下執(zhí)行。當(dāng)然,使用線程間消息通信機(jī)制也可以完成。其實(shí),java并發(fā)工具類中為我們提供了類似“倒計(jì)時”這樣的工具類,可以十分方便的完成所說的這種業(yè)務(wù)場景。
CountDownLatch允許一個或多個線程等待其他線程完成工作。
CountDownLatch相關(guān)方法:
- public CountDownLatch(int count) 構(gòu)造方法會傳入一個整型數(shù)N,之后調(diào)用CountDownLatch的countDown方法會對N減一,知道N減到0的時候,當(dāng)前調(diào)用await方法的線程繼續(xù)執(zhí)行。
- await() throws InterruptedException:調(diào)用該方法的線程等到構(gòu)造方法傳入的N減到0的時候,才能繼續(xù)往下執(zhí)行;
- await(long timeout, TimeUnit unit):與上面的await方法功能一致,只不過這里有了時間限制,調(diào)用該方法的線程等到指定的timeout時間后,不管N是否減至為0,都會繼續(xù)往下執(zhí)行;
- countDown():使CountDownLatch初始值N減1;
- long getCount():獲取當(dāng)前CountDownLatch維護(hù)的值
CountDownLatch的用法 CountDownLatch典型用法:1、某一線程在開始運(yùn)行前等待n個線程執(zhí)行完畢。將CountDownLatch的計(jì)數(shù)器初始化為new CountDownLatch(n),每當(dāng)一個任務(wù)線程執(zhí)行完畢,就將計(jì)數(shù)器減1 countdownLatch.countDown(),當(dāng)計(jì)數(shù)器的值變?yōu)?時,在CountDownLatch上await()的線程就會被喚醒。一個典型應(yīng)用場景就是啟動一個服務(wù)時,主線程需要等待多個組件加載完畢,之后再繼續(xù)執(zhí)行。 CountDownLatch典型用法:2、實(shí)現(xiàn)多個線程開始執(zhí)行任務(wù)的最大并行性。注意是并行性,不是并發(fā),強(qiáng)調(diào)的是多個線程在某一時刻同時開始執(zhí)行。類似于賽跑,將多個線程放到起點(diǎn),等待發(fā)令槍響,然后同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計(jì)算器初始化為1,多個線程在開始執(zhí)行任務(wù)前首先countdownlatch.await(),當(dāng)主線程調(diào)用countDown()時,計(jì)數(shù)器變?yōu)?,多個線程同時被喚醒。
CountDownLatch的不足 CountDownLatch是一次性的,計(jì)算器的值只能在構(gòu)造方法中初始化一次,之后沒有任何機(jī)制再次對其設(shè)置值,當(dāng)CountDownLatch使用完畢后,它不能再次被使用。
栗子:運(yùn)動員進(jìn)行跑步比賽時,假設(shè)有6個運(yùn)動員參與比賽,裁判員在終點(diǎn)會為這6個運(yùn)動員分別計(jì)時,可以想象沒當(dāng)一個運(yùn)動員到達(dá)終點(diǎn)的時候,對于裁判員來說就少了一個計(jì)時任務(wù)。直到所有運(yùn)動員都到達(dá)終點(diǎn)了,裁判員的任務(wù)也才完成。這6個運(yùn)動員可以類比成6個線程,當(dāng)線程調(diào)用CountDownLatch.countDown方法時就會對計(jì)數(shù)器的值減一,直到計(jì)數(shù)器的值為0的時候,裁判員(調(diào)用await方法的線程)才能繼續(xù)往下執(zhí)行。
public class CountDownLatchTest {
private static CountDownLatch startSignal = new CountDownLatch(1);
//用來表示裁判員需要維護(hù)的是6個運(yùn)動員
private static CountDownLatch endSignal = new CountDownLatch(6);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 運(yùn)動員等待裁判員響哨!!!");
startSignal.await();
System.out.println(Thread.currentThread().getName() + "正在全力沖刺");
endSignal.countDown();
System.out.println(Thread.currentThread().getName() + " 到達(dá)終點(diǎn)");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("裁判員響哨開始啦!!!");
startSignal.countDown();
endSignal.await();
System.out.println("所有運(yùn)動員到達(dá)終點(diǎn),比賽結(jié)束!");
executorService.shutdown();
}
}
該示例代碼中設(shè)置了兩個CountDownLatch,第一個endSignal用于控制讓main線程(裁判員)必須等到其他線程(運(yùn)動員)讓CountDownLatch維護(hù)的數(shù)值N減到0為止,相當(dāng)于一個完成信號;另一個startSignal用于讓main線程對其他線程進(jìn)行“發(fā)號施令”,相當(dāng)于一個入口或者開關(guān)。
startSignal引用的CountDownLatch初始值為1,而其他線程執(zhí)行的run方法中都會先通過 startSignal.await()讓這些線程都被阻塞,直到main線程通過調(diào)用startSignal.countDown();,將值N減1,CountDownLatch維護(hù)的數(shù)值N為0后,其他線程才能往下執(zhí)行,并且,每個線程執(zhí)行的run方法中都會通過endSignal.countDown();對endSignal維護(hù)的數(shù)值進(jìn)行減一,由于往線程池提交了6個任務(wù),會被減6次,所以endSignal維護(hù)的值最終會變?yōu)?,因此main線程在latch.await();阻塞結(jié)束,才能繼續(xù)往下執(zhí)行。
注意:當(dāng)調(diào)用CountDownLatch的countDown方法時,當(dāng)前線程是不會被阻塞,會繼續(xù)往下執(zhí)行。
3. CyclicBarrier
CountDownLatch是一個倒數(shù)計(jì)數(shù)器,在計(jì)數(shù)器不為0時,所有調(diào)用await的線程都會等待,當(dāng)計(jì)數(shù)器降為0,線程才會繼續(xù)執(zhí)行,且計(jì)數(shù)器一旦變?yōu)?,就不能再重置了。
CyclicBarrier可以認(rèn)為是一個柵欄,柵欄的作用是什么?就是阻擋前行。
CyclicBarrier是一個可以循環(huán)使用的柵欄,它做的事情就是:讓線程到達(dá)柵欄時被阻塞(調(diào)用await方法),直到到達(dá)柵欄的線程數(shù)滿足指定數(shù)量要求時,柵欄才會打開放行,被柵欄攔截的線程才可以執(zhí)行。
當(dāng)多個線程都達(dá)到了指定點(diǎn)后,才能繼續(xù)往下繼續(xù)執(zhí)行。這就有點(diǎn)像報(bào)數(shù)的感覺,假設(shè)6個線程就相當(dāng)于6個運(yùn)動員,到賽道起點(diǎn)時會報(bào)數(shù)進(jìn)行統(tǒng)計(jì),如果剛好是6的話,這一波就湊齊了,才能往下執(zhí)行。這里的6個線程,也就是計(jì)數(shù)器的初始值6,是通過CyclicBarrier的構(gòu)造方法傳入的。
CyclicBarrier的主要方法:
- await() throws InterruptedException, BrokenBarrierException 等到所有的線程都到達(dá)指定的臨界點(diǎn);
- await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 與上面的await方法功能基本一致,只不過這里有超時限制,阻塞等待直至到達(dá)超時時間為止;
- int getNumberWaiting()獲取當(dāng)前有多少個線程阻塞等待在臨界點(diǎn)上;
- boolean isBroken()用于查詢阻塞等待的線程是否被中斷
- void reset()將屏障重置為初始狀態(tài)。如果當(dāng)前有線程正在臨界點(diǎn)等待的話,將拋出BrokenBarrierException。
另外需要注意的是,CyclicBarrier提供了這樣的構(gòu)造方法:
public CyclicBarrier(int parties, Runnable barrierAction)
可以用來,當(dāng)指定的線程都到達(dá)了指定的臨界點(diǎn)的時,接下來執(zhí)行的操作可以由barrierAction傳入即可。
栗子:6個運(yùn)動員準(zhǔn)備跑步比賽,運(yùn)動員在賽跑需要在起點(diǎn)做好準(zhǔn)備,當(dāng)裁判發(fā)現(xiàn)所有運(yùn)動員準(zhǔn)備完畢后,就舉起發(fā)令槍,比賽開始。這里的起跑線就是屏障,是臨界點(diǎn),而這6個運(yùn)動員就類比成線程的話,就是這6個線程都必須到達(dá)指定點(diǎn)了,意味著湊齊了一波,然后才能繼續(xù)執(zhí)行,否則每個線程都得阻塞等待,直至湊齊一波即可。
public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 6; // 運(yùn)動員數(shù)
CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("所有運(yùn)動員已準(zhǔn)備完畢,發(fā)令槍:跑!");
}
});
for (int i = 0; i < N; i++) {
Thread t = new Thread(new PrepareWork(cb), "運(yùn)動員[" + i + "]");
t.start();
}
}
private static class PrepareWork implements Runnable {
private CyclicBarrier cb;
PrepareWork(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + ": 準(zhǔn)備完成");
cb.await(); // 在柵欄等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
從輸出結(jié)果可以看出,當(dāng)6個運(yùn)動員(線程)都到達(dá)了指定的臨界點(diǎn)(barrier)時候,才能繼續(xù)往下執(zhí)行,否則,則會阻塞等待在調(diào)用await()處。 (在CyclicBarrier構(gòu)造函數(shù)中傳入N參數(shù),代表當(dāng)由N個線程調(diào)用cb.await(); 方法,CyclicBarrier才會往下執(zhí)行barrierAction)
CyclicBarrier對異常的處理 線程在阻塞過程中,可能被中斷,那么既然CyclicBarrier放行的條件是等待的線程數(shù)達(dá)到指定數(shù)目,萬一線程被中斷導(dǎo)致最終的等待線程數(shù)達(dá)不到柵欄的要求怎么辦?
public int await() throws InterruptedException, BrokenBarrierException {
//...
}
可以看到,這個方法除了拋出InterruptedException異常外,還會拋出BrokenBarrierException。 BrokenBarrierException表示當(dāng)前的CyclicBarrier已經(jīng)損壞了,等不到所有線程都到達(dá)柵欄了,所以已經(jīng)在等待的線程也沒必要再等了,可以散伙了。
出現(xiàn)以下幾種情況之一時,當(dāng)前等待線程會拋出BrokenBarrierException異常:
- 其它某個正在await等待的線程被中斷了;
- 其它某個正在await等待的線程超時了;
- 某個線程重置了CyclicBarrier;
另外,只要正在Barrier上等待的任一線程拋出了異常,那么Barrier就會認(rèn)為肯定是湊不齊所有線程了,就會將柵欄置為損壞(Broken)狀態(tài),并傳播BrokenBarrierException給其它所有正在等待(await)的線程。
異常情況模擬:
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
int N = 6; // 運(yùn)動員數(shù)
CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("所有運(yùn)動員已準(zhǔn)備完畢,發(fā)令槍:跑!");
}
});
List<Thread> list = new ArrayList<>();
for (int i = 0; i < N; i++) {
Thread t = new Thread(new PrepareWork(cb), "運(yùn)動員[" + i + "]");
list.add(t);
t.start();
if (i == 3) {
t.interrupt(); // 運(yùn)動員[3]置中斷標(biāo)志位
}
}
Thread.sleep(2000);
System.out.println("Barrier是否損壞:" + cb.isBroken());
}
CountDownLatch與CyclicBarrier的比較 CountDownLatch與CyclicBarrier都是用于控制并發(fā)的工具類,都可以理解成維護(hù)的就是一個計(jì)數(shù)器,但是這兩者還是各有不同側(cè)重點(diǎn)的:
- CountDownLatch一般用于某個線程A等待若干個其他線程執(zhí)行完任務(wù)之后,它才執(zhí)行;而CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行;CountDownLatch強(qiáng)調(diào)一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等大家都完成,再攜手共進(jìn)。
- CountDownLatch方法比較少,操作比較簡單,而CyclicBarrier提供的方法更多,比如能夠通過getNumberWaiting(),isBroken()這些方法獲取當(dāng)前多個線程的狀態(tài),并且CyclicBarrier的構(gòu)造方法可以傳入barrierAction,指定當(dāng)所有線程都到達(dá)時執(zhí)行的業(yè)務(wù)功能;
- CountDownLatch是不能復(fù)用的,而CyclicLatch是可以復(fù)用的。
4. Exchanger
Exchanger可以用來在兩個線程之間交換持有的對象。當(dāng)Exchanger在一個線程中調(diào)用exchange方法之后,會等待另外的線程調(diào)用同樣的exchange方法,兩個線程都調(diào)用exchange方法之后,傳入的參數(shù)就會交換。
兩個主要方法
public V exchange(V x) throws InterruptedException
當(dāng)這個方法被調(diào)用的時候,當(dāng)前線程將會等待直到其他的線程調(diào)用同樣的方法。當(dāng)其他的線程調(diào)用exchange之后,當(dāng)前線程將會繼續(xù)執(zhí)行。 在等待過程中,如果有其他的線程interrupt當(dāng)前線程,則會拋出InterruptedException。
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
多了一個timeout時間。如果在timeout時間之內(nèi)沒有其他線程調(diào)用exchange方法,拋出TimeoutException。
栗子: 我們先定義一個帶交換的類: 然后定義兩個Runnable,在run方法中調(diào)用exchange方法:
public class ExchangerTest {
public static void main(String[] args) {
Exchanger<CustBook> exchanger = new Exchanger<>();
// Starting two threads
new Thread(new ExchangerOne(exchanger)).start();
new Thread(new ExchangerTwo(exchanger)).start();
}
}
public class CustBook {
private String name;
}
public class ExchangerOne implements Runnable{
Exchanger<CustBook> ex;
ExchangerOne(Exchanger<CustBook> ex){
this.ex=ex;
}
@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book one");
try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExchangerTwo implements Runnable{
Exchanger<CustBook> ex;
ExchangerTwo(Exchanger<CustBook> ex){
this.ex=ex;
}
@Override
public void run() {
CustBook custBook= new CustBook();
custBook.setName("book two");
try {
CustBook exhangeCustBook=ex.exchange(custBook);
log.info(exhangeCustBook.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5. Phaser
Phaser是一個同步工具類,適用于一些需要分階段的任務(wù)的處理。它的功能與 CyclicBarrier和CountDownLatch類似,類似于一個多階段的柵欄,并且功能更強(qiáng)大,我們來比較下這三者的功能:
CountDownLatch
倒數(shù)計(jì)數(shù)器,初始時設(shè)定計(jì)數(shù)器值,線程可以在計(jì)數(shù)器上等待,當(dāng)計(jì)數(shù)器值歸0后,所有等待的線程繼續(xù)執(zhí)行
CyclicBarrier
循環(huán)柵欄,初始時設(shè)定參與線程數(shù),當(dāng)線程到達(dá)柵欄后,會等待其它線程的到達(dá),當(dāng)?shù)竭_(dá)柵欄的總數(shù)滿足指定數(shù)后,所有等待的線程繼續(xù)執(zhí)行
Phaser
多階段柵欄,可以在初始時設(shè)定參與線程數(shù),也可以中途注冊/注銷參與者,當(dāng)?shù)竭_(dá)的參與者數(shù)量滿足柵欄設(shè)定的數(shù)量后,會進(jìn)行階段升級(advance)
相關(guān)概念:phase(階段) Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時間點(diǎn),Phaser只處于某一個phase(階段),初始階段為0,最大達(dá)到Integerr.MAX_VALUE,然后再次歸零。當(dāng)所有parties參與者都到達(dá)后,phase值會遞增。
parties(參與者) Phaser既可以在初始構(gòu)造時指定參與者的數(shù)量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/注銷參與者。
arrive(到達(dá)) / advance(進(jìn)階) Phaser注冊完parties(參與者)之后,參與者的初始狀態(tài)是unarrived的,當(dāng)參與者到達(dá)(arrive)當(dāng)前階段(phase)后,狀態(tài)就會變成arrived。當(dāng)階段的到達(dá)參與者數(shù)滿足條件后(注冊的數(shù)量等于到達(dá)的數(shù)量),階段就會發(fā)生進(jìn)階(advance)——也就是phase值+1。
Termination(終止) 代表當(dāng)前Phaser對象達(dá)到終止?fàn)顟B(tài)。
Tiering(分層) Phaser支持分層(Tiering) —— 一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當(dāng)前待構(gòu)造的Phaser對象的父結(jié)點(diǎn)。之所以引入Tiering,是因?yàn)楫?dāng)一個Phaser有大量參與者(parties)的時候,內(nèi)部的同步操作會使性能急劇下降,而分層可以降低競爭,從而減小因同步導(dǎo)致的額外開銷。 在一個分層Phasers的樹結(jié)構(gòu)中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當(dāng)一個Phaser參與者(parties)數(shù)量變成0時,如果有該P(yáng)haser有父結(jié)點(diǎn),就會將它從父結(jié)點(diǎn)中溢移除。
核心方法:
- arriveAndDeregister() 該方法立即返回下一階段的序號,并且其它線程需要等待的個數(shù)減一, 取消自己的注冊、把當(dāng)前線程從之后需要等待的成員中移除。 如果該P(yáng)haser是另外一個Phaser的子Phaser(層次化Phaser), 并且該操作導(dǎo)致當(dāng)前Phaser的成員數(shù)為0,則該操作也會將當(dāng)前Phaser從其父Phaser中移除。
- arrive() 某個參與者完成任務(wù)后調(diào)用,該方法不作任何等待,直接返回下一階段的序號。 awaitAdvance(int phase) 該方法等待某一階段執(zhí)行完畢。 如果當(dāng)前階段不等于指定的階段或者該P(yáng)haser已經(jīng)被終止,則立即返回。 該階段數(shù)一般由arrive()方法或者arriveAndDeregister()方法返回。 返回下一階段的序號,或者返回參數(shù)指定的值(如果該參數(shù)為負(fù)數(shù)),或者直接返回當(dāng)前階段序號(如果當(dāng)前Phaser已經(jīng)被終止)。
- awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)相當(dāng), 唯一的不同在于若該線程在該方法等待時被中斷,則該方法拋出InterruptedException。
- awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)相當(dāng), 區(qū)別在于如果超時則拋出TimeoutException。
- bulkRegister(int parties) 動態(tài)調(diào)整注冊任務(wù)parties的數(shù)量。如果當(dāng)前phaser已經(jīng)被終止,則該方法無效,并返回負(fù)數(shù)。 如果調(diào)用該方法時,onAdvance方法正在執(zhí)行,則該方法等待其執(zhí)行完畢。 如果該P(yáng)haser有父Phaser則指定的party數(shù)大于0,且之前該P(yáng)haser的party數(shù)為0,那么該P(yáng)haser會被注冊到其父Phaser中。
- forceTermination() 強(qiáng)制讓該P(yáng)haser進(jìn)入終止?fàn)顟B(tài)。 已經(jīng)注冊的party數(shù)不受影響。如果該P(yáng)haser有子Phaser,則其所有的子Phaser均進(jìn)入終止?fàn)顟B(tài)。 如果該P(yáng)haser已經(jīng)處于終止?fàn)顟B(tài),該方法調(diào)用不造成任何影響。
栗子:3個線程,4個階段,每個階段都并發(fā)處理
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) {
int parties = 3;
int phases = 4;
final Phaser phaser = new Phaser(parties) {
@Override
//每個階段結(jié)束時
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase : " + phase + " end ======");
return registeredParties == 0;
}
};
for (int i = 0; i < parties; i++) {
int threadId = i;
Thread thread = new Thread(() -> {
for (int phase = 0; phase < phases; phase++) {
if (phase == 0) {
System.out.println(String.format("第一階段操作 Thread %s, phase %s", threadId, phase));
}
if (phase == 1) {
System.out.println(String.format("第二階段操作 Thread %s, phase %s", threadId, phase));
}
if (phase == 2) {
System.out.println(String.format("第三階段操作 Thread %s, phase %s", threadId, phase));
}
if (phase == 3) {
System.out.println(String.format("第四階段操作 Thread %s, phase %s", threadId, phase));
}
/**
* arriveAndAwaitAdvance() 當(dāng)前線程當(dāng)前階段執(zhí)行完畢,等待其它線程完成當(dāng)前階段。
* 如果當(dāng)前線程是該階段最后一個未到達(dá)的,則該方法直接返回下一個階段的序號(階段序號從0開始),
* 同時其它線程的該方法也返回下一個階段的序號。
**/
int nextPhaser = phaser.arriveAndAwaitAdvance();
}
});
thread.start();
}
}
}