2018年6月11日 星期一

五種 synchronized


public class Synchronized {
    public static void main(String[] args) {
        new Synchronized().xxx();
    }
    
    private void xxx() {
        TestSynchronized t = new TestSynchronized();// 1.7 要加 final
    
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                t.test5("0123456789");
            }
        }, "t1").start();
    
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                t.test5("abcdefghij");
            }
        }, "t2").start();
    }
}
    
    
    
class TestSynchronized {
    public void test1(String data) {
        try {
            TimeUnit.SECONDS.sleep(1);
            synchronized (this) {
                for (int i = 0; i < data.length(); i++) {
                    System.out.print(data.charAt(i));
                }
                System.out.println();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public synchronized void test2(String data) {
        try {
            TimeUnit.SECONDS.sleep(1);
            for (int i = 0; i < data.length(); i++) {
                System.out.print(data.charAt(i));
            }
            System.out.println();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public void test3(String data) {
        try {
            TimeUnit.SECONDS.sleep(1);
            synchronized (TestSynchronized.class) {
                for (int i = 0; i < data.length(); i++) {
                    System.out.print(data.charAt(i));
                }
                System.out.println();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static synchronized void test4(String data) {
        try {
            TimeUnit.SECONDS.sleep(1);
            for (int i = 0; i < data.length(); i++) {
                System.out.print(data.charAt(i));
            }
            System.out.println();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public void test5(String data) {
        try {
            String x = "";
            TimeUnit.SECONDS.sleep(1);
            synchronized (x) {
                for (int i = 0; i < data.length(); i++) {
                    System.out.print(data.charAt(i));
                    x = "ooo" + i; // 修改不影響,因為每個執行緒進來的 x 是一樣的,原子性不用加鎖
                }
                System.out.println();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

※Thread 要使用區域變數在 1.8 可以不加 final,但如果改值還是會編譯錯誤

※synchronized(this) 和寫在普通方法上一樣,只差在鎖的範圍 (wait()只能使用在這兩種);
synchronized(類.class) 和寫在靜態方法上一樣,只差在鎖的範圍
P.S 將 wait 寫在 synchronized 靜態方法裡會編譯錯誤;但如果寫在 synchronized(類.class) 會是執行期錯誤

※兩個線程的鎖必需是相同的鎖,如 xxx 方法的 t.test5() 改成 new TestSynchronized().test5(),因為有 new 了,所以就不是一樣的鎖了
但 test4 是靜態方法,所以 new 幾次都一樣

※test5 的鎖是自己寫的,內容不能改,但因為有字串池的關係,所以改也 ok;如果將「//修改不影響」這行改成 new String 也沒關係(因為每個執行緒進來是一樣的);String x 改成用 new 的方式就不行
.如果改成用 data,那傳進去的字串內容一樣也是 ok 的,因為有字串池;又如果傳進去的值是用 new String("") ,就沒有字串池了,所以就不行了

.Byte、Short、Integer、Long、Character 有 XxxCache 做快取,會在 -128~127,所以這個範圍內的鎖也像字串池一樣;我試了給 127,然後 一直+1 (或者 new Integer()),雖然已超過這個範圍但也是 ok 的(因為每個執行緒進來是一樣的);又如果一開始就超過這個範圍就不行了

※synchronized(this) 如果是鎖整個方法,那就和 synchronized 方法一樣

※synchronized(類別名稱) 如果是鎖整個方法,那就和 synchronized 靜態方法一樣


※練習一

主線程執行 3 次、子線程執行 10 次、主線程再執行 3 次、子線程再執行 10 次,如此循環 5 次

※第一步

public class Outer5Main3Sub10 {
    private void xxx() {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    synchronized (Outer5Main3Sub10.class) {
                        for (int j = 1; j <= 3; j++) {
                            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
                        }
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "子").start();
    
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    synchronized (Outer5Main3Sub10.class) {
                        for (int j = 1; j <= 10; j++) {
                            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
                        }
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主").start();
    }
    
    public static void main(String[] args) {
        new Outer5Main3Sub10().xxx();
    }
}

※先寫個互斥


※第二步

public class Outer5Main3Sub10 {
    private MainSub ms = new MainSub();
    
    private void xxx() {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    ms.sub(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "子").start();
    
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    ms.main(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主").start();
    }
    
    public static void main(String[] args) {
        new Outer5Main3Sub10().xxx();
    }
}
    
class MainSub {
    public synchronized void sub(int i) {
        for (int j = 1; j <= 3; j++) {
            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
        }
    }
    
    public synchronized void main(int i) {
        for (int j = 1; j <= 10; j++) {
            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
        }
    }
}

※將同步鎖放到新的類別


※第三步

public class Outer5Main3Sub10 {
    private MainSub ms = new MainSub();
    
    private void xxx() {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    ms.sub(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "子").start();
    
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    ms.main(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主").start();
    }
    
    public static void main(String[] args) {
        new Outer5Main3Sub10().xxx();
    }
}
    
class MainSub {
    private boolean sub = true;
    
    public synchronized void sub(int i) {
        while(!sub) { // 注意 spurious wake up,所以不要用 if
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        for (int j = 1; j <= 3; j++) {
            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
        }
         sub = false;
         this.notify();
    }
    
    public synchronized void main(int i) {
        while(sub) { // 注意 spurious wake up,所以不要用 if
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        for (int j = 1; j <= 10; j++) {
            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
        }
        sub = true;
        this.notify();
    }
}

※使用喚醒

※API 的 Object 物件的 wait 方法有說明,有時候會有 spurious wake up (偽喚醒),所以使用 while 代替 if



如果第二步不寫類別

public class Outer5Main3Sub10 {
    private boolean sub = true;
    
    private void xxx() {
        Outer5Main3Sub10 sm = new Outer5Main3Sub10();
    
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    synchronized (sm) {
                        while (!sub) {
                            try {
                                sm.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        for (int j = 1; j <= 3; j++) {
                            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
                        }
                        sub = false;
                        sm.notify();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "子").start();
    
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                for (int i = 1; i <= 5; i++) {
                    synchronized (sm) {
                        while (sub) {
                            try {
                                sm.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        for (int j = 1; j <= 10; j++) {
                            System.out.println(Thread.currentThread().getName() + j + ",i=" + i);
                        }
                        sub = true;
                        sm.notify();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "主").start();
    }
    
    public static void main(String[] args) {
        new Outer5Main3Sub10().xxx();
    }
}




※練習二

兩個執行緒,第一個印 1~52;第二個印 A~Z,結果為 12A34B56C...5152Z

private void xxx() {
    SynchronizedTest st = new SynchronizedTest();
    
    new Thread(() -> {
        for (var i = 1; i <= 26; i++) {
            synchronized (st) {
                while (!flag) {
                    try {
                        st.wait();
                    } catch (InterruptedException e) {
                        System.err.println("t1err=" + e.getMessage());
                    }
                }
                System.out.print(i * 2 - 1 + " ");
                System.out.println(i * 2);
                System.out.println("========================");
                flag = false;
                st.notify();
            }
        }
    }).start();
    
    new Thread(() -> {
        for (var i = 1; i <= 26; i++) {
            synchronized (st) {
                while (flag) {
                    try {
                        st.wait();
                    } catch (InterruptedException e) {
                        System.err.println("t2err=" + e.getMessage());
                    }
                }
                System.out.println((char) (i + ('A' - 1)));
                System.out.println("========================");
                flag = true;
                st.notify();
            }
        }
    }).start();
}



2018年6月6日 星期三

CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser

CountDownLatch

執行緒的倒數計時器,全部的計時器都到了,再做想做的事
如果想知道執行緒執行完需要多少時間可以使用這個工具


CountDownLatch cdt = new CountDownLatch(3);
Runnable run = () -> {
    for (int i = 0; i < 10000000; i++) {
    }
    System.out.println(Thread.currentThread().getName());
    cdt.countDown();
};
    
Thread t1 = new Thread(run, "t1");
Thread t2 = new Thread(run, "t2");
Thread t3 = new Thread(run, "t3");
    
long start = System.currentTimeMillis();
t1.start();
t2.start();
t3.start();
    
try {
    System.out.println("wait...");
    cdt.await();
    System.out.println(System.currentTimeMillis() - start);
} catch (InterruptedException e) {
    e.printStackTrace();
}

※countDown() 將倒數計時器減1,await() 是等待全部的執行緒(此例是3)都到時才能通過



CyclicBarrier

一個班級到達玩的地點,全部都上車才能去下一個地點


CyclicBarrier cb = new CyclicBarrier(3, () -> System.out.println("barrier被觸發時呼叫,也就是所有執行緒都到時呼叫"));
System.out.println("總共有" + cb.getParties() + "個執行緒\r\n");
Runnable run = () -> {
    try {
        cb.await();
        TimeUnit.SECONDS.sleep(1);
        System.out.println("執行緒" + Thread.currentThread().getName() + "已到達一號站");
    
        TimeUnit.SECONDS.sleep(1);
        cb.await();
        System.out.println("執行緒" + Thread.currentThread().getName() + "已到達二號站");
    
        TimeUnit.SECONDS.sleep(1);
        cb.await();
        System.out.println("執行緒" + Thread.currentThread().getName() + "已到達三號站");
    } catch (InterruptedException | BrokenBarrierException e) {
        e.printStackTrace();
    }
};
    
Thread t1 = new Thread(run, "t1");
Thread t2 = new Thread(run, "t2");
Thread t3 = new Thread(run, "t3");
    
t1.start();
t2.start();
t3.start();

※有點類似 CountDownLatch,但可以執行多次

※建構子的第二個參數可以不寫,是 overloading



Phaser

有 CountDownLatch 和 CyclicBarrier 的功能,但還可以動態增加 parties,可參考這裡


public class PhaserTest extends Phaser {
    // 每個階段執行完會自動調用
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        if (phase == 0) return init();
        else if (phase == 1) return one();
        else if (phase == 2) return two();
        else if (phase == 3) return three();
        return true; // phase 應該結束就給 true
    }

    private boolean one() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    private boolean two() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    private boolean three() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    private boolean init() {
        System.out.println("共幾人=" + getRegisteredParties());
        return false;
    }

    static class Runner implements Runnable {
        private Phaser phaser;

        public Runner(Phaser phaser) {
            this.phaser = phaser;
        }

        @Override
        public void run() {
            System.out.println("班級" + Thread.currentThread().getName() + ":已到出發站");
            phaser.arriveAndAwaitAdvance(); // 全到才會繼續往下走

            System.out.println("執行緒" + Thread.currentThread().getName() + "已到達1號站");
            phaser.arriveAndAwaitAdvance();

            System.out.println("執行緒" + Thread.currentThread().getName() + "已到達2號站");
            phaser.arriveAndAwaitAdvance();

            System.out.println("執行緒" + Thread.currentThread().getName() + "已到達3號站");
            phaser.arriveAndAwaitAdvance();

        }
    }

    public static void main(String[] args) {
        PhaserTest phaser = new PhaserTest();
        phaser.register(); // 每註冊一次,phaser 就會多維護一個執行緒
        for (int i = 0; i < 4; i++) {
            phaser.register();// 每註冊一次,phaser 就會多維護一個執行緒
            new Thread(new Runner(phaser)).start();
        }
        phaser.arriveAndDeregister();
    }
}

Semaphore

限制最多可以有幾個執行緒執行,以公共廁所為例


final int toilet = 3;// 設定3個廁所
final int user = 30;// 使用人數
ExecutorService es = Executors.newFixedThreadPool(user);
final Semaphore sp = new Semaphore(toilet, true);// 誰先等誰就先用(公平鎖)
    
for (int i = 0; i < user; i++) {
    try {
        TimeUnit.MILLISECONDS.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    Runnable runnable = () -> {
        try {
            System.out.println(Thread.currentThread().getName() + " 要求使用廁所");
            sp.acquire(); // 要求上廁所,如果還不滿3人才能使用
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println(
                Thread.currentThread().getName() + " 拉屎中,目前有" + (toilet - sp.availablePermits()) + "個在使用");
    
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        System.out.println(Thread.currentThread().getName() + " 在沖馬桶了");
        sp.release(); // 歸還廁所
    
        System.out.println(
                Thread.currentThread().getName() + " 離開,目前有" + (toilet - sp.availablePermits()) + "個在使用\r\n");
    };
    es.execute(runnable);
}
es.shutdown();





Exchanger

執行緒相互交換資料


ExecutorService es = Executors.newFixedThreadPool(2);
Exchanger<String> exchanger = new Exchanger<>();
    
es.execute(() -> {
    try {
        String in = "xxx";
        System.out.println(Thread.currentThread().getName() + "將" + in + "送出去");
        Thread.sleep(1000);
        String out = exchanger.exchange(in);
        System.out.println(Thread.currentThread().getName() + "取得" + out);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
    
es.execute(() -> {
    try {
        String in = "ooo";
        System.out.println(Thread.currentThread().getName() + "將" + in + "送出去");
        Thread.sleep(1000);
        String out = exchanger.exchange(in);
        System.out.println(Thread.currentThread().getName() + "取得" + out);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
es.shutdown();

※可以用偶數個執行緒,但到底是哪兩個交換資料就不一定了