2018年6月6日 星期三

CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser

CountDownLatch

執行緒的倒數計時器,全部的計時器都到了,再做想做的事
如果想知道執行緒執行完需要多少時間可以使用這個工具


CountDownLatch cdt = new CountDownLatch(3);
Runnable run = () -> {
    for (int i = 0; i < 10000000; i++) {
    }
    System.out.println(Thread.currentThread().getName());
    cdt.countDown();
};
    
Thread t1 = new Thread(run, "t1");
Thread t2 = new Thread(run, "t2");
Thread t3 = new Thread(run, "t3");
    
long start = System.currentTimeMillis();
t1.start();
t2.start();
t3.start();
    
try {
    System.out.println("wait...");
    cdt.await();
    System.out.println(System.currentTimeMillis() - start);
} catch (InterruptedException e) {
    e.printStackTrace();
}

※countDown() 將倒數計時器減1,await() 是等待全部的執行緒(此例是3)都到時才能通過



CyclicBarrier

一個班級到達玩的地點,全部都上車才能去下一個地點


CyclicBarrier cb = new CyclicBarrier(3, () -> System.out.println("barrier被觸發時呼叫,也就是所有執行緒都到時呼叫"));
System.out.println("總共有" + cb.getParties() + "個執行緒\r\n");
Runnable run = () -> {
    try {
        cb.await();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("執行緒" + Thread.currentThread().getName() + "已到達一號站");
    
        TimeUnit.SECONDS.sleep(1);
        cb.await();
        System.out.println("執行緒" + Thread.currentThread().getName() + "已到達二號站");
    
        TimeUnit.SECONDS.sleep(1);
        cb.await();
        System.out.println("執行緒" + Thread.currentThread().getName() + "已到達三號站");
    } catch (InterruptedException | BrokenBarrierException e) {
        e.printStackTrace();
    }
};
    
Thread t1 = new Thread(run, "t1");
Thread t2 = new Thread(run, "t2");
Thread t3 = new Thread(run, "t3");
    
t1.start();
t2.start();
t3.start();

※有點類似 CountDownLatch,但可以執行多次

※建構子的第二個參數可以不寫,是 overloading



Phaser

有 CountDownLatch 和 CyclicBarrier 的功能,但還可以動態增加 parties,可參考這裡


public class PhaserTest extends Phaser {
    // 每個階段執行完會自動調用
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        if (phase == 0) return init();
        else if (phase == 1) return one();
        else if (phase == 2) return two();
        else if (phase == 3) return three();
        return true; // phase 應該結束就給 true
    }

    private boolean one() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    private boolean two() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    private boolean three() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    private boolean init() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    static class Runner implements Runnable {
        private Phaser phaser;

        public Runner(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println("班級" + Thread.currentThread().getName() + ":已到出發站");
            phaser.arriveAndAwaitAdvance(); // 全到才會繼續往下走

            System.out.println("執行緒" + Thread.currentThread().getName() + "已到達1號站");
            phaser.arriveAndAwaitAdvance();

            System.out.println("執行緒" + Thread.currentThread().getName() + "已到達2號站");
            phaser.arriveAndAwaitAdvance();

            System.out.println("執行緒" + Thread.currentThread().getName() + "已到達3號站");
            phaser.arriveAndAwaitAdvance();

        }
    }

    public static void main(String[] args) {
        PhaserTest phaser = new PhaserTest();
        phaser.register(); // 每註冊一次,phaser 就會多維護一個執行緒
        for (int i = 0; i < 4; i++) {
            phaser.register();// 每註冊一次,phaser 就會多維護一個執行緒
            new Thread(new Runner(phaser)).start();
        }
        phaser.arriveAndDeregister();
    }
}

Semaphore

限制最多可以有幾個執行緒執行,以公共廁所為例


final int toilet = 3;// 設定3個廁所
final int user = 30;// 使用人數
ExecutorService es = Executors.newFixedThreadPool(user);
final Semaphore sp = new Semaphore(toilet, true);// 誰先等誰就先用(公平鎖)
    
for (int i = 0; i < user; i++) {
    try {
        TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    Runnable runnable = () -> {
        try {
            System.out.println(Thread.currentThread().getName() + " 要求使用廁所");
            sp.acquire(); // 要求上廁所,如果還不滿3人才能使用
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println(
                Thread.currentThread().getName() + " 拉屎中,目前有" + (toilet - sp.availablePermits()) + "個在使用");
    
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        System.out.println(Thread.currentThread().getName() + " 在沖馬桶了");
        sp.release(); // 歸還廁所
    
        System.out.println(
                Thread.currentThread().getName() + " 離開,目前有" + (toilet - sp.availablePermits()) + "個在使用\r\n");
    };
    es.execute(runnable);
}
es.shutdown();





Exchanger

執行緒相互交換資料


ExecutorService es = Executors.newFixedThreadPool(2);
Exchanger<String> exchanger = new Exchanger<>();
    
es.execute(() -> {
    try {
        String in = "xxx";
        System.out.println(Thread.currentThread().getName() + "將" + in + "送出去");
        Thread.sleep(1000);
        String out = exchanger.exchange(in);
        System.out.println(Thread.currentThread().getName() + "取得" + out);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
    
es.execute(() -> {
    try {
        String in = "ooo";
        System.out.println(Thread.currentThread().getName() + "將" + in + "送出去");
        Thread.sleep(1000);
        String out = exchanger.exchange(in);
        System.out.println(Thread.currentThread().getName() + "取得" + out);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
es.shutdown();

※可以用偶數個執行緒,但到底是哪兩個交換資料就不一定了

沒有留言:

張貼留言