2018年8月11日 星期六

Executors

※基本的執行緒池

// ExecutorService es = Executors.newFixedThreadPool(3);
// ExecutorService es = Executors.newCachedThreadPool();
ExecutorService es = Executors.newSingleThreadExecutor();
    
for (int i = 1; i <= 7; i++) {
    es.execute(() -> {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int j = 1; j <= 5; j++) {
            System.out.println(Thread.currentThread().getName() + "->" + j);
        }
    });
}
es.shutdown();

※newFixedThreadPool:固定大小的執行緒
newCachedThreadPool:無限大小的執行緒
newSingleThreadExecutor:單一的執行緒

※shutdown():不增加新的執行緒,當前執行緒不會關閉
shutdownNow():不增加新的執行緒,當前執行緒也會關閉


※排程

ScheduledExecutorService ses = Executors.newScheduledThreadPool(3);
ses.schedule(() -> {
    System.out.println("yeah");
}, 2L, TimeUnit.SECONDS);
ses.shutdown();
    
ScheduledExecutorService sess = Executors.newScheduledThreadPool(3);
sess.scheduleAtFixedRate(() -> {
    System.out.println("xxx");
}, 2L, 1L, TimeUnit.SECONDS);
try {
    Thread.sleep(5000L);
    sess.shutdown();
} catch (InterruptedException e) {
    e.printStackTrace();
}
    
ScheduledExecutorService seas = Executors.newSingleThreadScheduledExecutor();
seas.scheduleAtFixedRate(() -> {
    System.out.println("xxx");
}, 2L, 1L, TimeUnit.SECONDS);
try {
    Thread.sleep(5000L);
    seas.shutdown();
} catch (InterruptedException e) {
    e.printStackTrace();
}

※newScheduledThreadPool、newSingleThreadScheduledExecutor 回傳 ScheduledExecutorService,裡面有執行一次的 schedule() 和重覆執行的 scheduleAtFixedRate()

※重覆執行要注意太早 shutdown() 就不執行了



※1.8 的 newWorkStealingPool

Executors.newWorkStealingPool();

※底層第一行就是 new ForkJoinPool,ForkJoinPool 是 1.7 增加的類別,但這個執行緒池是 1.8 才新增的,可參考這裡


※ThreadPoolExecutor

Executors 底層使用的就是這個,共有7個參數
new ThreadPoolExecutor(5,
20,
10,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

表示一開始就有 5 個執行緒可以用,最多20個
假設有30個 request 進來,5個先執行,然後25個會放在第5個參數的 queue 裡
再來15個執行緒去 queue 抓來執行,最後10個看第7個參數的放棄策略
執行完的15個執行緒,過10毫秒 (第3和4的參數) 就關閉
第六個參數是將 Runnable 包裝成 Thread

以下是第 5~7 可用的參數
BlockingQueue
ArrayBlockingQueue
DelayedWorkQueue 基於 heap 的資料結構,按照時間順序將每個任務進行排序
BlockingDeque
LinkedBlockingDeque 預設是 Integer.MAX_VALUE,所以有 OOM 的風險
SynchronousQueue 一生產就消費,如沒有消費者就阻塞,所以沒有容量大小
DelayQueue 只能消費已過期的
TransferQueue
LinkedTransferQueue 很像 SynchronousQueue,但有容量,消費者沒取到就會阻塞
LinkedBlockingQueue 預設是 Integer.MAX_VALUE,所以有 OOM 的風險
PriorityBlockingQueue 按照自定義的優先級消費
ThreadFactory
DaemonThreadFactory 創建背景執行的執行緒,之後出來的執行緒都是背景執行緒
DefaultThreadFactory 預設的執行緒工廠
PrivilegedThreadFactory 創建與當前執行緒具有相同權限的新執行緒
RejectedExecutionHandler
DiscardOldestPolicy 丟棄最老的任務,就是在 queue 裡面的
AbortPolicy 新來的任務直接丟棄並報異常
CallerRunsPolicy 同步調用,start() 調用後執行 run(),但這個策略是直接調用 run()
DiscardPolicy 新來的任務直接丟棄但不報異常 

沒有留言:

張貼留言