CountDownLatch
執行緒的倒數計時器,全部的計時器都到了,再做想做的事如果想知道執行緒執行完需要多少時間可以使用這個工具
※
CountDownLatch cdt = new CountDownLatch(3); Runnable run = () -> { for (int i = 0; i < 10000000; i++) { } System.out.println(Thread.currentThread().getName()); cdt.countDown(); }; Thread t1 = new Thread(run, "t1"); Thread t2 = new Thread(run, "t2"); Thread t3 = new Thread(run, "t3"); long start = System.currentTimeMillis(); t1.start(); t2.start(); t3.start(); try { System.out.println("wait..."); cdt.await(); System.out.println(System.currentTimeMillis() - start); } catch (InterruptedException e) { e.printStackTrace(); }
※countDown() 將倒數計時器減1,await() 是等待全部的執行緒(此例是3)都到時才能通過
CyclicBarrier
一個班級到達玩的地點,全部都上車才能去下一個地點※
CyclicBarrier cb = new CyclicBarrier(3, () -> System.out.println("barrier被觸發時呼叫,也就是所有執行緒都到時呼叫")); System.out.println("總共有" + cb.getParties() + "個執行緒\r\n"); Runnable run = () -> { try { cb.await(); TimeUnit.SECONDS.sleep(1); System.out.println("執行緒" + Thread.currentThread().getName() + "已到達一號站"); TimeUnit.SECONDS.sleep(1); cb.await(); System.out.println("執行緒" + Thread.currentThread().getName() + "已到達二號站"); TimeUnit.SECONDS.sleep(1); cb.await(); System.out.println("執行緒" + Thread.currentThread().getName() + "已到達三號站"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }; Thread t1 = new Thread(run, "t1"); Thread t2 = new Thread(run, "t2"); Thread t3 = new Thread(run, "t3"); t1.start(); t2.start(); t3.start();
※有點類似 CountDownLatch,但可以執行多次
※建構子的第二個參數可以不寫,是 overloading
Phaser
有 CountDownLatch 和 CyclicBarrier 的功能,但還可以動態增加 parties,可參考這裡※
public class PhaserTest extends Phaser { // 每個階段執行完會自動調用 @Override protected boolean onAdvance(int phase, int registeredParties) { if (phase == 0) return init(); else if (phase == 1) return one(); else if (phase == 2) return two(); else if (phase == 3) return three(); return true; // phase 應該結束就給 true } private boolean one() { System.out.println("共幾人=" + getRegisteredParties()); return false; } private boolean two() { System.out.println("共幾人=" + getRegisteredParties()); return false; } private boolean three() { System.out.println("共幾人=" + getRegisteredParties()); return false; } private boolean init() { System.out.println("共幾人=" + getRegisteredParties()); return false; } static class Runner implements Runnable { private Phaser phaser; public Runner(Phaser phaser) { this.phaser = phaser; } @Override public void run() { System.out.println("班級" + Thread.currentThread().getName() + ":已到出發站"); phaser.arriveAndAwaitAdvance(); // 全到才會繼續往下走 System.out.println("執行緒" + Thread.currentThread().getName() + "已到達1號站"); phaser.arriveAndAwaitAdvance(); System.out.println("執行緒" + Thread.currentThread().getName() + "已到達2號站"); phaser.arriveAndAwaitAdvance(); System.out.println("執行緒" + Thread.currentThread().getName() + "已到達3號站"); phaser.arriveAndAwaitAdvance(); } } public static void main(String[] args) { PhaserTest phaser = new PhaserTest(); phaser.register(); // 每註冊一次,phaser 就會多維護一個執行緒 for (int i = 0; i < 4; i++) { phaser.register();// 每註冊一次,phaser 就會多維護一個執行緒 new Thread(new Runner(phaser)).start(); } phaser.arriveAndDeregister(); } }
Semaphore
限制最多可以有幾個執行緒執行,以公共廁所為例※
final int toilet = 3;// 設定3個廁所 final int user = 30;// 使用人數 ExecutorService es = Executors.newFixedThreadPool(user); final Semaphore sp = new Semaphore(toilet, true);// 誰先等誰就先用(公平鎖) for (int i = 0; i < user; i++) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } Runnable runnable = () -> { try { System.out.println(Thread.currentThread().getName() + " 要求使用廁所"); sp.acquire(); // 要求上廁所,如果還不滿3人才能使用 } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println( Thread.currentThread().getName() + " 拉屎中,目前有" + (toilet - sp.availablePermits()) + "個在使用"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 在沖馬桶了"); sp.release(); // 歸還廁所 System.out.println( Thread.currentThread().getName() + " 離開,目前有" + (toilet - sp.availablePermits()) + "個在使用\r\n"); }; es.execute(runnable); } es.shutdown();
※
Exchanger
執行緒相互交換資料※
ExecutorService es = Executors.newFixedThreadPool(2); Exchanger<String> exchanger = new Exchanger<>(); es.execute(() -> { try { String in = "xxx"; System.out.println(Thread.currentThread().getName() + "將" + in + "送出去"); Thread.sleep(1000); String out = exchanger.exchange(in); System.out.println(Thread.currentThread().getName() + "取得" + out); } catch (InterruptedException e) { e.printStackTrace(); } }); es.execute(() -> { try { String in = "ooo"; System.out.println(Thread.currentThread().getName() + "將" + in + "送出去"); Thread.sleep(1000); String out = exchanger.exchange(in); System.out.println(Thread.currentThread().getName() + "取得" + out); } catch (InterruptedException e) { e.printStackTrace(); } }); es.shutdown();
※可以用偶數個執行緒,但到底是哪兩個交換資料就不一定了
沒有留言:
張貼留言