Synchronizes:Semaphore、CountDownLatch、CyclicBarrier、Exchanger、Phaser

    1. Semaphore Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,通過協(xié)調(diào)各個線程,保證合理的使用公共資源。Semaphore維護(hù)了一個許可集,其實(shí)就是一定數(shù)量的“許可證”。當(dāng)有

    1. Semaphore

    Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,通過協(xié)調(diào)各個線程,保證合理的使用公共資源。 Semaphore維護(hù)了一個許可集,其實(shí)就是一定數(shù)量的“許可證”。當(dāng)有線程想要訪問共享資源時,需要先獲取(acquire)的許可;如果許可不夠了,線程需要一直等待,直到許可可用。當(dāng)線程使用完共享資源后,可以歸還(release)許可,以供其它需要的線程使用。

    和ReentrantLock類似,Semaphore支持公平/非公平策略。

    Semaphore的主要方法摘要:   void acquire():從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。   void release():釋放一個許可,將其返回給信號量。   int availablePermits():返回此信號量中當(dāng)前可用的許可數(shù)。   boolean hasQueuedThreads():查詢是否有線程正在等待獲取。

    使用

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    //創(chuàng)建一個會實(shí)現(xiàn)print queue的類名為 PrintQueue。
    class PrintQueue {
    
        // 聲明一個對象為Semaphore,稱它為semaphore。
        private final Semaphore semaphore;
        // 實(shí)現(xiàn)類的構(gòu)造函數(shù)并初始能保護(hù)print quere的訪問的semaphore對象的值。
        public PrintQueue() {
            semaphore = new Semaphore(1);
        }
    
        //實(shí)現(xiàn)Implement the printJob()方法,此方法可以模擬打印文檔,并接收document對象作為參數(shù)。
        public void printJob(Object document) {
    //在這方法內(nèi),首先,你必須調(diào)用acquire()方法獲得demaphore。這個方法會拋出 InterruptedException異常,使用必須包含處理這個異常的代碼。
            try {
                semaphore.acquire();
    
    //然后,實(shí)現(xiàn)能隨機(jī)等待一段時間的模擬打印文檔的行。
                long duration = (long) (Math.random() * 10);
    
                System.out.printf("%s: PrintQueue: Printing a Job during %d seconds\n", Thread.currentThread().getName(), duration);
    
                Thread.sleep(duration);
    
    //最后,釋放semaphore通過調(diào)用semaphore的relaser()方法。
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    
    }
    
    //創(chuàng)建一個名為Job的類并一定實(shí)現(xiàn)Runnable 接口。這個類實(shí)現(xiàn)把文檔傳送到打印機(jī)的任務(wù)。
    class Job implements Runnable {
        //聲明一個對象為PrintQueue,名為printQueue。
        private PrintQueue printQueue;
        //實(shí)現(xiàn)類的構(gòu)造函數(shù),初始化這個類里的PrintQueue對象。
        public Job(PrintQueue printQueue) {
            this.printQueue = printQueue;
        }
    
        //實(shí)現(xiàn)方法run()。
        @Override
        public void run() {
                    //首先, 此方法寫信息到操控臺表明任務(wù)已經(jīng)開始執(zhí)行了。
            System.out.printf("%s: Going to print a job\n", Thread.currentThread().getName());
                    // 然后,調(diào)用PrintQueue 對象的printJob()方法。
            printQueue.printJob(new Object());
                    //最后, 此方法寫信息到操控臺表明它已經(jīng)結(jié)束運(yùn)行了。
            System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName());
    
        }
    }
    
    public class SemaphoreTest {
    
        public static void main(String args[]) {
    
                    // 創(chuàng)建PrintQueue對象名為printQueue。
            PrintQueue printQueue = new PrintQueue();
                //創(chuàng)建10個threads。每個線程會執(zhí)行一個發(fā)送文檔到print queue的Job對象。
            Thread thread[] = new Thread[10];
    
            for (int i = 0; i < 10; i++) {
                thread[i] = new Thread(new Job(printQueue), "Thread" + i);
            }
    
            for (int i = 0; i < 10; i++) {
                thread[i].start();
            }
    
        }
    }

    2. CountDownLatch

    在多線程協(xié)作完成業(yè)務(wù)功能時,有時候需要等待其他多個線程完成任務(wù)之后,主線程才能繼續(xù)往下執(zhí)行業(yè)務(wù)功能,在這種的業(yè)務(wù)場景下,通常可以使用Thread類的join方法,讓主線程等待被join的線程執(zhí)行完之后,主線程才能繼續(xù)往下執(zhí)行。當(dāng)然,使用線程間消息通信機(jī)制也可以完成。其實(shí),java并發(fā)工具類中為我們提供了類似“倒計(jì)時”這樣的工具類,可以十分方便的完成所說的這種業(yè)務(wù)場景。

    CountDownLatch允許一個或多個線程等待其他線程完成工作。

    CountDownLatch相關(guān)方法:

    • public CountDownLatch(int count) 構(gòu)造方法會傳入一個整型數(shù)N,之后調(diào)用CountDownLatch的countDown方法會對N減一,知道N減到0的時候,當(dāng)前調(diào)用await方法的線程繼續(xù)執(zhí)行。
    • await() throws InterruptedException:調(diào)用該方法的線程等到構(gòu)造方法傳入的N減到0的時候,才能繼續(xù)往下執(zhí)行;
    • await(long timeout, TimeUnit unit):與上面的await方法功能一致,只不過這里有了時間限制,調(diào)用該方法的線程等到指定的timeout時間后,不管N是否減至為0,都會繼續(xù)往下執(zhí)行;
    • countDown():使CountDownLatch初始值N減1;
    • long getCount():獲取當(dāng)前CountDownLatch維護(hù)的值

    CountDownLatch的用法 CountDownLatch典型用法:1、某一線程在開始運(yùn)行前等待n個線程執(zhí)行完畢。將CountDownLatch的計(jì)數(shù)器初始化為new CountDownLatch(n),每當(dāng)一個任務(wù)線程執(zhí)行完畢,就將計(jì)數(shù)器減1 countdownLatch.countDown(),當(dāng)計(jì)數(shù)器的值變?yōu)?時,在CountDownLatch上await()的線程就會被喚醒。一個典型應(yīng)用場景就是啟動一個服務(wù)時,主線程需要等待多個組件加載完畢,之后再繼續(xù)執(zhí)行。 CountDownLatch典型用法:2、實(shí)現(xiàn)多個線程開始執(zhí)行任務(wù)的最大并行性。注意是并行性,不是并發(fā),強(qiáng)調(diào)的是多個線程在某一時刻同時開始執(zhí)行。類似于賽跑,將多個線程放到起點(diǎn),等待發(fā)令槍響,然后同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計(jì)算器初始化為1,多個線程在開始執(zhí)行任務(wù)前首先countdownlatch.await(),當(dāng)主線程調(diào)用countDown()時,計(jì)數(shù)器變?yōu)?,多個線程同時被喚醒。

    CountDownLatch的不足 CountDownLatch是一次性的,計(jì)算器的值只能在構(gòu)造方法中初始化一次,之后沒有任何機(jī)制再次對其設(shè)置值,當(dāng)CountDownLatch使用完畢后,它不能再次被使用。

    栗子:運(yùn)動員進(jìn)行跑步比賽時,假設(shè)有6個運(yùn)動員參與比賽,裁判員在終點(diǎn)會為這6個運(yùn)動員分別計(jì)時,可以想象沒當(dāng)一個運(yùn)動員到達(dá)終點(diǎn)的時候,對于裁判員來說就少了一個計(jì)時任務(wù)。直到所有運(yùn)動員都到達(dá)終點(diǎn)了,裁判員的任務(wù)也才完成。這6個運(yùn)動員可以類比成6個線程,當(dāng)線程調(diào)用CountDownLatch.countDown方法時就會對計(jì)數(shù)器的值減一,直到計(jì)數(shù)器的值為0的時候,裁判員(調(diào)用await方法的線程)才能繼續(xù)往下執(zhí)行。

    public class CountDownLatchTest {
    private static CountDownLatch startSignal = new CountDownLatch(1);
    //用來表示裁判員需要維護(hù)的是6個運(yùn)動員
    private static CountDownLatch endSignal = new CountDownLatch(6);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 運(yùn)動員等待裁判員響哨!!!");
                    startSignal.await();
                    System.out.println(Thread.currentThread().getName() + "正在全力沖刺");
                    endSignal.countDown();
                    System.out.println(Thread.currentThread().getName() + "  到達(dá)終點(diǎn)");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("裁判員響哨開始啦!!!");
        startSignal.countDown();
        endSignal.await();
        System.out.println("所有運(yùn)動員到達(dá)終點(diǎn),比賽結(jié)束!");
        executorService.shutdown();
    }
    }
    
    該示例代碼中設(shè)置了兩個CountDownLatch,第一個endSignal用于控制讓main線程(裁判員)必須等到其他線程(運(yùn)動員)讓CountDownLatch維護(hù)的數(shù)值N減到0為止,相當(dāng)于一個完成信號;另一個startSignal用于讓main線程對其他線程進(jìn)行“發(fā)號施令”,相當(dāng)于一個入口或者開關(guān)。
    
    startSignal引用的CountDownLatch初始值為1,而其他線程執(zhí)行的run方法中都會先通過 startSignal.await()讓這些線程都被阻塞,直到main線程通過調(diào)用startSignal.countDown();,將值N減1,CountDownLatch維護(hù)的數(shù)值N為0后,其他線程才能往下執(zhí)行,并且,每個線程執(zhí)行的run方法中都會通過endSignal.countDown();對endSignal維護(hù)的數(shù)值進(jìn)行減一,由于往線程池提交了6個任務(wù),會被減6次,所以endSignal維護(hù)的值最終會變?yōu)?,因此main線程在latch.await();阻塞結(jié)束,才能繼續(xù)往下執(zhí)行。
    
    注意:當(dāng)調(diào)用CountDownLatch的countDown方法時,當(dāng)前線程是不會被阻塞,會繼續(xù)往下執(zhí)行。

    3. CyclicBarrier

    CountDownLatch是一個倒數(shù)計(jì)數(shù)器,在計(jì)數(shù)器不為0時,所有調(diào)用await的線程都會等待,當(dāng)計(jì)數(shù)器降為0,線程才會繼續(xù)執(zhí)行,且計(jì)數(shù)器一旦變?yōu)?,就不能再重置了。

    CyclicBarrier可以認(rèn)為是一個柵欄,柵欄的作用是什么?就是阻擋前行。

    CyclicBarrier是一個可以循環(huán)使用的柵欄,它做的事情就是:讓線程到達(dá)柵欄時被阻塞(調(diào)用await方法),直到到達(dá)柵欄的線程數(shù)滿足指定數(shù)量要求時,柵欄才會打開放行,被柵欄攔截的線程才可以執(zhí)行。

    當(dāng)多個線程都達(dá)到了指定點(diǎn)后,才能繼續(xù)往下繼續(xù)執(zhí)行。這就有點(diǎn)像報(bào)數(shù)的感覺,假設(shè)6個線程就相當(dāng)于6個運(yùn)動員,到賽道起點(diǎn)時會報(bào)數(shù)進(jìn)行統(tǒng)計(jì),如果剛好是6的話,這一波就湊齊了,才能往下執(zhí)行。這里的6個線程,也就是計(jì)數(shù)器的初始值6,是通過CyclicBarrier的構(gòu)造方法傳入的。

    CyclicBarrier的主要方法:

    • await() throws InterruptedException, BrokenBarrierException 等到所有的線程都到達(dá)指定的臨界點(diǎn);
    • await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException 與上面的await方法功能基本一致,只不過這里有超時限制,阻塞等待直至到達(dá)超時時間為止;
    • int getNumberWaiting()獲取當(dāng)前有多少個線程阻塞等待在臨界點(diǎn)上;
    • boolean isBroken()用于查詢阻塞等待的線程是否被中斷
    • void reset()將屏障重置為初始狀態(tài)。如果當(dāng)前有線程正在臨界點(diǎn)等待的話,將拋出BrokenBarrierException。

    另外需要注意的是,CyclicBarrier提供了這樣的構(gòu)造方法:

    public CyclicBarrier(int parties, Runnable barrierAction)

    可以用來,當(dāng)指定的線程都到達(dá)了指定的臨界點(diǎn)的時,接下來執(zhí)行的操作可以由barrierAction傳入即可。

    栗子:6個運(yùn)動員準(zhǔn)備跑步比賽,運(yùn)動員在賽跑需要在起點(diǎn)做好準(zhǔn)備,當(dāng)裁判發(fā)現(xiàn)所有運(yùn)動員準(zhǔn)備完畢后,就舉起發(fā)令槍,比賽開始。這里的起跑線就是屏障,是臨界點(diǎn),而這6個運(yùn)動員就類比成線程的話,就是這6個線程都必須到達(dá)指定點(diǎn)了,意味著湊齊了一波,然后才能繼續(xù)執(zhí)行,否則每個線程都得阻塞等待,直至湊齊一波即可。

    public class CyclicBarrierTest {
        public static void main(String[] args) {
    
            int N = 6;  // 運(yùn)動員數(shù)
            CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
                @Override
                public void run() {
                    System.out.println("所有運(yùn)動員已準(zhǔn)備完畢,發(fā)令槍:跑!");
                }
            });
    
            for (int i = 0; i < N; i++) {
                Thread t = new Thread(new PrepareWork(cb), "運(yùn)動員[" + i + "]");
                t.start();
            }
        }
    
    
    private static class PrepareWork implements Runnable {
            private CyclicBarrier cb;
    
            PrepareWork(CyclicBarrier cb) {
                this.cb = cb;
            }
    
            @Override
            public void run() {
    
                try {
                    Thread.sleep(500);
                    System.out.println(Thread.currentThread().getName() + ": 準(zhǔn)備完成");
                    cb.await();          // 在柵欄等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    從輸出結(jié)果可以看出,當(dāng)6個運(yùn)動員(線程)都到達(dá)了指定的臨界點(diǎn)(barrier)時候,才能繼續(xù)往下執(zhí)行,否則,則會阻塞等待在調(diào)用await()處。 (在CyclicBarrier構(gòu)造函數(shù)中傳入N參數(shù),代表當(dāng)由N個線程調(diào)用cb.await(); 方法,CyclicBarrier才會往下執(zhí)行barrierAction)

    CyclicBarrier對異常的處理 線程在阻塞過程中,可能被中斷,那么既然CyclicBarrier放行的條件是等待的線程數(shù)達(dá)到指定數(shù)目,萬一線程被中斷導(dǎo)致最終的等待線程數(shù)達(dá)不到柵欄的要求怎么辦?

    public int await() throws InterruptedException, BrokenBarrierException {
        //...
    }

    可以看到,這個方法除了拋出InterruptedException異常外,還會拋出BrokenBarrierException。 BrokenBarrierException表示當(dāng)前的CyclicBarrier已經(jīng)損壞了,等不到所有線程都到達(dá)柵欄了,所以已經(jīng)在等待的線程也沒必要再等了,可以散伙了。

    出現(xiàn)以下幾種情況之一時,當(dāng)前等待線程會拋出BrokenBarrierException異常:

    • 其它某個正在await等待的線程被中斷了;
    • 其它某個正在await等待的線程超時了;
    • 某個線程重置了CyclicBarrier;

    另外,只要正在Barrier上等待的任一線程拋出了異常,那么Barrier就會認(rèn)為肯定是湊不齊所有線程了,就會將柵欄置為損壞(Broken)狀態(tài),并傳播BrokenBarrierException給其它所有正在等待(await)的線程。

    異常情況模擬:

    public class CyclicBarrierTest {
        public static void main(String[] args) throws InterruptedException {
    
            int N = 6;  // 運(yùn)動員數(shù)
            CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
                @Override
                public void run() {
                    System.out.println("所有運(yùn)動員已準(zhǔn)備完畢,發(fā)令槍:跑!");
                }
            });
    
            List<Thread> list = new ArrayList<>();
            for (int i = 0; i < N; i++) {
                Thread t = new Thread(new PrepareWork(cb), "運(yùn)動員[" + i + "]");
                list.add(t);
                t.start();
                if (i == 3) {
                    t.interrupt();  // 運(yùn)動員[3]置中斷標(biāo)志位
                }
            }
    
            Thread.sleep(2000);
            System.out.println("Barrier是否損壞:" + cb.isBroken());
        }

    CountDownLatch與CyclicBarrier的比較 CountDownLatch與CyclicBarrier都是用于控制并發(fā)的工具類,都可以理解成維護(hù)的就是一個計(jì)數(shù)器,但是這兩者還是各有不同側(cè)重點(diǎn)的:

    • CountDownLatch一般用于某個線程A等待若干個其他線程執(zhí)行完任務(wù)之后,它才執(zhí)行;而CyclicBarrier一般用于一組線程互相等待至某個狀態(tài),然后這一組線程再同時執(zhí)行;CountDownLatch強(qiáng)調(diào)一個線程等多個線程完成某件事情。CyclicBarrier是多個線程互等,等大家都完成,再攜手共進(jìn)。
    • CountDownLatch方法比較少,操作比較簡單,而CyclicBarrier提供的方法更多,比如能夠通過getNumberWaiting(),isBroken()這些方法獲取當(dāng)前多個線程的狀態(tài),并且CyclicBarrier的構(gòu)造方法可以傳入barrierAction,指定當(dāng)所有線程都到達(dá)時執(zhí)行的業(yè)務(wù)功能;
    • CountDownLatch是不能復(fù)用的,而CyclicLatch是可以復(fù)用的。

    4. Exchanger

    Exchanger可以用來在兩個線程之間交換持有的對象。當(dāng)Exchanger在一個線程中調(diào)用exchange方法之后,會等待另外的線程調(diào)用同樣的exchange方法,兩個線程都調(diào)用exchange方法之后,傳入的參數(shù)就會交換。

    兩個主要方法 public V exchange(V x) throws InterruptedException

    當(dāng)這個方法被調(diào)用的時候,當(dāng)前線程將會等待直到其他的線程調(diào)用同樣的方法。當(dāng)其他的線程調(diào)用exchange之后,當(dāng)前線程將會繼續(xù)執(zhí)行。 在等待過程中,如果有其他的線程interrupt當(dāng)前線程,則會拋出InterruptedException。

    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    多了一個timeout時間。如果在timeout時間之內(nèi)沒有其他線程調(diào)用exchange方法,拋出TimeoutException。

    栗子: 我們先定義一個帶交換的類: 然后定義兩個Runnable,在run方法中調(diào)用exchange方法:

    public class ExchangerTest {
    
        public static void main(String[] args) {
            Exchanger<CustBook> exchanger = new Exchanger<>();
            // Starting two threads
            new Thread(new ExchangerOne(exchanger)).start();
            new Thread(new ExchangerTwo(exchanger)).start();
        }
    }
    public class CustBook {
    
        private String name;
    }
    public class ExchangerOne implements Runnable{
    
        Exchanger<CustBook> ex;
    
        ExchangerOne(Exchanger<CustBook> ex){
          this.ex=ex;
        }
    
        @Override
        public void run() {
        CustBook custBook= new CustBook();
            custBook.setName("book one");
    
            try {
                CustBook exhangeCustBook=ex.exchange(custBook);
                log.info(exhangeCustBook.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class ExchangerTwo implements Runnable{
    
        Exchanger<CustBook> ex;
    
        ExchangerTwo(Exchanger<CustBook> ex){
          this.ex=ex;
        }
    
        @Override
        public void run() {
        CustBook custBook= new CustBook();
            custBook.setName("book two");
    
            try {
                CustBook exhangeCustBook=ex.exchange(custBook);
                log.info(exhangeCustBook.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    5. Phaser

    Phaser是一個同步工具類,適用于一些需要分階段的任務(wù)的處理。它的功能與 CyclicBarrier和CountDownLatch類似,類似于一個多階段的柵欄,并且功能更強(qiáng)大,我們來比較下這三者的功能:

    CountDownLatch

    倒數(shù)計(jì)數(shù)器,初始時設(shè)定計(jì)數(shù)器值,線程可以在計(jì)數(shù)器上等待,當(dāng)計(jì)數(shù)器值歸0后,所有等待的線程繼續(xù)執(zhí)行

    CyclicBarrier

    循環(huán)柵欄,初始時設(shè)定參與線程數(shù),當(dāng)線程到達(dá)柵欄后,會等待其它線程的到達(dá),當(dāng)?shù)竭_(dá)柵欄的總數(shù)滿足指定數(shù)后,所有等待的線程繼續(xù)執(zhí)行

    Phaser

    多階段柵欄,可以在初始時設(shè)定參與線程數(shù),也可以中途注冊/注銷參與者,當(dāng)?shù)竭_(dá)的參與者數(shù)量滿足柵欄設(shè)定的數(shù)量后,會進(jìn)行階段升級(advance)

    相關(guān)概念:phase(階段) Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時間點(diǎn),Phaser只處于某一個phase(階段),初始階段為0,最大達(dá)到Integerr.MAX_VALUE,然后再次歸零。當(dāng)所有parties參與者都到達(dá)后,phase值會遞增。

    parties(參與者) Phaser既可以在初始構(gòu)造時指定參與者的數(shù)量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/注銷參與者。

    arrive(到達(dá)) / advance(進(jìn)階) Phaser注冊完parties(參與者)之后,參與者的初始狀態(tài)是unarrived的,當(dāng)參與者到達(dá)(arrive)當(dāng)前階段(phase)后,狀態(tài)就會變成arrived。當(dāng)階段的到達(dá)參與者數(shù)滿足條件后(注冊的數(shù)量等于到達(dá)的數(shù)量),階段就會發(fā)生進(jìn)階(advance)——也就是phase值+1。

    Termination(終止) 代表當(dāng)前Phaser對象達(dá)到終止?fàn)顟B(tài)。

    Tiering(分層) Phaser支持分層(Tiering) —— 一種樹形結(jié)構(gòu),通過構(gòu)造函數(shù)可以指定當(dāng)前待構(gòu)造的Phaser對象的父結(jié)點(diǎn)。之所以引入Tiering,是因?yàn)楫?dāng)一個Phaser有大量參與者(parties)的時候,內(nèi)部的同步操作會使性能急劇下降,而分層可以降低競爭,從而減小因同步導(dǎo)致的額外開銷。 在一個分層Phasers的樹結(jié)構(gòu)中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當(dāng)一個Phaser參與者(parties)數(shù)量變成0時,如果有該P(yáng)haser有父結(jié)點(diǎn),就會將它從父結(jié)點(diǎn)中溢移除。

    核心方法:

    • arriveAndDeregister() 該方法立即返回下一階段的序號,并且其它線程需要等待的個數(shù)減一, 取消自己的注冊、把當(dāng)前線程從之后需要等待的成員中移除。 如果該P(yáng)haser是另外一個Phaser的子Phaser(層次化Phaser), 并且該操作導(dǎo)致當(dāng)前Phaser的成員數(shù)為0,則該操作也會將當(dāng)前Phaser從其父Phaser中移除。
    • arrive() 某個參與者完成任務(wù)后調(diào)用,該方法不作任何等待,直接返回下一階段的序號。 awaitAdvance(int phase) 該方法等待某一階段執(zhí)行完畢。 如果當(dāng)前階段不等于指定的階段或者該P(yáng)haser已經(jīng)被終止,則立即返回。 該階段數(shù)一般由arrive()方法或者arriveAndDeregister()方法返回。 返回下一階段的序號,或者返回參數(shù)指定的值(如果該參數(shù)為負(fù)數(shù)),或者直接返回當(dāng)前階段序號(如果當(dāng)前Phaser已經(jīng)被終止)。
    • awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)相當(dāng), 唯一的不同在于若該線程在該方法等待時被中斷,則該方法拋出InterruptedException。
    • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)相當(dāng), 區(qū)別在于如果超時則拋出TimeoutException。
    • bulkRegister(int parties) 動態(tài)調(diào)整注冊任務(wù)parties的數(shù)量。如果當(dāng)前phaser已經(jīng)被終止,則該方法無效,并返回負(fù)數(shù)。 如果調(diào)用該方法時,onAdvance方法正在執(zhí)行,則該方法等待其執(zhí)行完畢。 如果該P(yáng)haser有父Phaser則指定的party數(shù)大于0,且之前該P(yáng)haser的party數(shù)為0,那么該P(yáng)haser會被注冊到其父Phaser中。
    • forceTermination() 強(qiáng)制讓該P(yáng)haser進(jìn)入終止?fàn)顟B(tài)。 已經(jīng)注冊的party數(shù)不受影響。如果該P(yáng)haser有子Phaser,則其所有的子Phaser均進(jìn)入終止?fàn)顟B(tài)。 如果該P(yáng)haser已經(jīng)處于終止?fàn)顟B(tài),該方法調(diào)用不造成任何影響。

    栗子:3個線程,4個階段,每個階段都并發(fā)處理

    import java.util.concurrent.Phaser;
    
    public class PhaserTest {
        public static void main(String[] args) {
            int parties = 3;
            int phases = 4;
            final Phaser phaser = new Phaser(parties) {
                @Override
                //每個階段結(jié)束時
                protected boolean onAdvance(int phase, int registeredParties) {
                    System.out.println("====== Phase : " + phase + "  end ======");
                    return registeredParties == 0;
                }
            };
            for (int i = 0; i < parties; i++) {
                int threadId = i;
                Thread thread = new Thread(() -> {
                    for (int phase = 0; phase < phases; phase++) {
                        if (phase == 0) {
                            System.out.println(String.format("第一階段操作  Thread %s, phase %s", threadId, phase));
                        }
                        if (phase == 1) {
                            System.out.println(String.format("第二階段操作  Thread %s, phase %s", threadId, phase));
                        }
                        if (phase == 2) {
                            System.out.println(String.format("第三階段操作  Thread %s, phase %s", threadId, phase));
                        }
                        if (phase == 3) {
                            System.out.println(String.format("第四階段操作  Thread %s, phase %s", threadId, phase));
                        }
             /**
              * arriveAndAwaitAdvance() 當(dāng)前線程當(dāng)前階段執(zhí)行完畢,等待其它線程完成當(dāng)前階段。
              * 如果當(dāng)前線程是該階段最后一個未到達(dá)的,則該方法直接返回下一個階段的序號(階段序號從0開始),
              * 同時其它線程的該方法也返回下一個階段的序號。
              **/
                        int nextPhaser = phaser.arriveAndAwaitAdvance();
    
                    }
                });
                thread.start();
            }
        }
    }
    聲明:所有內(nèi)容來自互聯(lián)網(wǎng)搜索結(jié)果,不保證100%準(zhǔn)確性,僅供參考。如若本站內(nèi)容侵犯了原著者的合法權(quán)益,可聯(lián)系我們進(jìn)行處理。
    發(fā)表評論
    更多 網(wǎng)友評論0 條評論)
    暫無評論

    返回頂部

    主站蜘蛛池模板: 国产精品电影一区二区三区| 美女福利视频一区| 伊人色综合一区二区三区 | 久久se精品一区二区影院| 亚洲国产成人久久综合一区| 久久精品一区二区影院| 无遮挡免费一区二区三区| 国产SUV精品一区二区四 | 日韩电影在线观看第一区| 亚洲午夜精品一区二区公牛电影院 | 亚洲爆乳精品无码一区二区| 国产日韩一区二区三免费高清 | 日韩精品中文字幕无码一区 | 亚洲日韩AV一区二区三区四区| 日韩电影一区二区| 国产无线乱码一区二三区| 好湿好大硬得深一点动态图91精品福利一区二区 | 国产亚洲福利精品一区| 国产亚洲一区二区三区在线不卡| 精品一区二区ww| 成人区人妻精品一区二区三区| 亚洲一区精彩视频| 无码少妇一区二区三区芒果| 亚洲AV无码一区二区大桥未久 | 亚洲一区二区三区在线视频| 天天看高清无码一区二区三区| 国产成人一区二区在线不卡| 相泽南亚洲一区二区在线播放| 国产精品亚洲一区二区三区在线观看 | 亚洲综合一区国产精品| 久久久国产精品无码一区二区三区 | 精品一区二区三区东京热| 文中字幕一区二区三区视频播放| 国产午夜精品一区二区三区小说 | 亚洲国产福利精品一区二区| 亚洲中文字幕久久久一区| 国产精品成人99一区无码| 精品一区二区三区四区在线| 日本成人一区二区| 欧美人妻一区黄a片| 一区二区国产精品|